C/C++ Code Example: Reading Messages Synchronously
Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista
The following example provides an application-defined function that synchronously reads each message in a known queue, removing each message as it is read.
For information about reading messages synchronously, see Synchronous Reading.
To read messages synchronously
Define the maximum number of message properties to be specified and the message property counter.
Define an MQMSGPROPS structure.
Specify the message properties to retrieve. This example retrieves PROPID_M_LABEL, PROPID_M_LABEL_LEN, PROPID_M_BODY, PROPID_M_BODY_SIZE, and PROPID_M_BODY_TYPE.
Note
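The maximum label length, including the string-terminating character, is MQ_MAX_MSG_LABEL_LEN (250 Unicode characters). Because each receive call overwrites PROPID_M_LABEL_LEN with the length of the label that was read, this property must be reset to MQ_MAX_MSG_LABEL_LEN before each message is received.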
Initialize the MQMSGPROPS structure.
Create the format name of the queue. This format name is used to open the queue. Note that the input strings used to create the format name are validated in this function, but it is the responsibility of the caller to ensure that these strings are null-terminated.
Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.
In a loop, call MQReceiveMessage to retrieve the messages in the queue. If the body buffer is too small for any of the messages, reallocate memory for it.
Call MQCloseQueue and free the resources.
Code Example
The following code example can be run on all versions of Message Queuing.
// Synchronously reads, and removes, every message in the queue
// wszQueueName on the computer wszComputerName, printing each message's
// label and, when the body type is VT_BSTR, its body.
//
// Parameters:
//   wszQueueName    - Null-terminated queue name; must not be NULL or empty.
//   wszComputerName - Null-terminated computer name; must not be NULL or
//                     empty.
//
// Returns:
//   MQ_ERROR_INVALID_PARAMETER      if either string is NULL or empty.
//   MQ_ERROR_INSUFFICIENT_RESOURCES if a buffer cannot be allocated.
//   MQ_ERROR_BUFFER_OVERFLOW        if an overflow is reported but the
//                                   reported body size does not exceed the
//                                   current buffer (no progress possible).
//   The MQOpenQueue error           if the queue cannot be opened.
//   Otherwise the result of MQCloseQueue.
HRESULT ReadingDestQueue(
    WCHAR * wszQueueName,
    WCHAR * wszComputerName
    )
{
    // Validate the input strings before acquiring any resource, so that
    // the early return cannot leak anything.
    if (wszQueueName == NULL || wszComputerName == NULL)
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }
    if (wszQueueName[0] == L'\0' || wszComputerName[0] == L'\0')
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    // Define the required constants and variables.
    const int NUMBEROFPROPERTIES = 5;
    DWORD cPropId = 0;
    HRESULT hr = MQ_OK;                    // Return code of the last call
    HRESULT hrFatal = MQ_OK;               // Error that aborted the loop
    HANDLE hQueue = NULL;                  // Queue handle
    ULONG ulBufferSize = 2;                // Current body buffer size (bytes)

    // Create the direct format name:
    //   "DIRECT=OS:" + wszComputerName + "\" + wszQueueName
    // The buffer is sized exactly for the result, so it cannot be too small.
    const WCHAR wszPrefix[] = L"DIRECT=OS:";
    const size_t cchPrefix = (sizeof(wszPrefix) / sizeof(wszPrefix[0])) - 1;
    const size_t cchComputer = wcslen(wszComputerName);
    const size_t cchQueue = wcslen(wszQueueName);
    // Prefix + computer name + backslash + queue name + terminator.
    const size_t cchFormatName = cchPrefix + cchComputer + 1 + cchQueue + 1;

    // malloc is used (rather than new) so that the NULL check is meaningful.
    WCHAR * wszFormatName = (WCHAR*)malloc(cchFormatName * sizeof(WCHAR));
    if (wszFormatName == NULL)
    {
        return MQ_ERROR_INSUFFICIENT_RESOURCES;
    }

    WCHAR * pwszCursor = wszFormatName;
    memcpy(pwszCursor, wszPrefix, cchPrefix * sizeof(WCHAR));
    pwszCursor += cchPrefix;
    memcpy(pwszCursor, wszComputerName, cchComputer * sizeof(WCHAR));
    pwszCursor += cchComputer;
    *pwszCursor++ = L'\\';
    memcpy(pwszCursor, wszQueueName, cchQueue * sizeof(WCHAR));
    pwszCursor += cchQueue;
    *pwszCursor = L'\0';

    // Open the queue with receive access.
    hr = MQOpenQueue(
        wszFormatName,                     // Format name of the queue
        MQ_RECEIVE_ACCESS,                 // Access mode
        MQ_DENY_NONE,                      // Share mode
        &hQueue                            // OUT: Queue handle
        );

    // The format name is only needed to open the queue.
    free(wszFormatName);
    wszFormatName = NULL;

    // Handle any error returned by MQOpenQueue.
    if (FAILED(hr))
    {
        return hr;
    }

    // Allocate the initial body buffer. It is grown on demand in the loop.
    UCHAR * pucBodyBuffer = (UCHAR*)malloc(ulBufferSize);
    if (pucBodyBuffer == NULL)
    {
        MQCloseQueue(hQueue);
        return MQ_ERROR_INSUFFICIENT_RESOURCES;
    }
    memset(pucBodyBuffer, 0, ulBufferSize);

    // Define an MQMSGPROPS structure.
    MQMSGPROPS msgprops;
    MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];
    MQPROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];
    HRESULT aMsgStatus[NUMBEROFPROPERTIES];
    WCHAR wszLabelBuffer[MQ_MAX_MSG_LABEL_LEN];   // Label buffer

    // Specify the message properties to be retrieved. The slot of each
    // property that is read back later is remembered by name.
    const DWORD iLabelLen = cPropId;
    aMsgPropId[cPropId] = PROPID_M_LABEL_LEN;             // Property ID
    aMsgPropVar[cPropId].vt = VT_UI4;                     // Type indicator
    aMsgPropVar[cPropId].ulVal = MQ_MAX_MSG_LABEL_LEN;    // Length of label
    cPropId++;

    aMsgPropId[cPropId] = PROPID_M_LABEL;                 // Property ID
    aMsgPropVar[cPropId].vt = VT_LPWSTR;                  // Type indicator
    aMsgPropVar[cPropId].pwszVal = wszLabelBuffer;        // Label buffer
    cPropId++;

    const DWORD iBodySize = cPropId;
    aMsgPropId[cPropId] = PROPID_M_BODY_SIZE;             // Property ID
    aMsgPropVar[cPropId].vt = VT_NULL;                    // Type indicator
    cPropId++;

    const DWORD iBody = cPropId;
    aMsgPropId[cPropId] = PROPID_M_BODY;                  // Property ID
    aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;         // Type indicator
    aMsgPropVar[cPropId].caub.pElems = pucBodyBuffer;     // Body buffer
    aMsgPropVar[cPropId].caub.cElems = ulBufferSize;      // Buffer size
    cPropId++;

    const DWORD iBodyType = cPropId;
    aMsgPropId[cPropId] = PROPID_M_BODY_TYPE;             // Property ID
    aMsgPropVar[cPropId].vt = VT_NULL;                    // Type indicator
    cPropId++;

    // Initialize the MQMSGPROPS structure.
    msgprops.cProp = cPropId;              // Number of message properties
    msgprops.aPropID = aMsgPropId;         // IDs of the message properties
    msgprops.aPropVar = aMsgPropVar;       // Values of the message properties
    msgprops.aStatus = aMsgStatus;         // Error reports

    for ( ; ; )
    {
        // The label length is an in/out value: reset it to the capacity of
        // the label buffer before every call.
        aMsgPropVar[iLabelLen].ulVal = MQ_MAX_MSG_LABEL_LEN;

        hr = MQReceiveMessage(
            hQueue,                        // Queue handle
            1000,                          // Max time (msec) to wait for a message
            MQ_ACTION_RECEIVE,             // Receive action
            &msgprops,                     // Message property structure
            NULL,                          // No OVERLAPPED structure
            NULL,                          // No callback function
            NULL,                          // No cursor handle
            MQ_NO_TRANSACTION              // Not in a transaction
            );

        if (hr == MQ_ERROR_BUFFER_OVERFLOW)
        {
            // The body did not fit; grow the buffer to the reported body
            // size and try the same message again.
            const ULONG ulRequired = aMsgPropVar[iBodySize].ulVal;
            if (ulRequired <= ulBufferSize)
            {
                // Retrying with the same buffer could loop forever.
                hrFatal = hr;
                break;
            }

            // Keep the old pointer until realloc succeeds so that it is
            // still freed below if the reallocation fails.
            UCHAR * pucGrown = (UCHAR*)realloc(pucBodyBuffer, ulRequired);
            if (pucGrown == NULL)
            {
                hrFatal = MQ_ERROR_INSUFFICIENT_RESOURCES;
                break;
            }
            pucBodyBuffer = pucGrown;
            ulBufferSize = ulRequired;
            memset(pucBodyBuffer, 0, ulBufferSize);
            aMsgPropVar[iBody].caub.pElems = pucBodyBuffer;
            aMsgPropVar[iBody].caub.cElems = ulBufferSize;
            continue;
        }

        // NOTE(review): every other failure ends the loop and is reported
        // as "No messages"; the failure code itself is not returned to the
        // caller. Consider distinguishing the receive time-out from real
        // errors.
        if (FAILED(hr))
        {
            wprintf(L"No messages. Closing queue\n");
            break;
        }

        // If the message contains a label, print it.
        if (aMsgPropVar[iLabelLen].ulVal == 0)
        {
            wprintf(L"Removed message from queue.\n");
        }
        else
        {
            wprintf(L"Removed message '%s' from queue.\n", wszLabelBuffer);
        }

        // If the message body is a string, display it. The body buffer is
        // not guaranteed to be null-terminated and may still hold bytes of
        // an earlier, longer message, so the output is bounded by the body
        // size reported for this message.
        if (aMsgPropVar[iBodyType].ulVal == VT_BSTR)
        {
            ULONG cbBody = aMsgPropVar[iBodySize].ulVal;
            if (cbBody > ulBufferSize)
            {
                cbBody = ulBufferSize;
            }
            wprintf(L"Body: %.*s", (int)(cbBody / sizeof(WCHAR)), (WCHAR*)pucBodyBuffer);
            wprintf(L"\n");
        }
    }

    // Close the queue and free the memory allocated for the body buffer.
    hr = MQCloseQueue(hQueue);
    free(pucBodyBuffer);

    // An error that aborted the loop takes precedence over the close result.
    if (FAILED(hrFatal))
    {
        return hrFatal;
    }
    return hr;
}