1、背景
项目需要接入货邮数据,而对方只提供 MSMQ 接入方式。网上找到了一个 JNI 实现,但它暴露的消息属性只有 label 和消息体;因此需要增加 lookupId 和消息到达时间这两个属性(MSMQ 的原生 API 本身就支持这两个属性)。
2、POJO 添加属性
3、JNI方法
/**
 * JNI entry point behind ionic.Msmq.Queue.nativeReceiveBytes.
 *
 * Receives (ReadOrPeek == 1) or peeks (any other value) one message from the
 * receiver queue bound to `object` and copies the result into the Java
 * message object `msg`:
 *   _messageBody   - message body bytes
 *   _label         - label converted to the ANSI code page
 *   _correlationId - PROPID_M_CORRELATIONID_SIZE bytes
 *   _lookupId      - lookup identifier rendered as an unsigned decimal string
 *   arrTime        - arrival time as a java.util.Date
 *
 * @param jniEnv      JNI environment of the calling thread
 * @param object      the Java Queue instance
 * @param msg         the Java Message instance to populate
 * @param timeout     maximum wait in milliseconds, forwarded to receiveBytes
 * @param ReadOrPeek  1 to remove the message from the queue, otherwise peek
 * @return 0 on success, the failing HRESULT otherwise, or -99 if a C++
 *         exception was caught.
 */
JNIEXPORT jint JNICALL Java_ionic_Msmq_Queue_nativeReceiveBytes
(JNIEnv *jniEnv, jobject object, jobject msg, jint timeout, jint ReadOrPeek)
{
    HRESULT hr = 0;
    // Declared outside the try block so the buffer is released on every path,
    // including the exception path.
    BYTE *pbMessage = NULL;

    try {
        MsmqQueue *q = GetReceiverQueue(jniEnv, object, NULL, &hr);
        if (hr != 0) return (jint)hr;

        WCHAR wszMessageLabel[MQ_MAX_MSG_LABEL_LEN] = L"";
        BYTE pCorrelationId[PROPID_M_CORRELATIONID_SIZE];
        ULONGLONG lookupId = 0;
        ULONG arrdt = 0;            // message arrival time
        DWORD dwMessageLength = 0;

        hr = q->receiveBytes(&pbMessage,
                             &dwMessageLength,
                             (WCHAR*)wszMessageLabel,
                             pCorrelationId,
                             &lookupId,
                             &arrdt,
                             timeout,
                             ReadOrPeek);

        if (hr == 0) {
            // One wide character can expand to several bytes in a multi-byte
            // ANSI code page (e.g. GBK), so size the narrow buffer generously
            // and always keep one byte free for the terminator.
            CHAR szLabel[MQ_MAX_MSG_LABEL_LEN * 4 + 1];
            const int len = (int)wcslen(wszMessageLabel);
            int rc = 0;
            if (len > 0)
                rc = WideCharToMultiByte(
                    (UINT)CP_ACP,               // code page
                    (DWORD)0,                   // conversion flags
                    (LPCWSTR)wszMessageLabel,   // wide-character string to convert
                    len,                        // number of chars in string
                    (LPSTR)szLabel,             // buffer for new string
                    (int)sizeof(szLabel) - 1,   // usable size of buffer
                    (LPCSTR)NULL,               // default for unmappable chars
                    (LPBOOL)NULL                // set when default char used
                );
            // WideCharToMultiByte returns 0 on failure (never a negative
            // value), so an empty or unconvertible label becomes "".
            szLabel[rc > 0 ? rc : 0] = '\0';

            SetJavaByteArray(jniEnv, msg, "_messageBody", pbMessage, dwMessageLength);
            SetJavaString(jniEnv, msg, "_label", (char *)szLabel);
            SetJavaByteArray(jniEnv, msg, "_correlationId", pCorrelationId, PROPID_M_CORRELATIONID_SIZE);

            // The lookup id is an unsigned 64-bit value: at most 20 digits.
            char buf[32];
            memset(buf, 0x00, sizeof buf);
            sprintf(buf, "%I64u", lookupId);
            SetJavaString(jniEnv, msg, "_lookupId", buf);

            setJavaDate(jniEnv, msg, "arrTime", arrdt);
        }
    }
    catch (...) {
        DIAG("Read() : Exception\n");
        jniEnv->ExceptionDescribe();
        jniEnv->ExceptionClear();
        hr = -99;
    }

    delete[] pbMessage;
    fflush(stdout);
    return (jint)hr;
}

