1、背景

项目需要接入货邮数据,对方只提供 MSMQ 接入方式。网上找到一个 JNI 实现,但它暴露的消息属性只有 label 和消息体,因此需要增加 lookupId 和消息到达时间两个属性(这两个属性 MSMQ 的 API 本身已经提供)。

2、POJO 添加属性

upload successful

3、JNI方法

// JNI entry point: receives (or peeks) one message from the MSMQ queue bound
// to `object` and copies its properties into the Java Message object `msg`.
//
// Fields written on `msg` when the receive succeeds:
//   _messageBody   (byte[])          message body
//   _label         (String)          label converted with the ANSI code page
//   _correlationId (byte[])          PROPID_M_CORRELATIONID_SIZE bytes
//   _lookupId      (String)          decimal text of the 64-bit lookup id
//   arrTime        (java.util.Date)  arrival time
//
// Parameters:
//   timeout    - passed through to MsmqQueue::receiveBytes as the wait time.
//   ReadOrPeek - passed through to MsmqQueue::receiveBytes, which treats 1 as
//                "receive" and any other value as "peek".
//
// Returns 0 on success, the HRESULT reported by GetReceiverQueue or
// receiveBytes on failure, or -99 when a C++ exception was caught.
JNIEXPORT jint JNICALL Java_ionic_Msmq_Queue_nativeReceiveBytes
(JNIEnv *jniEnv, jobject object, jobject msg, jint timeout, jint ReadOrPeek)
{
    HRESULT hr = 0;
    // Declared outside the try block so the body buffer is released on every
    // exit path, including the catch handler below.
    BYTE*   pbMessage = NULL;
    try {
        MsmqQueue *q = GetReceiverQueue(jniEnv, object, NULL, &hr);
        if (hr != 0) return (jint)hr;

        // Output buffers filled by receiveBytes.
        WCHAR     wszMessageLabel[MQ_MAX_MSG_LABEL_LEN] = L"";
        BYTE      pCorrelationId[PROPID_M_CORRELATIONID_SIZE];
        ULONGLONG pLookupId = 0;
        ULONG     arrdt = 0; // msg arrive time
        DWORD     dwMessageLength = 0;

        hr = q->receiveBytes(&pbMessage,
            &dwMessageLength,
            (WCHAR*)wszMessageLabel,
            pCorrelationId,
            &pLookupId,
            &arrdt,
            timeout,
            ReadOrPeek);

        if (hr == 0) {
            // A wide character may expand to several bytes in a multi-byte
            // ANSI code page, so reserve 4 bytes per character plus one for
            // the terminator.
            CHAR szLabel[MQ_MAX_MSG_LABEL_LEN * 4 + 1];
            const int len = (int)wcslen(wszMessageLabel);
            int rc = 0;
            if (len > 0)
                rc = WideCharToMultiByte(
                    (UINT)CP_ACP,               // code page
                    (DWORD)0,                   // conversion flags
                    (LPCWSTR)wszMessageLabel,   // wide-character string to convert
                    len,                        // number of chars in string.
                    (LPSTR)szLabel,             // buffer for new string
                    (int)(sizeof(szLabel) - 1), // keep one byte for the terminator
                    (LPCSTR)NULL,               // default for unmappable chars
                    (LPBOOL)NULL                // set when default char used
                    );
            // WideCharToMultiByte returns 0 on failure and never a negative
            // value; terminating at rc therefore yields an empty string for
            // both an empty label and a failed conversion.
            if (rc < 0)
                rc = 0;
            szLabel[rc] = '\0';

            SetJavaByteArray(jniEnv, msg, "_messageBody", pbMessage, dwMessageLength);
            SetJavaString(jniEnv, msg, "_label", (char *)szLabel);
            SetJavaByteArray(jniEnv, msg, "_correlationId", pCorrelationId, PROPID_M_CORRELATIONID_SIZE);

            // The lookup id is an unsigned 64-bit value (at most 20 digits).
            char buf[32];
            memset(buf, 0x00, sizeof buf);
            sprintf(buf, "%I64u", pLookupId);
            SetJavaString(jniEnv, msg, "_lookupId", buf);

            setJavaDate(jniEnv, msg, "arrTime", arrdt);
        }
    }
    catch (...) {
        DIAG("Read() : Exception\n");
        jniEnv->ExceptionDescribe();
        jniEnv->ExceptionClear();
        hr = -99;
    }
    delete[] pbMessage;
    fflush(stdout);
    return (jint)hr;
}
![upload successful](/images/pasted-88.png)

4、接收消息实现

// Declaration of MsmqQueue::receiveBytes: receives or peeks one message and
// returns its body, label, correlation id, lookup id and arrival time through
// the out-parameters. Returns the HRESULT of the underlying receive call.
HRESULT receiveBytes(
        BYTE    **ppbMessageBody,     // out: body buffer allocated with new[]; set to NULL on failure or empty body
        DWORD   *dwBodyLen,           // in/out: on success holds the body size reported for the message
        WCHAR   *swzMessageLabel,     // out: label buffer supplied by the caller
        BYTE    *pCorrelationId,      // out: PROPID_M_CORRELATIONID_SIZE bytes; may be NULL to skip
        ULONGLONG*    pLookupId,      // out: lookup identifier of the message
        ULONG* arrtm,                 // out: arrival time of the message
        DWORD   dwTimeOut,            // wait time handed to MQReceiveMessage
        int     ReadOrPeek            // 1 = receive the message, any other value = peek
        );


