程序間通訊 - 訊號量



首先想到的問題是,為什麼我們需要訊號量?簡單來說,是為了保護多個程序共享的關鍵/公共區域。

假設多個程序使用相同的程式碼區域,如果所有程序都想要並行訪問,則結果會發生重疊。例如,多個使用者僅使用一臺印表機(公共/關鍵部分),假設有3個使用者,同時提交3個作業,如果所有作業都並行啟動,則一個使用者的輸出會與另一個使用者的輸出重疊。因此,我們需要使用訊號量來保護它,即在一個程序執行時鎖定關鍵部分,並在完成後解鎖。這將對每個使用者/程序重複執行,以確保一個作業不會與另一個作業重疊。

基本上,訊號量分為兩種型別:

二元訊號量 - 只有兩種狀態 0 和 1,即鎖定/解鎖或可用/不可用,互斥實現。

計數訊號量 - 允許任意資源計數的訊號量稱為計數訊號量。

假設我們有5臺印表機(為了理解,假設一臺印表機只接受一個作業),我們有3個作業需要列印。現在,3個作業將分配給3臺印表機(每臺一臺)。在此過程中,又來了4個作業。現在,在2臺可用的印表機中,已經安排了2個作業,我們還剩下2個作業,只有在其中一個資源/印表機可用後才能完成。這種根據資源可用性進行的排程可以視為計數訊號量。

要使用訊號量執行同步,請執行以下步驟:

步驟1 - 建立訊號量或連線到已存在的訊號量 (semget())

步驟2 - 對訊號量執行操作,即分配、釋放或等待資源 (semop())

步驟3 - 對訊息佇列執行控制操作 (semctl())

現在,讓我們用我們擁有的系統呼叫來檢查一下。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semget(key_t key, int nsems, int semflg)

此係統呼叫建立或分配 System V 訊號量集。需要傳遞以下引數:

  • 第一個引數 key 識別訊息佇列。key 可以是任意值,也可以是從庫函式 ftok() 派生的值。

  • 第二個引數 nsems 指定訊號量的數量。如果是二元訊號量,則為 1,表示需要 1 個訊號量集,否則根據所需的訊號量集數量而定。

  • 第三個引數 semflg 指定所需的訊號量標誌,例如 IPC_CREAT(如果訊號量不存在則建立)或 IPC_EXCL(與 IPC_CREAT 一起使用以建立訊號量,如果訊號量已存在則呼叫失敗)。還需要傳遞許可權。

注意 - 請參閱前面的部分了解許可權的詳細資訊。

此呼叫成功時將返回有效的訊號量識別符號(用於進一步呼叫訊號量),失敗時返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

此呼叫相關的各種錯誤包括 EACCESS(許可權被拒絕)、EEXIST(佇列已存在,無法建立)、ENOENT(佇列不存在)、ENOMEM(沒有足夠的記憶體來建立佇列)、ENOSPC(超過最大集合限制)等。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semop(int semid, struct sembuf *semops, size_t nsemops)

此係統呼叫對 System V 訊號量集執行操作,例如分配資源、等待資源或釋放資源。需要傳遞以下引數:

  • 第一個引數 semid 指示由 semget() 建立的訊號量集識別符號。

  • 第二個引數 semops 是指向要對訊號量集執行的運算元組的指標。結構如下:

struct sembuf {
   unsigned short sem_num; /* Semaphore set num */
   short sem_op; /* Semaphore operation */
   short sem_flg; /* Operation flags, IPC_NOWAIT, SEM_UNDO */
};

上述結構中的元素 sem_op 指示需要執行的操作:

  • 如果 sem_op 為負數,則分配或獲取資源。阻塞呼叫程序,直到其他程序釋放足夠的資源,以便此程序可以分配。

  • 如果 sem_op 為零,則呼叫程序等待或休眠,直到訊號量值達到 0。

  • 如果 sem_op 為正數,則釋放資源。

例如:

struct sembuf sem_lock = { 0, -1, SEM_UNDO };