4、接收消息实现
// Receives or peeks one message from the queue and returns its body and
// selected properties through the out-parameters.
//
//   ppbMessageBody  [out] receives a buffer allocated with new[] holding the
//                         message body; it is set to NULL when the call fails
//                         or the body is empty. The caller releases it with
//                         delete[].
//   dwBodyLen       [out] body size in bytes as reported by MSMQ.
//   swzMessageLabel [out] buffer for the message label, treated as holding
//                         MQ_MAX_MSG_LABEL_LEN wide characters.
//   pCorrelationId  [out] PROPID_M_CORRELATIONID_SIZE bytes; may be NULL to
//                         skip retrieving the correlation id.
//   pLookupId       [out] value of PROPID_M_LOOKUPID.
//   arrtm           [out] value of PROPID_M_ARRIVEDTIME.
//                         NOTE(review): the JNI caller multiplies this by 1000
//                         to build a java.util.Date, i.e. treats it as seconds
//                         - confirm against the MSMQ documentation.
//   dwTimeOut       [in]  maximum time in milliseconds to wait for a message.
//   ReadOrPeek      [in]  1 removes the message (MQ_ACTION_RECEIVE); any other
//                         value peeks (MQ_ACTION_PEEK_CURRENT).
//
// Returns the HRESULT of the last MQReceiveMessage call.
HRESULT receiveBytes(
BYTE **ppbMessageBody,
DWORD *dwBodyLen,
WCHAR *swzMessageLabel,
BYTE *pCorrelationId,
ULONGLONG* pLookupId,
ULONG* arrtm,
DWORD dwTimeOut,
int ReadOrPeek
);
// Issues one synchronous MQReceiveMessage call: no overlapped structure,
// no callback, no cursor and no transaction.
static HRESULT MsmqReceiveOnce(QUEUEHANDLE hSource, DWORD dwTimeOut, DWORD dwAction, MQMSGPROPS *pProps)
{
    return MQReceiveMessage(
        hSource,    // handle to the Queue.
        dwTimeOut,  // Max time (msec) to wait for the message.
        dwAction,   // Action.
        pProps,     // properties to retrieve.
        NULL,       // No overlapped structure.
        NULL,       // No callback function.
        NULL,       // No Cursor.
        NULL        // transaction
    );
}

/**
 * Receives or peeks one message and returns its body, label, correlation id,
 * lookup id and arrival time.
 *
 * @param ppbMessageBody  [out] new[]-allocated body buffer owned by the
 *                        caller (release with delete[]); NULL on failure or
 *                        when the body is empty.
 * @param dwpBodyLen      [out] body size in bytes; written only on success.
 * @param wszMessageLabel [out] label buffer of MQ_MAX_MSG_LABEL_LEN WCHARs.
 * @param pCorrelationId  [out] PROPID_M_CORRELATIONID_SIZE bytes, or NULL to
 *                        skip the property.
 * @param pLookupId       [out] PROPID_M_LOOKUPID value; may be NULL.
 * @param arrtm           [out] PROPID_M_ARRIVEDTIME value; may be NULL.
 * @param dwTimeOut       maximum wait in milliseconds for each receive call.
 * @param ReadOrPeek      1 = MQ_ACTION_RECEIVE, otherwise MQ_ACTION_PEEK_CURRENT.
 * @return HRESULT of the last MQReceiveMessage call.
 */