// Receives (ReadOrPeek == 1) or peeks (any other value) one message from
// hQueue and returns its properties through the out-parameters.
//
// Parameters:
//   ppbMessageBody  - out: body buffer allocated here with new[]; the caller
//                     releases it with delete[]. Set to NULL on failure or
//                     when the body is empty.
//   dwpBodyLen      - out: body size in bytes; written only on success.
//   wszMessageLabel - out: label buffer; the code tells MSMQ it holds
//                     MQ_MAX_MSG_LABEL_LEN characters. NULL skips the label.
//   pCorrelationId  - out: PROPID_M_CORRELATIONID_SIZE bytes. NULL skips it.
//   pLookupId       - out: lookup identifier. NULL discards it.
//   arrtm           - out: arrival time as reported by PROPID_M_ARRIVEDTIME.
//                     NULL discards it.
//   dwTimeOut       - wait time handed to MQReceiveMessage.
//
// Returns the HRESULT of the last MQReceiveMessage call, or
// MQ_ERROR_INVALID_PARAMETER when ppbMessageBody or dwpBodyLen is NULL.
HRESULT MsmqQueue::receiveBytes(BYTE  **ppbMessageBody,
    DWORD *dwpBodyLen,
    WCHAR *wszMessageLabel,
    BYTE  *pCorrelationId, // sz PROPID_M_CORRELATIONID_SIZE
    ULONGLONG*  pLookupId,
    ULONG* arrtm,
    DWORD dwTimeOut,
    int   ReadOrPeek
    )
{
    const int     NUMBEROFPROPERTIES = 7;
    const ULONG   INITIAL_BODY_SIZE = 1024;
    const ULONG   ulLabelLen = MQ_MAX_MSG_LABEL_LEN;
    const DWORD   dwAction = (ReadOrPeek == 1) ? MQ_ACTION_RECEIVE : MQ_ACTION_PEEK_CURRENT;

    MQMSGPROPS    MsgProps;
    MQPROPVARIANT fields[NUMBEROFPROPERTIES];
    MSGPROPID     propId[NUMBEROFPROPERTIES];
    DWORD         i = 0;
    HRESULT       hr = S_OK;
    int           iBodyLen = 0;
    int           iBody = 0;
    int           iLabelLen = -1;   // -1: label properties not requested
    int           iLookUpid = 0;
    int           iarrtm = 0;

    // The body pointer and its length are mandatory outputs.
    if (NULL == ppbMessageBody || NULL == dwpBodyLen)
        return MQ_ERROR_INVALID_PARAMETER;

    // Initialize the out variables.
    *ppbMessageBody = NULL;
    if (NULL != wszMessageLabel)
        *wszMessageLabel = L'\0';
    if (NULL != pCorrelationId)
        memset(pCorrelationId, 0, PROPID_M_CORRELATIONID_SIZE);
    memset(fields, 0, sizeof fields);

    *ppbMessageBody = new BYTE[INITIAL_BODY_SIZE];

    // Prepare the property array of message properties we want to receive.
    propId[i] = PROPID_M_BODY_SIZE;
    fields[i].vt = VT_UI4;
    fields[i].ulVal = 0;
    iBodyLen = i;
    i++;

    // Advertise the size that was really allocated; the caller's *dwpBodyLen
    // says nothing about this buffer.
    propId[i] = PROPID_M_BODY;
    fields[i].vt = VT_VECTOR | VT_UI1;
    fields[i].caub.cElems = INITIAL_BODY_SIZE;
    fields[i].caub.pElems = (unsigned char *)*ppbMessageBody;
    iBody = i;
    i++;

    if (NULL != pCorrelationId)
    {
        propId[i] = PROPID_M_CORRELATIONID;
        fields[i].vt = VT_VECTOR | VT_UI1;
        fields[i].caub.pElems = (LPBYTE)pCorrelationId;
        fields[i].caub.cElems = PROPID_M_CORRELATIONID_SIZE;
        i++;
    }

    // Request the label only when the caller supplied a buffer for it.
    if (NULL != wszMessageLabel)
    {
        propId[i] = PROPID_M_LABEL_LEN;
        fields[i].vt = VT_UI4;
        fields[i].ulVal = ulLabelLen;
        iLabelLen = i;
        i++;
        propId[i] = PROPID_M_LABEL;
        fields[i].vt = VT_LPWSTR;
        fields[i].pwszVal = wszMessageLabel;
        i++;
    }

    // lookupid
    iLookUpid = i;
    propId[i] = PROPID_M_LOOKUPID;
    fields[i].vt = VT_UI8;
    i++;

    // arrival time
    iarrtm = i;
    propId[i] = PROPID_M_ARRIVEDTIME;
    fields[i].vt = VT_NULL;
    i++;

    // Set the MQMSGPROPS structure
    MsgProps.cProp = i;             // Number of properties.
    MsgProps.aPropID = propId;      // Id of properties.
    MsgProps.aPropVar = fields;     // Value of properties.
    MsgProps.aStatus = NULL;        // No Error report.

    for (;;)
    {
        // The label length is an in/out property: restore the real buffer
        // capacity before every attempt so a value written by a previous
        // call cannot shrink the advertised buffer.
        if (iLabelLen >= 0)
            fields[iLabelLen].ulVal = ulLabelLen;

        hr = MQReceiveMessage(
            hQueue,              // handle to the Queue.
            dwTimeOut,           // Max time (msec) to wait for the message.
            dwAction,            // Action.
            &MsgProps,           // properties to retrieve.
            NULL,                // No overlapped structure.
            NULL,                // No callback function.
            NULL,                // No Cursor.
            NULL                 // transaction
            );

        if (MQ_ERROR_BUFFER_OVERFLOW != hr)
            break;

        // Body buffer too small: reallocate to the reported size and retry.
        const ULONG ulNeeded = fields[iBodyLen].ulVal;
        delete[] * ppbMessageBody;
        *ppbMessageBody = NULL;     // stays NULL if the allocation throws
        *ppbMessageBody = new BYTE[ulNeeded];
        fields[iBody].caub.cElems = ulNeeded;
        fields[iBody].caub.pElems = (unsigned char *)*ppbMessageBody;
    }

    if (FAILED(hr))
    {
        delete[] * ppbMessageBody;
        *ppbMessageBody = NULL;
        return hr;
    }

    *dwpBodyLen = fields[iBodyLen].ulVal;
    if (0 == *dwpBodyLen)
    {
        // Empty body: hand back NULL instead of an unused buffer.
        delete[] * ppbMessageBody;
        *ppbMessageBody = NULL;
    }
    if (NULL != pLookupId)
        *pLookupId = fields[iLookUpid].uhVal.QuadPart;
    if (NULL != arrtm)
        *arrtm = fields[iarrtm].ulVal;
    return hr;
}