struct sembuf sem_unlock = {0, 1, SEM_UNDO };

  • 第三個引數 nsemops 是該陣列中的運算元。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semctl(int semid, int semnum, int cmd, …)

此係統呼叫對 System V 訊號量執行控制操作。需要傳遞以下引數:

  • 第一個引數 semid 是訊號量的識別符號。此 ID 是訊號量識別符號,它是 semget() 系統呼叫的返回值。

  • 第二個引數 semnum 是訊號量的編號。訊號量從 0 開始編號。

  • 第三個引數 cmd 是執行所需訊號量控制操作的命令。

  • 第四個引數的型別是 union semun,取決於 cmd。對於某些情況,第四個引數不適用。

讓我們檢查一下 union semun:

union semun {
   int val; /* val for SETVAL */
   struct semid_ds *buf; /* Buffer for IPC_STAT and IPC_SET */
   unsigned short *array; /* Buffer for GETALL and SETALL */
   struct seminfo *__buf; /* Buffer for IPC_INFO and SEM_INFO*/
};

在 sys/sem.h 中定義的 semid_ds 資料結構如下:

struct semid_ds {
   struct ipc_perm sem_perm; /* Permissions */
   time_t sem_otime; /* Last semop time */
   time_t sem_ctime; /* Last change time */
   unsigned long sem_nsems; /* Number of semaphores in the set */
};

注意 - 請參閱手冊頁瞭解其他資料結構。

union semun arg; cmd 的有效值為:

  • IPC_STAT - 將 struct semid_ds 的每個成員的當前值的信 息複製到 arg.buf 指向的結構中。此命令需要對訊號量具有讀取許可權。

  • IPC_SET - 設定所有者的使用者 ID、組 ID、許可權等,這些都由 semid_ds 結構指向。

  • IPC_RMID - 刪除訊號量集。

  • IPC_INFO - 返回有關訊號量限制和引數的資訊,這些資訊包含在 arg.__buf 指向的 semid_ds 結構中。

  • SEM_INFO - 返回一個 seminfo 結構,其中包含有關訊號量消耗的系統資源的資訊。

此呼叫將返回一個值(非負值),具體取決於傳遞的命令。成功時,IPC_INFO 和 SEM_INFO 或 SEM_STAT 返回最高使用條目的索引或識別符號(根據訊號量),或者 GETNCNT 的 semncnt 值,或者 GETPID 的 sempid 值,或者 GETVAL 的 semval 值,其他操作成功時返回 0,失敗時返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。

在檢視程式碼之前,讓我們瞭解一下它的實現:

  • 建立兩個程序,例如子程序和父程序。

  • 建立共享記憶體,主要用於儲存計數器和其他標誌,以指示共享記憶體中讀/寫程序的結束。

  • 計數器由父程序和子程序遞增。計數器值可以作為命令列引數傳遞,也可以作為預設值(如果未作為命令列引數傳遞或值小於 10000)。呼叫具有特定休眠時間,以確保父程序和子程序同時訪問共享記憶體,即並行訪問。

  • 由於計數器由父程序和子程序以 1 為步長遞增,因此最終值應該是計數器的兩倍。由於父程序和子程序同時執行操作,因此計數器未按要求遞增。因此,我們需要確保一個程序完成之後才能執行另一個程序。

  • 所有上述實現都在檔案 shm_write_cntr.c 中執行。

  • 檢查計數器值是否在檔案 shm_read_cntr.c 中實現。

  • 為了確保完成,訊號量程式在檔案 shm_write_cntr_with_sem.c 中實現。在整個程序完成後(讀取完成後)刪除訊號量。

  • 由於我們有單獨的檔案來讀取共享記憶體中的計數器值,並且寫入不會對其產生任何影響,因此讀取程式保持不變 (shm_read_cntr.c)

  • 最好在一個終端中執行寫入程式,在另一個終端中執行讀取程式。由於程式只有在寫入和讀取過程完成後才能完成執行,因此在完全執行寫入程式後執行程式是可以的。寫入程式將等待讀取程式執行,並且只有在讀取程式完成後才會結束。