HRESULT MsmqQueue::receiveBytes(BYTE **ppbMessageBody,
    DWORD *dwpBodyLen,
    WCHAR *wszMessageLabel,
    BYTE *pCorrelationId, // sz PROPID_M_CORRELATIONID_SIZE
    ULONGLONG* pLookupId,
    ULONG* arrtm,
    DWORD dwTimeOut,
    int ReadOrPeek
    )
{
    const int NUMBEROFPROPERTIES = 7;
    const DWORD INITIAL_BODY_SIZE = 1024;
    const ULONG ulLabelCapacity = MQ_MAX_MSG_LABEL_LEN;

    MQMSGPROPS MsgProps;
    MQPROPVARIANT fields[NUMBEROFPROPERTIES];
    MSGPROPID propId[NUMBEROFPROPERTIES];
    DWORD i = 0;
    const DWORD dwAction = (ReadOrPeek == 1) ? MQ_ACTION_RECEIVE : MQ_ACTION_PEEK_CURRENT;
    HRESULT hr = S_OK;

    int iBodyLen = 0;
    int iBody = 0;
    int iLabelLen = 0;
    int iLookUpid = 0;
    int iarrtm = 0;

    // Put the out-parameters into a defined state before doing anything.
    if (NULL != wszMessageLabel)
        *wszMessageLabel = L'\0';
    if (NULL != pCorrelationId)
        memset(pCorrelationId, 0, PROPID_M_CORRELATIONID_SIZE);
    if (NULL != pLookupId)
        *pLookupId = 0;
    if (NULL != arrtm)
        *arrtm = 0;

    *ppbMessageBody = new BYTE[INITIAL_BODY_SIZE];

    // Describe the properties we want MSMQ to fill in.
    propId[i] = PROPID_M_BODY_SIZE;
    fields[i].vt = VT_UI4;
    fields[i].ulVal = 0;            // filled in by MSMQ
    iBodyLen = i;
    i++;

    propId[i] = PROPID_M_BODY;
    fields[i].vt = VT_VECTOR | VT_UI1;
    // The advertised capacity must be the size actually allocated above,
    // not whatever the caller happened to leave in *dwpBodyLen.
    fields[i].caub.cElems = INITIAL_BODY_SIZE;
    fields[i].caub.pElems = (unsigned char *)*ppbMessageBody;
    iBody = i;
    i++;

    if (NULL != pCorrelationId)
    {
        propId[i] = PROPID_M_CORRELATIONID;
        fields[i].vt = VT_VECTOR | VT_UI1;
        fields[i].caub.pElems = (LPBYTE)pCorrelationId;
        fields[i].caub.cElems = PROPID_M_CORRELATIONID_SIZE;
        i++;
    }

    propId[i] = PROPID_M_LABEL_LEN;
    fields[i].vt = VT_UI4;
    fields[i].ulVal = ulLabelCapacity;
    iLabelLen = i;
    i++;

    // The label property must directly follow its length property; the retry
    // code below relies on iLabelLen + 1.
    propId[i] = PROPID_M_LABEL;
    fields[i].vt = VT_LPWSTR;
    fields[i].pwszVal = wszMessageLabel;
    i++;

    propId[i] = PROPID_M_LOOKUPID;
    fields[i].vt = VT_UI8;
    iLookUpid = i;
    i++;

    propId[i] = PROPID_M_ARRIVEDTIME;
    fields[i].vt = VT_NULL;         // let MSMQ choose the variant type
    iarrtm = i;
    i++;

    MsgProps.cProp = i;             // Number of properties.
    MsgProps.aPropID = propId;      // Id of properties.
    MsgProps.aPropVar = fields;     // Value of properties.
    MsgProps.aStatus = NULL;        // No Error report.

    hr = MsmqReceiveOnce(hQueue, dwTimeOut, dwAction, &MsgProps);

    // Retry with a bigger body buffer for as long as MSMQ reports overflow.
    do
    {
        if (MQ_ERROR_BUFFER_OVERFLOW == hr)
        {
            delete[] *ppbMessageBody;
            *ppbMessageBody = NULL;

            const DWORD dwNeeded = fields[iBodyLen].ulVal;
            *ppbMessageBody = new BYTE[dwNeeded];
            fields[iBody].caub.cElems = dwNeeded;
            fields[iBody].caub.pElems = (unsigned char *)*ppbMessageBody;

            // PROPID_M_LABEL_LEN is in/out: the failed call may have replaced
            // the capacity with an actual length, so restore it.
            fields[iLabelLen].ulVal = ulLabelCapacity;

            hr = MsmqReceiveOnce(hQueue, dwTimeOut, dwAction, &MsgProps);
        }

        if (hr == MQ_ERROR_LABEL_BUFFER_TOO_SMALL)
        {
            fields[iLabelLen].ulVal = ulLabelCapacity;
            fields[iLabelLen + 1].pwszVal = wszMessageLabel;

            hr = MsmqReceiveOnce(hQueue, dwTimeOut, dwAction, &MsgProps);
        }
    } while (MQ_ERROR_BUFFER_OVERFLOW == hr);

    if (FAILED(hr))
    {
        delete[] *ppbMessageBody;
        *ppbMessageBody = NULL;
        return hr;
    }

    *dwpBodyLen = fields[iBodyLen].ulVal;
    if (0 == *dwpBodyLen)
    {
        // An empty body is reported as "no buffer" rather than a zero-length one.
        delete[] *ppbMessageBody;
        *ppbMessageBody = NULL;
    }

    if (NULL != pLookupId)
        *pLookupId = fields[iLookUpid].uhVal.QuadPart;
    if (NULL != arrtm)
        *arrtm = fields[iarrtm].ulVal;

    return hr;
}