![upload successful](/images/pasted-90.png)

5、测试

// Smoke test: send one message to a queue, peek a message back and print the
// arrival time exposed by the new arrTime property.
Queue queuet = null;
        try {
            // Direct-format path addressing a private queue on a host by IP.
            String recpath = "direct=tcp:192.168.182.128\\private$\\AirLogistics_Receipt";
            queuet = new Queue(recpath);
            Message sendMsg = new Message("test");
            queuet.send(sendMsg,TransactionType.SINGLE_MESSAGE);
            // NOTE(review): peek() presumably returns the message at the head of the
            // queue, which is not necessarily the one just sent - confirm.
            Message message = queuet.peek();
            System.out.println(DateFormatUtils.format(message.getArrTime(),"yyyy-MM-dd HH:mm:ss"));
        } catch (MessageQueueException | UnsupportedEncodingException e) {
            // Any queue or encoding failure is reported and aborts the process.
            e.printStackTrace();
            System.out.println("MSMQ连接失败");
            System.exit(-6);
        }

upload successful

JNI调用JAVA有参构造函数

通过 JNI 调用 java.util.Date 类的有参构造函数 Date(long ms),参数为毫秒时间戳:

// Sets a java.util.Date field on a Java object.
//
// Parameters:
//   jniEnv     - JNI environment of the calling thread.
//   object     - Java object whose field is written.
//   fieldName  - name of a field declared with type java.util.Date.
//   valueToSet - timestamp in seconds; it is multiplied by 1000 and handed
//                to the Date(long ms) constructor.
//
// If the field, the Date class, its constructor or the new object cannot be
// obtained, the field is left untouched and no further JNI lookups are made;
// any Java exception raised by the failing JNI call stays pending for the
// caller.
void setJavaDate(JNIEnv * jniEnv, jobject object, char * fieldName, const unsigned long valueToSet){
    if (jniEnv == NULL || object == NULL || fieldName == NULL)
        return;

    jclass cls = jniEnv->GetObjectClass(object);

    // Resolve the target field first so no Date is built for a missing field.
    jfieldID fieldId = jniEnv->GetFieldID(cls, fieldName, "Ljava/util/Date;");
    if (fieldId != NULL) {
        jclass date = jniEnv->FindClass("java/util/Date");
        if (date != NULL) {
            jmethodID dcs = jniEnv->GetMethodID(date, "<init>", "(J)V");
            if (dcs != NULL) {
                // Widen before multiplying: unsigned long can be 32 bits.
                const jlong ms = ((jlong)valueToSet) * 1000;
                jobject dobj = jniEnv->NewObject(date, dcs, ms);
                if (dobj != NULL) {
                    jniEnv->SetObjectField(object, fieldId, dobj);
                    jniEnv->DeleteLocalRef(dobj);
                }
            }
            jniEnv->DeleteLocalRef(date);
        }
    }

    // Release the local reference explicitly rather than holding it until
    // the native method returns.
    jniEnv->DeleteLocalRef(cls);
}



中间件      MSMQ JNI

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!