無訊號量的程式。

/* Filename: shm_write_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>

#define SHM_KEY 0x12345
struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count);

int main(int argc, char *argv[]) {
   int shmid;
   struct shmseg *shmp;
   char *bufptr;
   int total_count;
   int sleep_time;
   pid_t pid;
   if (argc != 2)
   total_count = 10000;
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   printf("Total Count is %d\n", total_count);
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);

   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }

   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   if (shmp == (void *) -1) {
      perror("Shared memory attach");
      return 1;
   }
   shmp->cntr = 0;
   pid = fork();

   /* Parent Process - Writing Once */
   if (pid > 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
   } else if (pid == 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
      return 0;
   } else {
      perror("Fork Failure\n");
      return 1;
   }
   while (shmp->read_complete != 1)
   sleep(1);

   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }

   if (shmctl(shmid, IPC_RMID, 0) == -1) {
      perror("shmctl");
      return 1;
   }
   printf("Writing Process: Complete\n");
   return 0;
}

/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
   int cntr;
   int numtimes;
   int sleep_time;
   cntr = shmp->cntr;
   shmp->write_complete = 0;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Now writing\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Now writing\n");
   //printf("SHM_CNTR is %d\n", shmp->cntr);
   
   /* Increment the counter in shared memory by total_count in steps of 1 */
   for (numtimes = 0; numtimes < total_count; numtimes++) {
      cntr += 1;
      shmp->cntr = cntr;
      
      /* Sleeping for a second for every thousand */
      sleep_time = cntr % 1000;
      if (sleep_time == 0)
      sleep(1);
   }
   
   shmp->write_complete = 1;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Writing Done\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Writing Done\n");
   return;
}

編譯和執行步驟

Total Count is 10000
SHM_WRITE: PARENT: Now writing
SHM_WRITE: CHILD: Now writing
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete

現在,讓我們檢查共享記憶體讀取程式。

/* Filename: shm_read_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>

#define SHM_KEY 0x12345
struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};

int main(int argc, char *argv[]) {
   int shmid, numtimes;
   struct shmseg *shmp;
   int total_count;
   int cntr;
   int sleep_time;
   if (argc != 2)
   total_count = 10000;
   
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
   
   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }
   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   
   if (shmp == (void *) -1) {
      perror("Shared memory attach");
      return 1;
   }
   
   /* Read the shared memory cntr and print it on standard output */
   while (shmp->write_complete != 1) {
      if (shmp->cntr == -1) {
         perror("read");
         return 1;
      }
      sleep(3);
   }
   printf("Reading Process: Shared Memory: Counter is %d\n", shmp->cntr);
   printf("Reading Process: Reading Done, Detaching Shared Memory\n");
   shmp->read_complete = 1;
   
   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }
   printf("Reading Process: Complete\n");
   return 0;
}

編譯和執行步驟

Reading Process: Shared Memory: Counter is 11000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete

如果觀察上面的輸出,計數器應該是 20000,但是,由於在一個程序任務完成之前,另一個程序也在並行處理,因此計數器值並非預期值。輸出會因系統而異,並且每次執行都會有所不同。為了確保兩個程序在完成一個任務後執行任務,應該使用同步機制來實現。

現在,讓我們使用訊號量檢查相同的應用程式。

注意 - 讀取程式保持不變。

/* Filename: shm_write_cntr_with_sem.c */
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/sem.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>

#define SHM_KEY 0x12345
#define SEM_KEY 0x54321
#define MAX_TRIES 20

struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};
void shared_memory_cntr_increment(int, struct shmseg*, int);
void remove_semaphore();

