訊息佇列



當我們已經擁有共享記憶體時,為什麼還需要訊息佇列?這可能有幾個原因,讓我們嘗試將其分解成多個要點以簡化理解:

  • 如我們所知,一旦一個程序接收了訊息,該訊息將不再可供任何其他程序使用。而在共享記憶體中,多個程序都可以訪問資料。

  • 如果我們希望使用小型訊息格式進行通訊。

  • 當多個程序同時通訊時,需要使用同步機制來保護共享記憶體資料。

  • 如果使用共享記憶體進行讀寫操作的頻率很高,那麼實現該功能將非常複雜。在這種情況下,使用它並不划算。

  • 如果並非所有程序都需要訪問共享記憶體,而只有少數幾個程序需要訪問,那麼使用訊息佇列來實現會更好。

  • 如果我們希望使用不同的資料包進行通訊,例如程序 A 向程序 B 傳送訊息型別 1,向程序 C 傳送訊息型別 10,向程序 D 傳送訊息型別 20。在這種情況下,使用訊息佇列實現更簡單。為了簡化給定的訊息型別(如 1、10、20),它可以是 0 或正數或負數,如下所述。

  • 當然,訊息佇列的順序是 FIFO(先進先出)。佇列中插入的第一條訊息將是第一個被檢索到的訊息。

使用共享記憶體或訊息佇列取決於應用程式的需求以及如何有效地利用它們。

使用訊息佇列進行通訊可以以下列方式進行:

  • 一個程序寫入共享記憶體,另一個程序從共享記憶體讀取。我們知道,多個程序也可以進行讀取。

Message Queue
  • 一個程序使用不同的資料包寫入共享記憶體,多個程序從中讀取,即根據訊息型別讀取。

Multiple Message Queue

在瞭解了有關訊息佇列的某些資訊後,現在是時候檢查支援訊息佇列的系統呼叫 (System V) 了。

要使用訊息佇列進行通訊,需要執行以下步驟:

步驟 1 - 建立訊息佇列或連線到已存在的佇列 (msgget())

步驟 2 - 寫入訊息佇列 (msgsnd())

步驟 3 - 從訊息佇列讀取 (msgrcv())

步驟 4 - 對訊息佇列執行控制操作 (msgctl())

現在,讓我們檢查以上呼叫的語法和一些資訊。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

此係統呼叫建立或分配一個 System V 訊息佇列。需要傳遞以下引數:

  • 第一個引數 key 用於識別訊息佇列。key 可以是任意值,也可以是從庫函式 ftok() 派生的值。

  • 第二個引數 shmflg 指定所需的訊息佇列標誌,例如 IPC_CREAT(如果不存在則建立訊息佇列)或 IPC_EXCL(與 IPC_CREAT 一起使用以建立訊息佇列,如果訊息佇列已存在則呼叫失敗)。還需要傳遞許可權。

注意 - 有關許可權的詳細資訊,請參考前面的章節。

如果成功,此呼叫將返回一個有效的訊息佇列識別符號(用於後續的訊息佇列呼叫),如果失敗則返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

與此呼叫相關的各種錯誤包括 EACCESS(許可權被拒絕)、EEXIST(佇列已存在,無法建立)、ENOENT(佇列不存在)、ENOMEM(記憶體不足以建立佇列)等。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此係統呼叫將訊息傳送/追加到訊息佇列 (System V)。需要傳遞以下引數:

  • 第一個引數 msgid 用於識別訊息佇列,即訊息佇列識別符號。識別符號值在 msgget() 成功後接收。

  • 第二個引數 msgp 是指向傳送到呼叫者的訊息的指標,以以下形式定義的結構形式定義:

struct msgbuf {
   long mtype;
   char mtext[1];
};

變數 mtype 用於與不同的訊息型別進行通訊,在 msgrcv() 呼叫中詳細解釋。變數 mtext 是一個數組或其他結構,其大小由 msgsz(正值)指定。如果未提及 mtext 欄位,則將其視為零大小訊息,這是允許的。

  • 第三個引數 msgsz 是訊息的大小(訊息應以空字元結尾)。

  • 第四個引數 msgflg 指示某些標誌,例如 IPC_NOWAIT(在佇列中找不到訊息時立即返回)或 MSG_NOERROR(如果超過 msgsz 位元組則截斷訊息文字)。

如果成功,此呼叫將返回 0,如果失敗則返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

