程序間通訊 - 訊號量
首先想到的問題是,為什麼我們需要訊號量?簡單來說,是為了保護多個程序共享的關鍵/公共區域。
假設多個程序使用相同的程式碼區域,如果所有程序都想要並行訪問,則結果會發生重疊。例如,多個使用者僅使用一臺印表機(公共/關鍵部分),假設有3個使用者,同時提交3個作業,如果所有作業都並行啟動,則一個使用者的輸出會與另一個使用者的輸出重疊。因此,我們需要使用訊號量來保護它,即在一個程序執行時鎖定關鍵部分,並在完成後解鎖。這將對每個使用者/程序重複執行,以確保一個作業不會與另一個作業重疊。
基本上,訊號量分為兩種型別:
二元訊號量 - 只有兩種狀態 0 和 1,即鎖定/解鎖或可用/不可用,互斥實現。
計數訊號量 - 允許任意資源計數的訊號量稱為計數訊號量。
假設我們有5臺印表機(為了理解,假設一臺印表機只接受一個作業),我們有3個作業需要列印。現在,3個作業將分配給3臺印表機(每臺一臺)。在此過程中,又來了4個作業。現在,在2臺可用的印表機中,已經安排了2個作業,我們還剩下2個作業,只有在其中一個資源/印表機可用後才能完成。這種根據資源可用性進行的排程可以視為計數訊號量。
要使用訊號量執行同步,請執行以下步驟:
步驟1 - 建立訊號量或連線到已存在的訊號量 (semget())
步驟2 - 對訊號量執行操作,即分配、釋放或等待資源 (semop())
步驟3 - 對訊息佇列執行控制操作 (semctl())
現在,讓我們用我們擁有的系統呼叫來檢查一下。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget(key_t key, int nsems, int semflg)
此係統呼叫建立或分配 System V 訊號量集。需要傳遞以下引數:
第一個引數 key 識別訊息佇列。key 可以是任意值,也可以是從庫函式 ftok() 派生的值。
第二個引數 nsems 指定訊號量的數量。如果是二元訊號量,則為 1,表示需要 1 個訊號量集,否則根據所需的訊號量集數量而定。
第三個引數 semflg 指定所需的訊號量標誌,例如 IPC_CREAT(如果訊號量不存在則建立)或 IPC_EXCL(與 IPC_CREAT 一起使用以建立訊號量,如果訊號量已存在則呼叫失敗)。還需要傳遞許可權。
注意 - 請參閱前面的部分了解許可權的詳細資訊。
此呼叫成功時將返回有效的訊號量識別符號(用於進一步呼叫訊號量),失敗時返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。
此呼叫相關的各種錯誤包括 EACCESS(許可權被拒絕)、EEXIST(佇列已存在,無法建立)、ENOENT(佇列不存在)、ENOMEM(沒有足夠的記憶體來建立佇列)、ENOSPC(超過最大集合限制)等。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semop(int semid, struct sembuf *semops, size_t nsemops)
此係統呼叫對 System V 訊號量集執行操作,例如分配資源、等待資源或釋放資源。需要傳遞以下引數:
第一個引數 semid 指示由 semget() 建立的訊號量集識別符號。
第二個引數 semops 是指向要對訊號量集執行的運算元組的指標。結構如下:
struct sembuf {
unsigned short sem_num; /* Semaphore set num */
short sem_op; /* Semaphore operation */
short sem_flg; /* Operation flags, IPC_NOWAIT, SEM_UNDO */
};
上述結構中的元素 sem_op 指示需要執行的操作:
如果 sem_op 為負數,則分配或獲取資源。阻塞呼叫程序,直到其他程序釋放足夠的資源,以便此程序可以分配。
如果 sem_op 為零,則呼叫程序等待或休眠,直到訊號量值達到 0。
如果 sem_op 為正數,則釋放資源。
例如:
struct sembuf sem_lock = { 0, -1, SEM_UNDO };
struct sembuf sem_unlock = {0, 1, SEM_UNDO };
第三個引數 nsemops 是該陣列中的運算元。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semctl(int semid, int semnum, int cmd, …)
此係統呼叫對 System V 訊號量執行控制操作。需要傳遞以下引數:
第一個引數 semid 是訊號量的識別符號。此 ID 是訊號量識別符號,它是 semget() 系統呼叫的返回值。
第二個引數 semnum 是訊號量的編號。訊號量從 0 開始編號。
第三個引數 cmd 是執行所需訊號量控制操作的命令。
第四個引數的型別是 union semun,取決於 cmd。對於某些情況,第四個引數不適用。
讓我們檢查一下 union semun:
union semun {
int val; /* val for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT and IPC_SET */
unsigned short *array; /* Buffer for GETALL and SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO and SEM_INFO*/
};
在 sys/sem.h 中定義的 semid_ds 資料結構如下:
struct semid_ds {
struct ipc_perm sem_perm; /* Permissions */
time_t sem_otime; /* Last semop time */
time_t sem_ctime; /* Last change time */
unsigned long sem_nsems; /* Number of semaphores in the set */
};
注意 - 請參閱手冊頁瞭解其他資料結構。
union semun arg; cmd 的有效值為:
IPC_STAT - 將 struct semid_ds 的每個成員的當前值的信 息複製到 arg.buf 指向的結構中。此命令需要對訊號量具有讀取許可權。
IPC_SET - 設定所有者的使用者 ID、組 ID、許可權等,這些都由 semid_ds 結構指向。
IPC_RMID - 刪除訊號量集。
IPC_INFO - 返回有關訊號量限制和引數的資訊,這些資訊包含在 arg.__buf 指向的 semid_ds 結構中。
SEM_INFO - 返回一個 seminfo 結構,其中包含有關訊號量消耗的系統資源的資訊。
此呼叫將返回一個值(非負值),具體取決於傳遞的命令。成功時,IPC_INFO 和 SEM_INFO 或 SEM_STAT 返回最高使用條目的索引或識別符號(根據訊號量),或者 GETNCNT 的 semncnt 值,或者 GETPID 的 sempid 值,或者 GETVAL 的 semval 值,其他操作成功時返回 0,失敗時返回 -1。要了解失敗的原因,請檢查 errno 變數或 perror() 函式。
在檢視程式碼之前,讓我們瞭解一下它的實現:
建立兩個程序,例如子程序和父程序。
建立共享記憶體,主要用於儲存計數器和其他標誌,以指示共享記憶體中讀/寫程序的結束。
計數器由父程序和子程序遞增。計數器值可以作為命令列引數傳遞,也可以作為預設值(如果未作為命令列引數傳遞或值小於 10000)。呼叫具有特定休眠時間,以確保父程序和子程序同時訪問共享記憶體,即並行訪問。
由於計數器由父程序和子程序以 1 為步長遞增,因此最終值應該是計數器的兩倍。由於父程序和子程序同時執行操作,因此計數器未按要求遞增。因此,我們需要確保一個程序完成之後才能執行另一個程序。
所有上述實現都在檔案 shm_write_cntr.c 中執行。
檢查計數器值是否在檔案 shm_read_cntr.c 中實現。
為了確保完成,訊號量程式在檔案 shm_write_cntr_with_sem.c 中實現。在整個程序完成後(讀取完成後)刪除訊號量。
由於我們有單獨的檔案來讀取共享記憶體中的計數器值,並且寫入不會對其產生任何影響,因此讀取程式保持不變 (shm_read_cntr.c)
最好在一個終端中執行寫入程式,在另一個終端中執行讀取程式。由於程式只有在寫入和讀取過程完成後才能完成執行,因此在完全執行寫入程式後執行程式是可以的。寫入程式將等待讀取程式執行,並且只有在讀取程式完成後才會結束。
無訊號量的程式。
/* Filename: shm_write_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#define SHM_KEY 0x12345
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count);
int main(int argc, char *argv[]) {
int shmid;
struct shmseg *shmp;
char *bufptr;
int total_count;
int sleep_time;
pid_t pid;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
printf("Total Count is %d\n", total_count);
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach");
return 1;
}
shmp->cntr = 0;
pid = fork();
/* Parent Process - Writing Once */
if (pid > 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
} else if (pid == 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
return 0;
} else {
perror("Fork Failure\n");
return 1;
}
while (shmp->read_complete != 1)
sleep(1);
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
perror("shmctl");
return 1;
}
printf("Writing Process: Complete\n");
return 0;
}
/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
int cntr;
int numtimes;
int sleep_time;
cntr = shmp->cntr;
shmp->write_complete = 0;
if (pid == 0)
printf("SHM_WRITE: CHILD: Now writing\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Now writing\n");
//printf("SHM_CNTR is %d\n", shmp->cntr);
/* Increment the counter in shared memory by total_count in steps of 1 */
for (numtimes = 0; numtimes < total_count; numtimes++) {
cntr += 1;
shmp->cntr = cntr;
/* Sleeping for a second for every thousand */
sleep_time = cntr % 1000;
if (sleep_time == 0)
sleep(1);
}
shmp->write_complete = 1;
if (pid == 0)
printf("SHM_WRITE: CHILD: Writing Done\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Writing Done\n");
return;
}
編譯和執行步驟
Total Count is 10000 SHM_WRITE: PARENT: Now writing SHM_WRITE: CHILD: Now writing SHM_WRITE: PARENT: Writing Done SHM_WRITE: CHILD: Writing Done Writing Process: Complete
現在,讓我們檢查共享記憶體讀取程式。
/* Filename: shm_read_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#define SHM_KEY 0x12345
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
int main(int argc, char *argv[]) {
int shmid, numtimes;
struct shmseg *shmp;
int total_count;
int cntr;
int sleep_time;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach");
return 1;
}
/* Read the shared memory cntr and print it on standard output */
while (shmp->write_complete != 1) {
if (shmp->cntr == -1) {
perror("read");
return 1;
}
sleep(3);
}
printf("Reading Process: Shared Memory: Counter is %d\n", shmp->cntr);
printf("Reading Process: Reading Done, Detaching Shared Memory\n");
shmp->read_complete = 1;
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
printf("Reading Process: Complete\n");
return 0;
}
編譯和執行步驟
Reading Process: Shared Memory: Counter is 11000 Reading Process: Reading Done, Detaching Shared Memory Reading Process: Complete
如果觀察上面的輸出,計數器應該是 20000,但是,由於在一個程序任務完成之前,另一個程序也在並行處理,因此計數器值並非預期值。輸出會因系統而異,並且每次執行都會有所不同。為了確保兩個程序在完成一個任務後執行任務,應該使用同步機制來實現。
現在,讓我們使用訊號量檢查相同的應用程式。
注意 - 讀取程式保持不變。
/* Filename: shm_write_cntr_with_sem.c */
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/sem.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#define SHM_KEY 0x12345
#define SEM_KEY 0x54321
#define MAX_TRIES 20
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
void shared_memory_cntr_increment(int, struct shmseg*, int);
void remove_semaphore();
int main(int argc, char *argv[]) {
int shmid;
struct shmseg *shmp;
char *bufptr;
int total_count;
int sleep_time;
pid_t pid;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
printf("Total Count is %d\n", total_count);
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach: ");
return 1;
}
shmp->cntr = 0;
pid = fork();
/* Parent Process - Writing Once */
if (pid > 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
} else if (pid == 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
return 0;
} else {
perror("Fork Failure\n");
return 1;
}
while (shmp->read_complete != 1)
sleep(1);
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
perror("shmctl");
return 1;
}
printf("Writing Process: Complete\n");
remove_semaphore();
return 0;
}
/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
int cntr;
int numtimes;
int sleep_time;
int semid;
struct sembuf sem_buf;
struct semid_ds buf;
int tries;
int retval;
semid = semget(SEM_KEY, 1, IPC_CREAT | IPC_EXCL | 0666);
//printf("errno is %d and semid is %d\n", errno, semid);
/* Got the semaphore */
if (semid >= 0) {
printf("First Process\n");
sem_buf.sem_op = 1;
sem_buf.sem_flg = 0;
sem_buf.sem_num = 0;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Operation: ");
return;
}
} else if (errno == EEXIST) { // Already other process got it
int ready = 0;
printf("Second Process\n");
semid = semget(SEM_KEY, 1, 0);
if (semid < 0) {
perror("Semaphore GET: ");
return;
}
/* Waiting for the resource */
sem_buf.sem_num = 0;
sem_buf.sem_op = 0;
sem_buf.sem_flg = SEM_UNDO;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked: ");
return;
}
}
sem_buf.sem_num = 0;
sem_buf.sem_op = -1; /* Allocating the resources */
sem_buf.sem_flg = SEM_UNDO;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked: ");
return;
}
cntr = shmp->cntr;
shmp->write_complete = 0;
if (pid == 0)
printf("SHM_WRITE: CHILD: Now writing\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Now writing\n");
//printf("SHM_CNTR is %d\n", shmp->cntr);
/* Increment the counter in shared memory by total_count in steps of 1 */
for (numtimes = 0; numtimes < total_count; numtimes++) {
cntr += 1;
shmp->cntr = cntr;
/* Sleeping for a second for every thousand */
sleep_time = cntr % 1000;
if (sleep_time == 0)
sleep(1);
}
shmp->write_complete = 1;
sem_buf.sem_op = 1; /* Releasing the resource */
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked\n");
return;
}
if (pid == 0)
printf("SHM_WRITE: CHILD: Writing Done\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Writing Done\n");
return;
}
void remove_semaphore() {
int semid;
int retval;
semid = semget(SEM_KEY, 1, 0);
if (semid < 0) {
perror("Remove Semaphore: Semaphore GET: ");
return;
}
retval = semctl(semid, 0, IPC_RMID);
if (retval == -1) {
perror("Remove Semaphore: Semaphore CTL: ");
return;
}
return;
}
編譯和執行步驟
Total Count is 10000 First Process SHM_WRITE: PARENT: Now writing Second Process SHM_WRITE: PARENT: Writing Done SHM_WRITE: CHILD: Now writing SHM_WRITE: CHILD: Writing Done Writing Process: Complete
現在,我們將透過讀取程序檢查計數器值。
執行步驟
Reading Process: Shared Memory: Counter is 20000 Reading Process: Reading Done, Detaching Shared Memory Reading Process: Complete