Message queues work by exchanging data in buffers. Any number of processes can communicate through message queues, regardless of whether they are related; if a process has adequate access permission, it can send or receive messages through the queue. Message notification can be synchronous or asynchronous. Message queues can store multiple messages, be accessed by multiple processes, be read in any order, and be prioritized according to application needs.
This chapter includes the following sections:
Message Queues, Section 10.1
The Message Interface, Section 10.2
Message Queue Examples, Section 10.3
10.1 Message Queues
The POSIX 1003.1b message passing facilities provide a deterministic, efficient means for interprocess communication (IPC). Realtime message passing is designed to work with shared memory in order to accommodate the needs of realtime applications with an efficient, deterministic mechanism to pass arbitrary amounts of data between cooperating processes. Predictability is the primary emphasis behind the design for realtime message passing.
Cooperating processes can send and receive messages by accessing system-wide message queues. These message queues are accessed through names that may be pathnames.
The maximum size of each message is defined by the system to optimize the message sending and receiving functions. Message buffers are preallocated, ensuring the availability of resources when they are needed.
If your application involves heavy message traffic, you can prioritize the order in which processes receive messages by assigning a priority to the message or controlling the priority of the receiving process.
Asynchronous notification of the availability of a message on a queue allows a process to do useful work while waiting to receive a message.
Message passing operations that contribute to kernel overhead have been
eliminated in the realtime message queue interface.
If your application requires
the ability to wait on multiple message queues simultaneously or the broadcast
of a single message to multiple queues, you may need to write this functionality
into your application.
10.2 The Message Interface
The message queue interface is a set of structures and data that allows you to use a message queue for sending and receiving messages. The message queue is a linked list that serves as a holding place for messages being sent to and received by processes sharing access to the message queue.
The following POSIX 1003.1b message queue functions allow you controlled access to messaging operations on a message queue:
Function | Description |
mq_close | Closes a message queue |
mq_getattr | Retrieves the attributes of a message queue |
mq_notify | Requests that a process be notified when a message is available on a queue |
mq_open | Opens a message queue |
mq_receive | Receives a message from a queue |
mq_send | Sends a message to a queue |
mq_setattr | Sets the attributes of a message queue |
mq_unlink | Removes a message queue |
General usage for message queues is as follows:
Get a message queue descriptor with a call to the mq_open function.
Send and receive messages with calls to the mq_send and mq_receive functions.
Close the message queue with a call to the mq_close function.
Remove the message queue with a call to the mq_unlink function.
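The four steps above can be sketched in a single function. This is a minimal illustration, not a definitive implementation: the helper name demo_round_trip, the DEMO_MSGSIZE limit, and the queue attributes are assumptions made for the example. The caller is assumed to pass a queue name that starts with a slash, which is the form that is portable across POSIX systems.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

#define DEMO_MSGSIZE 64   /* hypothetical per-message limit for this sketch */

/*
 * Hypothetical helper: open a new queue, send one message, receive it
 * back, then close and unlink the queue.
 * Returns the number of bytes received, or -1 on failure.
 */
ssize_t demo_round_trip(const char *name, const char *text,
                        char *out, size_t out_size)
{
    struct mq_attr attr;
    mqd_t mqd;
    ssize_t received = -1;

    /* The receive buffer must hold the largest possible message. */
    if (out_size < DEMO_MSGSIZE || strlen(text) > DEMO_MSGSIZE)
        return -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = DEMO_MSGSIZE;

    /* Step 1: get a message queue descriptor. */
    mqd = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1)
        return -1;

    /* Step 2: send a message, then receive it back. */
    if (mq_send(mqd, text, strlen(text), 0) == 0)
        received = mq_receive(mqd, out, out_size, NULL);

    /* Step 3: close the descriptor. Step 4: remove the queue. */
    mq_close(mqd);
    mq_unlink(name);
    return received;
}
```

In a real application the sender and receiver are normally separate processes, and only one of them unlinks the queue.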
Data written to a message queue created by one process is available
to all processes that open the same message queue.
Message queues are persistent;
once unlinked, their names and contents remain until all processes that have
opened the queue call the
mq_close
function.
Child processes
inherit the message queue descriptor created by the parent process.
Once the
message queue is opened, the child process can read or write to it according
to access permissions.
Unrelated processes can also use the message queue,
but must first call the
mq_open
function to establish the
connection.
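The inheritance of a descriptor by a child process can be sketched as follows. The helper name demo_inherit, the message text, and the queue attributes are assumptions for illustration only. The parent waits for the child to exit before receiving, so it never blocks on a message that was not sent.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper: the parent opens a queue and forks; the child
 * sends through the inherited descriptor without calling mq_open.
 * Returns the number of bytes the parent received, or -1 on failure.
 * The out buffer must be at least 64 bytes.
 */
ssize_t demo_inherit(const char *name, char *out, size_t out_size)
{
    struct mq_attr attr;
    mqd_t mqd;
    pid_t pid;
    int child_status = 0;
    ssize_t received = -1;

    if (out_size < 64)
        return -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = 64;

    mqd = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1)
        return -1;

    pid = fork();
    if (pid == 0) {
        /* Child: the descriptor is already valid here. */
        int rc = mq_send(mqd, "from child", 10, 0);
        mq_close(mqd);
        _exit(rc == 0 ? 0 : 1);
    }

    /* Parent: receive only if the child reported a successful send. */
    if (pid > 0 && waitpid(pid, &child_status, 0) == pid &&
        WIFEXITED(child_status) && WEXITSTATUS(child_status) == 0)
        received = mq_receive(mqd, out, out_size, NULL);

    mq_close(mqd);
    mq_unlink(name);
    return received;
}
```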
You can identify message queue attributes with a call to the
mq_getattr
function.
You can specify whether the message operation
is blocking or non-blocking by calling the
mq_setattr
function.
A call to the
mq_receive
function receives the oldest,
highest-priority message on the queue.
If two or more processes are waiting
for an incoming message on the same queue, the process with the highest priority
that has been waiting the longest receives the next message.
Often message queues are created and used only while an application
is executing.
The
mq_unlink
function removes (deletes)
the message queue and its contents, unless processes still have the queue
open.
The message queue is deleted only when all processes using it have closed
the queue.
10.2.1 Opening a Message Queue
To set up a message queue, first create a new message queue or open
an existing queue using the
mq_open
function.
If a message
queue of the specified name does not already exist, a new message queue is
allocated and initialized.
If one already exists, the
mq_open
function checks permissions.
A process can create and open message queues early in the life of the
application.
Use the
mq_open
function to open (establish
a connection to) a message queue.
After one process creates the message queue,
every other process that needs to use it must call the
mq_open
function, specifying the same pathname.
The
mq_open
function provides a set of flags that
prescribe the characteristics of the message queue for the process and define
access modes for the message queue.
Message queue access is determined by
the OR of the file status flags and access modes listed in
Table 10-1.
Table 10-1: Status Flags and Access Modes for the mq_open Function
Flag | Description |
O_RDONLY | Open for read access only |
O_WRONLY | Open for write access only |
O_RDWR | Open for read and write access |
O_CREAT | Create the message queue, if it does not already exist |
O_EXCL | When used with O_CREAT, creates and opens a message queue if a queue of the same name does not already exist. If a message queue of the same name exists, the message queue is not opened. |
O_NONBLOCK | Determines whether a send or receive operation is blocking or nonblocking |
The first process to call the
mq_open
function should
use the O_CREAT flag to create the message queue, to set the queue's user
ID to that of the calling process, and to set the queue's group ID to the
effective group ID of the calling process.
This establishes an environment
whereby the calling process, all cooperating processes, and child processes
share the same effective group ID with the message queue.
All processes that
subsequently open the message queue must have the same access permission as
the creating process.
Each process that uses a message queue must begin by calling the
mq_open
function.
This call can accomplish several objectives:
Create and open the message queue, if it does not yet exist (specify the O_CREAT flag).
Open an existing message queue.
Attempt to create and open the queue but fail if the queue already exists (specify both the O_CREAT and O_EXCL flags).
Open access to the queue for the calling process and establish a connection between the queue and a descriptor. All threads within the same process using the queue use the same descriptor.
Specify the access mode for the process:
Read only
Write only
Read/write
Specify whether the process will block or fail when unable to send a message (the queue is full) or receive a message (the queue is empty) with the oflags argument.
The mode bit is checked to determine if the caller has permission for the requested operation. If the calling process is not the owner and is not in the group, the mode bits must be set for world access before permission is granted. In addition, the appropriate access bits must be set before an operation is performed. That is, to perform a read operation, the read bit must be set.
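For example, the following code creates a message queue, provided that a queue of that name does not already exist, and opens it for read and write access. When O_CREAT is specified, the mq_open function also requires a mode argument and an attributes argument; a NULL attributes pointer selects the default queue attributes.
fd = mq_open("new_queue", O_CREAT|O_EXCL|O_RDWR, 0666, NULL);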
Once a message queue is created, its name and resources are persistent.
It exists until the message queue is unlinked with a call to the
mq_unlink
function and all other references to the queue are gone.
The message flag parameter is either 0 or O_NONBLOCK.
If you specify
a flag of 0, then a sending process sleeps if the message cannot be sent to
the specified queue, due to the queue being full.
The process will sleep
until other messages have been removed from the queue and space becomes available.
When the flag is specified as O_NONBLOCK and the queue is full, the
mq_send
function
returns immediately with an error status.
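The non-blocking behavior can be observed by filling a small queue. The sketch below assumes a hypothetical helper, demo_fill_nonblocking, and relies on the POSIX-defined EAGAIN error that mq_send reports when the queue is full and O_NONBLOCK is set.

```c
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>

/*
 * Hypothetical helper: create a queue that holds maxmsg messages, open
 * it with O_NONBLOCK, and send until mq_send refuses another message.
 * Returns the number of messages accepted, or -1 on an unexpected error.
 */
long demo_fill_nonblocking(const char *name, long maxmsg)
{
    struct mq_attr attr;
    mqd_t mqd;
    long sent = 0;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = maxmsg;
    attr.mq_msgsize = 16;

    mqd = mq_open(name, O_CREAT | O_EXCL | O_WRONLY | O_NONBLOCK,
                  0600, &attr);
    if (mqd == (mqd_t)-1)
        return -1;

    /* The extra iteration is the one expected to fail with EAGAIN. */
    while (sent <= maxmsg) {
        if (mq_send(mqd, "x", 1, 0) == 0) {
            sent++;
            continue;
        }
        if (errno != EAGAIN)
            sent = -1;   /* some other failure */
        break;
    }

    mq_close(mqd);
    mq_unlink(name);
    return sent;
}
```

Without O_NONBLOCK, the final call in the loop would sleep until another process removed a message.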
Example 10-1
shows the code sequence to establish a connection
to a message queue descriptor.
Example 10-1: Opening a Message Queue
#include <unistd.h>
#include <sys/types.h>
#include <mqueue.h>
#include <fcntl.h>

int main()
{
    mqd_t md;
    int status;

    /* Create message queue; O_CREAT requires a mode and an attributes
     * argument (NULL selects the default attributes) */
    md = mq_open("my_queue", O_CREAT|O_RDWR, 0666, NULL);
    if (md == (mqd_t)-1)
        return 1;

    /*
     * code that uses the message queue goes here
     */

    status = mq_close(md);          /* Close message queue */
    status = mq_unlink("my_queue"); /* Unlink message queue */
    return status;
}
Use the same access permissions that you would normally use on a call
to the file
open
function.
If you intend to only read the
queue, specify read permission only on the
mq_open
function.
If you intend to read and write to the queue, open the queue with both read
and write permissions.
When finished using a message queue, close the queue with the
mq_close
function, and remove the queue by calling the
mq_unlink
function.
10.2.2 Sending and Receiving Messages
For an application in which the intended recipients of messages might be ambiguous because they all use a single message queue, you can establish multiple queues. In some cases you may need to provide a separate queue for each process that receives a message. Two processes that carry on two-way communication between them normally require two message queues:
Process X sends messages to queue A; process Y receives from it
Process Y sends messages to queue B; process X receives from it
Use of a single queue by multiple processes could be appropriate for an application that collects and processes data. Consider an application that consists of five processes that monitor data points and a sixth process that accumulates and interprets the data. Each of the five monitoring processes could send information to a single message queue. The sixth process could receive the messages from the queue, with assurance that it is receiving information according to the specified priorities of the incoming messages, in first-in first-out order within each priority.
When a process receives a message from a queue, it removes that message from the queue. Therefore, an application that requires one process to send the same message to several other processes should choose one of the following communication methods:
Set up a message queue for each receiving process, and send each message to each queue
Communicate by using signals and shared memory
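The first method, one queue per receiving process, can be sketched as a loop over queue names. The helper demo_broadcast and its parameters are hypothetical, and the sketch assumes that each receiving process has already created its own queue.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <stddef.h>

/*
 * Hypothetical helper: send the same message to every queue named in
 * names[]. Each queue is opened for writing, sent to, and closed again.
 * Returns the number of queues that accepted the message.
 */
int demo_broadcast(const char *const names[], int count,
                   const char *msg, size_t len, unsigned int prio)
{
    int delivered = 0;
    int i;

    for (i = 0; i < count; i++) {
        mqd_t mqd = mq_open(names[i], O_WRONLY);

        if (mqd == (mqd_t)-1)
            continue;            /* this receiver's queue is unavailable */

        /* Blocks if this queue is full, because O_NONBLOCK is not set. */
        if (mq_send(mqd, msg, len, prio) == 0)
            delivered++;

        mq_close(mqd);
    }
    return delivered;
}
```

A sender that broadcasts often would more likely keep the descriptors open rather than reopen each queue per message; the sketch favors brevity.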
Once a message queue is open, you can send messages to another process
using the
mq_send
function.
The
mq_send
function takes four parameters: the message queue descriptor,
a pointer to a message buffer, the length of the message in bytes, and the message priority.
The read/write permissions are checked along with the length of the message,
the status of the message queue, and the message flag.
If all checks are successful,
the message is added to the message queue.
If the queue is already full,
the sending process can block until space in the queue becomes available,
or it can return immediately, according to whether it set the O_NONBLOCK
flag when it called the
mq_open
function.
Once a message has been placed on a queue, you can retrieve the message
with a call to the
mq_receive
function.
The
mq_receive
function takes four parameters: the message queue descriptor,
a pointer to a buffer to hold the incoming message, the size of the buffer,
and the priority of the message received (the priority is returned by the
function).
The size of the buffer must be at least the value of the queue's
mq_msgsize attribute.
As with the
mq_send
function, the read/write operation
permissions are checked on a call to the
mq_receive
function.
If more than one process is waiting to receive a message when a message arrives
at an empty queue, then the process with the highest priority that has been
waiting the longest is selected to receive the message.
When a process uses the
mq_receive
function to read
a message from a queue, the queue may be empty.
The receiving process can
block until a message arrives in the queue, or it can return immediately,
according to the state of the O_NONBLOCK flag established with a preceding
call to the
mq_open
function.
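The buffer-size requirement and the non-blocking receive can be combined in one sketch. The helper demo_try_receive is hypothetical; it sizes its buffer from the mq_msgsize attribute and relies on the POSIX-defined EAGAIN error for an empty queue opened with O_NONBLOCK.

```c
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdlib.h>
#include <sys/types.h>

/*
 * Hypothetical helper: try to receive one message without blocking.
 * On success, *out points to a malloc'd buffer that the caller frees,
 * *prio holds the message priority, and the byte count is returned.
 * On failure, returns -1; errno is EAGAIN if the queue was empty.
 */
ssize_t demo_try_receive(const char *name, char **out, unsigned int *prio)
{
    struct mq_attr attr;
    mqd_t mqd;
    char *buf = NULL;
    ssize_t n = -1;
    int saved_errno;

    mqd = mq_open(name, O_RDONLY | O_NONBLOCK);
    if (mqd == (mqd_t)-1)
        return -1;

    /* The buffer must be at least mq_msgsize bytes long. */
    if (mq_getattr(mqd, &attr) == 0) {
        buf = malloc((size_t)attr.mq_msgsize);
        if (buf != NULL)
            n = mq_receive(mqd, buf, (size_t)attr.mq_msgsize, prio);
    }

    /* Preserve the error from mq_receive across the cleanup calls. */
    saved_errno = errno;
    if (n == -1)
        free(buf);
    else
        *out = buf;
    mq_close(mqd);
    errno = saved_errno;
    return n;
}
```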
10.2.3 Asynchronous Notification of Messages
A process that wants to read a message from a message queue has three options:
Set the queue to blocking mode, and wait for a message to be received by calling mq_receive
Set the queue to non-blocking mode, and call mq_receive multiple times until a message is received
Set the queue to non-blocking mode, and call mq_notify specifying a signal to be sent when the queue goes from empty to non-empty
The last option is a good choice for a realtime application.
The
mq_notify
function is used to register a request for asynchronous
notification by a signal when a message becomes available on a previously
empty queue.
The process can then do useful work until a message arrives,
at which time a signal is sent according to the signal information specified
in the
notification
argument of the
mq_notify
function.
After notification, the process can call
mq_receive
to receive the message.
Only one notification request at a time is allowed per message queue
descriptor.
A notification request is canceled as soon as its signal
is sent; thus, the request must be re-registered by calling
mq_notify
again to receive further notifications.
10.2.4 Prioritizing Messages
A process can control the relative priority of messages it sends to
a specified queue by setting the
msg_prio
parameter
in the
mq_send
function.
If
msg_prio
is specified on the
mq_send
function, the message is inserted into the message queue
according to its priority relative to other messages on the queue.
A message
with a larger numeric value (higher priority) is inserted into the queue before
messages with a lower numeric value.
The
mq_receive
function
always returns the first message on the queue, so if you assign higher priorities
to messages of higher importance, you can receive the most important messages
first.
If you assign lower priorities to less important messages, you can
delay delivery of the messages as more important messages are sent.
Messages
of equal priority are inserted in a first-in, first-out manner.
The ability
to assign priorities to messages on the queue reduces the possibility of
priority inversion in the realtime messaging interface.
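The ordering rule can be checked with a short sketch that sends three messages with priorities 1, 5, and 3 and records the order in which they are received. The helper name demo_priority_order and the queue attributes are assumptions for illustration.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>

/*
 * Hypothetical helper: send three messages with priorities 1, 5, 3 and
 * store the priority of each received message in order[].
 * Returns 0 on success, -1 on failure.
 */
int demo_priority_order(const char *name, unsigned int order[3])
{
    static const unsigned int prios[3] = { 1, 5, 3 };
    struct mq_attr attr;
    mqd_t mqd;
    char buf[16];
    int rc = 0;
    int i;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = sizeof buf;

    mqd = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1)
        return -1;

    /* Messages are queued by priority, not by arrival order. */
    for (i = 0; i < 3 && rc == 0; i++)
        rc = mq_send(mqd, "m", 1, prios[i]);

    /* Each receive returns the highest-priority message remaining. */
    for (i = 0; i < 3 && rc == 0; i++)
        if (mq_receive(mqd, buf, sizeof buf, &order[i]) == -1)
            rc = -1;

    mq_close(mqd);
    mq_unlink(name);
    return rc;
}
```

On return, order[] holds the priorities from highest to lowest, regardless of the order in which the messages were sent.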
10.2.5 Using Message Queue Attributes
Use the
mq_getattr
function to determine the message
queue attributes of an existing message queue.
The attributes are as follows:
Attribute | Description |
mq_flags | The message queue flags |
mq_maxmsg | The maximum number of messages allowed |
mq_msgsize | The maximum message size allowed for the queue |
mq_curmsgs | The number of messages on the queue |
The
mq_curmsgs
attribute describes the current
queue status.
If necessary, call the
mq_setattr
function
to reset the flags.
The
mq_maxmsg
and
mq_msgsize
attributes cannot be modified after the initial queue
creation.
The
mqueue.h
header file contains information
concerning system-wide maximums and other limits pertaining to message queues.
10.2.6 Closing and Removing a Message Queue
Each process that uses a message queue should close its access to the
queue by calling the
mq_close
function before exiting.
When all processes using the queue have called this function and the queue
has been unlinked, the software removes the queue.
A process can remove a message queue by calling the
mq_unlink
function.
However, if other processes still have the message queue
open, the
mq_unlink
function returns immediately and destruction
of the queue is postponed until all references to the queue have been closed.
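The postponed destruction can be sketched by unlinking a queue while a descriptor is still open and then continuing to use that descriptor. The helper name demo_unlink_while_open and the message text are assumptions for illustration.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical helper: unlink a queue while it is still open, then send
 * and receive through the open descriptor.
 * Returns the number of bytes received after the unlink, or -1.
 * The out buffer must be at least 64 bytes.
 */
ssize_t demo_unlink_while_open(const char *name, char *out, size_t out_size)
{
    struct mq_attr attr;
    mqd_t mqd;
    ssize_t received = -1;

    if (out_size < 64)
        return -1;

    memset(&attr, 0, sizeof attr);
    attr.mq_maxmsg = 2;
    attr.mq_msgsize = 64;

    mqd = mq_open(name, O_CREAT | O_EXCL | O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1)
        return -1;

    /* Returns immediately; the queue itself survives while open. */
    if (mq_unlink(name) == 0 &&
        mq_send(mqd, "still here", 10, 0) == 0)
        received = mq_receive(mqd, out, out_size, NULL);

    /* The last close is what actually destroys the queue. */
    mq_close(mqd);
    return received;
}
```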
10.3 Message Queue Examples
Example 10-2
creates a message queue and sends messages to it in a loop.
The message queue is created using O_CREAT.
Example 10-2: Using Message Queues to Send Data
/*
 * test_send.c
 *
 * This test goes with test_receive.c.
 * test_send.c does a loop of mq_sends,
 * and test_receive.c does a loop of mq_receives.
 */

#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <signal.h>
#include <sys/rt_syscall.h>
#include <mqueue.h>
#include <errno.h>

#define PMODE 0666

int main()
{
    int i;
    int status = 0;
    mqd_t mqfd;
    char msg_buffer[P4IPC_MSGSIZE];
    struct mq_attr attr;
    int open_flags = 0;
    int num_bytes_to_send;
    int priority_of_msg;

    printf("START OF TEST_SEND \n");

    /* Fill in attributes for message queue */
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = P4IPC_MSGSIZE;
    attr.mq_flags = 0;

    /* Set the flags for the open of the queue.
     * Make it a blocking open on the queue, meaning it will block if
     * this process tries to send to the queue and the queue is full.
     * (Absence of O_NONBLOCK flag implies that the open is blocking)
     *
     * Specify O_CREAT so that the queue will get created if it does not
     * already exist.
     *
     * Specify O_WRONLY since we are only planning to write to the queue,
     * although we could specify O_RDWR also.
     */
    open_flags = O_WRONLY|O_CREAT;

    /* Open the queue, and create it if the receiving process hasn't
     * already created it.
     */
    mqfd = mq_open("myipc",open_flags,PMODE,&attr);
    if (mqfd == (mqd_t)-1)
    {
        perror("mq_open failure from main");
        exit(1);
    }

    /* Fill in a test message buffer to send */
    msg_buffer[0] = 'P';
    msg_buffer[1] = 'R';
    msg_buffer[2] = 'I';
    msg_buffer[3] = 'O';
    msg_buffer[4] = 'R';
    msg_buffer[5] = 'I';
    msg_buffer[6] = 'T';
    msg_buffer[7] = 'Y';
    msg_buffer[8] = '1';
    msg_buffer[9] = 'a';

    num_bytes_to_send = 10;
    priority_of_msg = 1;

    /* Perform the send 10 times */
    for (i=0; i<10; i++)
    {
        status = mq_send(mqfd,msg_buffer,num_bytes_to_send,priority_of_msg);
        if (status == -1)
            perror("mq_send failure on mqfd");
        else
            printf("successful call to mq_send, i = %d\n",i);
    }

    /* Done with queue, so close it */
    if (mq_close(mqfd) == -1)
        perror("mq_close failure on mqfd");

    printf("About to exit the sending process after closing the queue \n");
    return 0;
}
Example 10-3
creates a message queue and receives messages from it in a loop.
The message queue is created using O_CREAT.
Example 10-3: Using Message Queues to Receive Data
/*
 * test_receive.c
 *
 * This test goes with test_send.c.
 * test_send.c does a loop of mq_sends,
 * and test_receive.c does a loop of mq_receives.
 */

#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <signal.h>
#include <sys/rt_syscall.h>
#include <mqueue.h>
#include <errno.h>

#define PMODE 0666

int main()
{
    int i;
    mqd_t mqfd;
    /* Buffer to receive msg into */
    char msg_buffer[P4IPC_MSGSIZE];
    struct mq_attr attr;
    int open_flags = 0;
    ssize_t num_bytes_received = 0;

    msg_buffer[10] = 0; /* For printing a null terminated string for testing */

    printf("START OF TEST_RECEIVE \n");

    /* Fill in attributes for message queue */
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = P4IPC_MSGSIZE;
    attr.mq_flags = 0;

    /* Set the flags for the open of the queue.
     * Make it a blocking open on the queue,
     * meaning it will block if this process tries to
     * receive from the queue and the queue is empty.
     * (Absence of O_NONBLOCK flag implies that
     * the open is blocking)
     *
     * Specify O_CREAT so that the queue will get
     * created if it does not already exist.
     *
     * Specify O_RDONLY since we are only
     * planning to read from the queue,
     * although we could specify O_RDWR also.
     */
    open_flags = O_RDONLY|O_CREAT;

    /* Open the queue, and create it if the sending process hasn't
     * already created it.
     */
    mqfd = mq_open("myipc",open_flags,PMODE,&attr);
    if (mqfd == (mqd_t)-1)
    {
        perror("mq_open failure from main");
        exit(1);
    }

    /* Perform the receive 10 times */
    for (i=0;i<10;i++)
    {
        num_bytes_received = mq_receive(mqfd,msg_buffer,P4IPC_MSGSIZE,NULL);
        if (num_bytes_received == -1)
        {
            perror("mq_receive failure on mqfd");
        }
        else
            printf("data read for iteration %d = %s \n",i,msg_buffer);
    }

    /* Done with queue, so close it */
    if (mq_close(mqfd) == -1)
        perror("mq_close failure on mqfd");

    /* Done with test, so unlink the queue,
     * which destroys it.
     * You only need one call to unlink.
     */
    if (mq_unlink("myipc") == -1)
        perror("mq_unlink failure in test_ipc");

    printf("Exiting receiving process after closing and unlinking queue \n");
    return 0;
}