5、测试
// Smoke test: send one message to a remote private queue, peek it back and
// print the arrival time exposed by the new native property.
Queue queuet = null;
try {
// Direct-format path (TCP) to a private queue on the remote host.
String recpath = "direct=tcp:192.168.182.128\\private$\\AirLogistics_Receipt";
queuet = new Queue(recpath);
Message sendMsg = new Message("test");
queuet.send(sendMsg,TransactionType.SINGLE_MESSAGE);
// peek() leaves the message on the queue.
// NOTE(review): presumably this returns the message just sent - that only
// holds if the queue was empty beforehand.
Message message = queuet.peek();
System.out.println(DateFormatUtils.format(message.getArrTime(),"yyyy-MM-dd HH:mm:ss"));
} catch (MessageQueueException | UnsupportedEncodingException e) {
// Any queue or encoding failure is treated as fatal for this test.
e.printStackTrace();
System.out.println("MSMQ连接失败");
System.exit(-6);
}
JNI调用JAVA有参构造函数
调用 java.util.Date 类的 Date(long ms)
/**
 * Stores a java.util.Date in the field `fieldName` of `object`.
 *
 * The Date is built with the Date(long ms) constructor from
 * `valueToSet * 1000`, i.e. `valueToSet` is treated as a count of seconds.
 *
 * Nothing is stored when the field, the Date class or its constructor
 * cannot be resolved, or when the Date cannot be created; any Java exception
 * raised by the failing JNI call is left pending for the caller.
 *
 * @param jniEnv      JNI environment of the calling thread
 * @param object      Java object whose field is assigned
 * @param fieldName   name of a field of type java.util.Date
 * @param valueToSet  time value in seconds
 */
void setJavaDate(JNIEnv * jniEnv, jobject object, char * fieldName, const unsigned long valueToSet){
    jclass cls = jniEnv->GetObjectClass(object);
    if (cls == NULL)
        return;

    // Resolve the target field first: without it there is no point in
    // building the Date, and no further JNI call may be made while the
    // lookup failure is pending.
    jfieldID fieldId = jniEnv->GetFieldID(cls, fieldName, "Ljava/util/Date;");
    if (fieldId == NULL) {
        jniEnv->DeleteLocalRef(cls);
        return;
    }

    jclass dateClass = jniEnv->FindClass("java/util/Date");
    if (dateClass != NULL) {
        jmethodID ctor = jniEnv->GetMethodID(dateClass, "<init>", "(J)V");
        if (ctor != NULL) {
            // Widen before multiplying so the product cannot overflow.
            const jlong ms = ((jlong)valueToSet) * 1000;
            jobject dateObj = jniEnv->NewObject(dateClass, ctor, ms);
            if (dateObj != NULL) {
                jniEnv->SetObjectField(object, fieldId, dateObj);
                jniEnv->DeleteLocalRef(dateObj);
            }
        }
        jniEnv->DeleteLocalRef(dateClass);
    }

    jniEnv->DeleteLocalRef(cls);
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!