此係統呼叫從訊息佇列 (System V) 中檢索訊息。需要傳遞以下引數:

  • 第一個引數 msgid 用於識別訊息佇列,即訊息佇列識別符號。識別符號值在 msgget() 成功後接收。

  • 第二個引數 msgp 是從呼叫者接收的訊息的指標。它以以下形式定義的結構形式定義:

struct msgbuf {
   long mtype;
   char mtext[1];
};

變數 mtype 用於與不同的訊息型別進行通訊。變數 mtext 是一個數組或其他結構,其大小由 msgsz(正值)指定。如果未提及 mtext 欄位,則將其視為零大小訊息,這是允許的。

  • 第三個引數 msgsz 是接收的訊息的大小(訊息應以空字元結尾)。

  • 第四個引數 msgtype 指示訊息的型別:

    • 如果 msgtype 為 0 - 讀取佇列中接收到的第一個訊息。

    • 如果 msgtype 為正數 - 讀取佇列中型別為 msgtype 的第一個訊息(如果 msgtype 為 10,則只讀取型別為 10 的第一個訊息,即使在佇列開頭可能存在其他型別)。

    • 如果 msgtype 為負數 - 讀取型別小於或等於訊息型別絕對值的第一個訊息(例如,如果 msgtype 為 -5,則讀取型別小於 5 的第一個訊息,即型別為 1 到 5 的訊息)。

  • 第五個引數 msgflg 指示某些標誌,例如 IPC_NOWAIT(在佇列中找不到訊息時立即返回)或 MSG_NOERROR(如果超過 msgsz 位元組則截斷訊息文字)。

如果成功,此呼叫將返回實際接收到的 mtext 陣列中的位元組數,如果失敗則返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

此係統呼叫執行訊息佇列 (System V) 的控制操作。需要傳遞以下引數:

  • 第一個引數 msgid 用於識別訊息佇列,即訊息佇列識別符號。識別符號值在 msgget() 成功後接收。

  • 第二個引數 cmd 是要對訊息佇列執行所需控制操作的命令。cmd 的有效值為:

IPC_STAT - 將 struct msqid_ds 中每個成員的當前值的副本複製到 buf 指向的傳遞的結構中。此命令需要對訊息佇列具有讀取許可權。

IPC_SET - 設定所有者的使用者 ID、組 ID、許可權等,這些內容由 buf 指向的結構指定。

IPC_RMID - 立即刪除訊息佇列。

IPC_INFO - 返回有關訊息佇列限制和引數的資訊,這些資訊儲存在 buf 指向的結構中,該結構的型別為 struct msginfo。

MSG_INFO - 返回一個 msginfo 結構,其中包含有關訊息佇列消耗的系統資源的資訊。

  • 第三個引數 buf 是指向名為 struct msqid_ds 的訊息佇列結構的指標。此結構的值將根據 cmd 用於設定或獲取。

此呼叫將根據傳遞的命令返回值。IPC_INFO 和 MSG_INFO 或 MSG_STAT 的成功返回訊息佇列的索引或識別符號,其他操作返回 0,如果失敗則返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

在瞭解了有關訊息佇列的基本資訊和系統呼叫後,現在是時候檢查一下程式了。

在檢視程式之前,讓我們先了解一下描述:

步驟 1 - 建立兩個程序,一個用於傳送到訊息佇列 (msgq_send.c),另一個用於從訊息佇列中檢索 (msgq_recv.c)。

步驟 2 - 使用 ftok() 函式建立金鑰。為此,首先建立檔案 msgq.txt 以獲取唯一的金鑰。

步驟 3 - 傳送程序執行以下操作。

  • 從使用者讀取字串輸入。

  • 如果存在,則刪除換行符。

  • 傳送到訊息佇列。

  • 重複此過程,直到輸入結束 (CTRL + D)。

  • 接收到輸入結束訊號後,傳送訊息“end”以表示程序結束。

步驟 4 - 在接收程序中,執行以下操作。

  • 從佇列讀取訊息。
  • 顯示輸出。
  • 如果接收到的訊息為“end”,則完成程序並退出。

為了簡化起見,在此示例中我們不使用訊息型別。此外,一個程序寫入佇列,另一個程序從佇列讀取。這可以根據需要擴充套件,即理想情況下,一個程序寫入佇列,多個程序從佇列讀取。

現在,讓我們檢查一下程序(將訊息傳送到佇列) - 檔案:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

編譯和執行步驟

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是來自訊息接收程序(從佇列中檢索訊息)的程式碼 - 檔案:msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

編譯和執行步驟

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.
廣告

© . All rights reserved.