int main(int argc, char *argv[]) {
   int shmid;
   struct shmseg *shmp;
   char *bufptr;
   int total_count;
   int sleep_time;
   pid_t pid;
   if (argc != 2)
   total_count = 10000;
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   printf("Total Count is %d\n", total_count);
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
   
   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }
   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   
   if (shmp == (void *) -1) {
      perror("Shared memory attach: ");
      return 1;
   }
   shmp->cntr = 0;
   pid = fork();
   
   /* Parent Process - Writing Once */
   if (pid > 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
   } else if (pid == 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
      return 0;
   } else {
      perror("Fork Failure\n");
      return 1;
   }
   while (shmp->read_complete != 1)
   sleep(1);
   
   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }
   
   if (shmctl(shmid, IPC_RMID, 0) == -1) {
      perror("shmctl");
      return 1;
   }
   printf("Writing Process: Complete\n");
   remove_semaphore();
   return 0;
}

/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
   int cntr;
   int numtimes;
   int sleep_time;
   int semid;
   struct sembuf sem_buf;
   struct semid_ds buf;
   int tries;
   int retval;
   semid = semget(SEM_KEY, 1, IPC_CREAT | IPC_EXCL | 0666);
   //printf("errno is %d and semid is %d\n", errno, semid);
   
   /* Got the semaphore */
   if (semid >= 0) {
      printf("First Process\n");
      sem_buf.sem_op = 1;
      sem_buf.sem_flg = 0;
      sem_buf.sem_num = 0;
      retval = semop(semid, &sem_buf, 1);
      if (retval == -1) {
         perror("Semaphore Operation: ");
         return;
      }
   } else if (errno == EEXIST) { // Already other process got it
      int ready = 0;
      printf("Second Process\n");
      semid = semget(SEM_KEY, 1, 0);
      if (semid < 0) {
         perror("Semaphore GET: ");
         return;
      }
      
      /* Waiting for the resource */
      sem_buf.sem_num = 0;
      sem_buf.sem_op = 0;
      sem_buf.sem_flg = SEM_UNDO;
      retval = semop(semid, &sem_buf, 1);
      if (retval == -1) {
         perror("Semaphore Locked: ");
         return;
      }
   }
   sem_buf.sem_num = 0;
   sem_buf.sem_op = -1; /* Allocating the resources */
   sem_buf.sem_flg = SEM_UNDO;
   retval = semop(semid, &sem_buf, 1);
   
   if (retval == -1) {
      perror("Semaphore Locked: ");
      return;
   }
   cntr = shmp->cntr;
   shmp->write_complete = 0;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Now writing\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Now writing\n");
   //printf("SHM_CNTR is %d\n", shmp->cntr);
   
   /* Increment the counter in shared memory by total_count in steps of 1 */
   for (numtimes = 0; numtimes < total_count; numtimes++) {
      cntr += 1;
      shmp->cntr = cntr;
      /* Sleeping for a second for every thousand */
      sleep_time = cntr % 1000;
      if (sleep_time == 0)
      sleep(1);
   }
   shmp->write_complete = 1;
   sem_buf.sem_op = 1; /* Releasing the resource */
   retval = semop(semid, &sem_buf, 1);
   
   if (retval == -1) {
      perror("Semaphore Locked\n");
      return;
   }
   
   if (pid == 0)
      printf("SHM_WRITE: CHILD: Writing Done\n");
      else if (pid > 0)
      printf("SHM_WRITE: PARENT: Writing Done\n");
      return;
}
   
void remove_semaphore() {
   int semid;
   int retval;
   semid = semget(SEM_KEY, 1, 0);
      if (semid < 0) {
         perror("Remove Semaphore: Semaphore GET: ");
         return;
      }
   retval = semctl(semid, 0, IPC_RMID);
   if (retval == -1) {
      perror("Remove Semaphore: Semaphore CTL: ");
      return;
   }
   return;
}

編譯和執行步驟

Total Count is 10000
First Process
SHM_WRITE: PARENT: Now writing
Second Process
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Now writing
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete

現在,我們將透過讀取程序檢查計數器值。

執行步驟

Reading Process: Shared Memory: Counter is 20000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete
廣告
© . All rights reserved.