#include<stdio.h> #include<unistd.h> #include<sys/types.h> main() { intfd[2]; pid_t childpid; pipe(fd); if((childpid=fork())==-1) { perror("fork"); exit(1); } if(childpid==0) { /*Child PRocess closes up in put side of pipe*/ close(fd[0]); } else { /*Parent process closes up out put side of pipe*/ close(fd[1]); }.. }
#include<stdio.h> #include<unistd.h> #include<sys/types.h> intmain(void) { intfd[2],nbytes; pid_tchildpid; charstring[]="Hello,world! "; charreadbuffer[80]; pipe(fd); if((childpid=fork())==-1) { perror("fork"); exit(1); } if(childpid==0) { /*Child process closes up in put side of pipe*/ close(fd[0]); /*Send"string"through the out put side of pipe*/
write(fd[1],string,strlen(string)); exit(0); } else { /*Parent process closes up out put side of pipe*/ close(fd[1]); /*Readinastringfromthepipe*/ nbytes=read(fd[0],readbuffer,sizeof(readbuffer)); printf("Receivedstring:%s",readbuffer); } return(0); }
再看下面的程序: .. childpid=fork(); if(childpid==0) { /*Close up standard input of the child*/ close(0); /*Dup licate the input side of pipe to stdin*/ dup(fd[0]); execlp("sort","sort",NULL); . } 因為文件描述符0(stdin)被關閉,所以dup()把管道的輸入描述符復制到它的標準輸入中。這樣我們可以調用execlp(),使用sort程序覆蓋子進程的正文段。因為新創(chuàng)建的程序從它的父進程中繼續(xù)了標準輸入/輸出流,所以它實際上繼續(xù)了管道的輸入端作為它的標準輸入端。現(xiàn)在,最初的父進程送往管道的任何數(shù)據(jù)都將會直接送往sort函數(shù)。
intsend_message(int qid,struct mymsgbuf *qbuf) { intresult,length; /*The length is essentially the size of the structure minus sizeof(mtype)*/ length=sizeof(structmymsgbuf)-sizeof(long); if((result=msgsnd(qid,qbuf,length,0))==-1) { return(-1); } return(result); }
int change_queue_mode(int qid, char *mode ) { struct msqid_ds tmpbuf; /* Retrieve a current copy of the internal data structure */ get_queue_ds( qid, &tmpbuf); /* Change the permissions using an old trick */ sscanf(mode, "%ho", &tmpbuf.msg_perm.mode); /* Update the internal data structure */ if( msgctl( qid, IPC_SET, &tmpbuf) == -1) { return(-1); } return( }
信號量是一個可以用來控制多個進程存取共享資源的計數(shù)器。它經(jīng)常作為一種鎖定機制來防止當一個進程正在存取共享資源時,另一個進程也存取同一資源。下面先簡要地介紹一下信號量中涉及到的數(shù)據(jù)結構。 1.內核中的數(shù)據(jù)結構semid_ds 和消息隊列一樣,系統(tǒng)內核為內核地址空間中的每一個信號量集都保存了一個內部的數(shù)據(jù)結構。數(shù)據(jù)結構的原型是semid_ds。它是在linux/sem.h中做如下定義的: /*One semid data structure for each set of semaphores in the system.*/ structsemid_ds{ structipc_permsem_perm;/*permissions..seeipc.h*/ time_tsem_otime;/*last semop time*/ time_tsem_ctime;/*last change time*/ structsem*sem_base;/*ptr to first semaphore in array*/ structwait_queue*eventn; structwait_queue*eventz; structsem_undo*undo;/*undo requestson this array*/ ushortsem_nsems;/*no. of semaphores in array*/ }; sem_perm是在linux/ipc.h定義的數(shù)據(jù)結構ipc_perm的一個實例。它保存有信號量集的存取權限的信息,以及信號量集創(chuàng)建者的有關信息。 sem_otime最后一次semop()操作的時間。 sem_ctime最后一次改動此數(shù)據(jù)結構的時間。 sem_base指向數(shù)組中第一個信號量的指針。 sem_undo數(shù)組中沒有完成的請求的個數(shù)。 sem_nsems信號量集(數(shù)組)中的信號量的個數(shù)。
數(shù)據(jù)結構sem。它也是在linux/sem.h中定義的: /*One semaphore structure for each semaphore in the system.*/ structsem{ shortsempid;/*pid of las tOperation*/ ushortsemval;/*current value*/ ushortsemncnt;/*num procs awaiting increase in semval*/ ushortsemzcnt;/*num procs awaiting semval=0*/ }; sem_pid最后一個操作的PID(進程ID)。 sem_semval信號量的當前值。 sem_semncnt等待資源的進程數(shù)目。 sem_semzcnt等待資源完全空閑的進程數(shù)目。
/*semop systemcall takes an array of these*/ structsembuf{ ushortsem_num;/*semaphore index in array*/ shortsem_op;/*semaphore operation*/ shortsem_flg;/*operation flags*/ sem_num將要處理的信號量的個數(shù)。 sem_op要執(zhí)行的操作。 sem_flg操作標志。
參數(shù)arg代表一個semun的實例。semun是在linux/sem.h中定義的: /*arg for semctl systemcalls.*/ unionsemun{ intval;/*value for SETVAL*/ structsemid_ds*buf;/*buffer for IPC_STAT&IPC_SET*/ ushort*array;/*array for GETALL&SETALL*/ structseminfo*__buf;/*buffer for IPC_INFO*/ void*__pad;
/* One shmid data structure for each shared memory segment in the system. */ struct shmid_ds { struct ipc_perm shm_perm; /* operation perms */ int shm_segsz; /* size of segment (bytes) */ time_t shm_atime; /* last attach time */ time_t shm_dtime; /* last detach time */ time_t shm_ctime; /* last change time */ unsigned short shm_cpid; /* pid of creator */ unsigned short shm_lpid; /* pid of last operator */
short shm_nattch; /* no. of current attaches */ /* the following are private */ unsigned short shm_npages; /* size of segment (pages) */ unsigned long *shm_pages; /* array of ptrs to frames -> SHMMAX */ struct vm_area_struct *attaches; /* descriptors for attaches */ };
shm_perm 是數(shù)據(jù)結構ipc_perm的一個實例。這里保存的是內存段的存取權限,和其他的有關內存段創(chuàng)建者的信息。 shm_segsz 內存段的字節(jié)大小。 shm_atime 最后一個進程存取內存段的時間。 shm_dtime 最后一個進程離開內存段的時間。 shm_ctime 內存段最后改動的時間。 shm_cpid 內存段創(chuàng)建進程的P I D。 shm_lpid 最后一個使用內存段的進程的P I D。 shm_nattch 當前使用內存段的進程總數(shù)。
系統(tǒng)調用: shmat(); 原型:int shmat ( int shmid, char *shmaddr, int shmflg); 返回值:假如成功,則返回共享內存段連接到進程中的地址。假如失敗,則返回- 1:errno = EINVAL (無效的IPC ID 值或者無效的地址) ENOMEM (沒有足夠的內存) EACCES (存取權限不夠) 假如參數(shù)a d d r的值為0,那么系統(tǒng)內核則試圖找出一個沒有映射的內存區(qū)域。我們推薦使用這種方法。你可以指定一個地址,但這通常是為了加快對硬件設備的存取,或者解決和其他程序的沖突。 下面的程序中的調用參數(shù)是一個內存段的I P C標識符,返回內存段連接的地址:
這樣是否達到了我們的要求了呢?不盡如此,因為依靠時間的延遲執(zhí)行同步是不可靠的。這里碰到的情形和一個分布程序和共享資源的情形一樣。共享的資源是標準的輸出設備,分布計算的程序是三個線程。 其實這里還有另外一個錯誤。函數(shù)sleep和函數(shù)e x i t一樣和進程有關。當線程調用sleep時,整個的進程都處于睡眠狀態(tài),也就是說,所有的三個線程都進入睡眠狀態(tài)。這樣我們實際上沒有解決任何的問題。希望使一個線程睡眠的函數(shù)是pthread_delay_np。例如讓一個線程睡眠2秒鐘,用如下程序:
(void *)&reader_function, NULL); w r i t e r _ f u n c t i o n ( ) ; } void writer_function(void) { w h i l e ( 1 ) { semaphore_down( &writers_turn ); b u ffer = make_new_item(); semaphore_up( &readers_turn ); } } void reader_function(void) { w h i l e ( 1 ) { semaphore_down( &readers_turn ); consume_item( buffer ); semaphore_up( &writers_turn ); } } 這個例子也沒有完全地利用一般信號量的所有函數(shù)。我們可以使用信號量重新編寫“Hello world” 的程序: void print_message_function( void *ptr ); Semaphore child_counter; Semaphore worlds_turn; main( ) { pthread_t thread1, thread2; char *message1 = "Hello"; char *message2 = "Wo r l d " ; semaphore_init( &child_counter ); semaphore_init( &worlds_turn ); semaphore_down( &worlds_turn ); /* world goes second */ semaphore_decrement( &child_counter ); /* value now 0 */ semaphore_decrement( &child_counter ); /* value now -1 */ /* * child_counter now must be up-ed 2 times for a thread blocked on it * to be released * * / pthread_create( &thread1, pthread_attr_default, (void *) &print_message_function, (void *) message1); semaphore_down( &worlds_turn ); pthread_create(&thread2, pthread_attr_default, (void *) &print_message_function, (void *) message2); semaphore_down( &child_counter ); /* not really necessary to destroy since we are exiting anyway */ semaphore_destroy ( &child_counter ); semaphore_destroy ( &worlds_turn ); e x i t ( 0 ) ; } void print_message_function( void *ptr ) { char *message; message = (char *) ptr; printf("%s ", message); fflush(stdout); semaphore_up( &worlds_turn ); semaphore_up( &child_counter ); p t h r e a d _ e x i t ( 0 ) ; } 信號量c h i l d _ c o u n t e r用來強迫父線程阻塞,直到兩個子線程執(zhí)行完p r i n t f語句和其后的semaphore_up( &child_counter )語句才繼續(xù)執(zhí)行。 Semaphore.h
#ifndef SEMAPHORES #define SEMAPHORES #include #include typedef struct Semaphore { int v; pthread_mutex_t mutex; pthread_cond_t cond; } S e m a p h o r e ; int semaphore_down (Semaphore * s); int semaphore_decrement (Semaphore * s); int semaphore_up (Semaphore * s); void semaphore_destroy (Semaphore * s); void semaphore_init (Semaphore * s); int semaphore_value (Semaphore * s); int tw_pthread_cond_signal (pthread_cond_t * c); int tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m); int tw_pthread_mutex_unlock (pthread_mutex_t * m); int tw_pthread_mutex_lock (pthread_mutex_t * m); void do_error (char *msg); # e n d i f
Semaphore.c
#include "semaphore.h" / *
* function must be called prior to semaphore use. * * / v o i d semaphore_init (Semaphore * s) { s->v = 1; if (pthread_mutex_init (&(s->mutex), pthread_mutexattr_default) == -1) do_error ("Error setting up semaphore mutex"); if (pthread_cond_init (&(s->cond), pthread_condattr_default) == -1) do_error ("Error setting up semaphore condition signal"); * function should be called when there is no longer a need for * the semaphore. * * / v o i d semaphore_destroy (Semaphore * s) { if (pthread_mutex_destroy (&(s->mutex)) == -1) do_error ("Error destroying semaphore mutex"); if (pthread_cond_destroy (&(s->cond)) == -1) do_error ("Error destroying semaphore condition signal"); } / * * function increments the semaphore and signals any threads that * are blocked waiting a change in the semaphore. * * / i n t semaphore_up (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); ( s - > v ) + + ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); tw_pthread_cond_signal (&(s->cond)); return (value_after_op); } / * * function decrements the semaphore and blocks if the semaphore is * <= 0 until another thread signals a change. * * / i n t semaphore_down (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); while (s->v <= 0) { tw_pthread_cond_wait (&(s->cond), &(s->mutex)); } ( s - > v ) - - ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } / * * function does NOT block but simply decrements the semaphore. * should not be used instead of down -- only for programs where * multiple threads must up on a semaphore before another thread * can go down, i.e., allows programmer to set the semaphore to * a negative value prior to using it for synchronization. * * / i n t semaphore_decrement (Semaphore * s) { int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); s - > v - - ; value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } / * * function returns the value of the semaphore at the time the * critical section is accessed. obviously the value is not guarenteed * after the function unlocks the critical section. provided only * for casual debugging, a better approach is for the programmar to * protect one semaphore with another and then check its value. * an alternative is to simply record the value returned by semaphore_up
* or semaphore_down. * * / i n t semaphore_value (Semaphore * s) { /* not for sync */ int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op); } /* -------------------------------------------------------------------- */ /* The following functions replace standard library functions in that */ /* they exit on any error returned from the system calls. Saves us */ /* from having to check each and every call above. */ /* -------------------------------------------------------------------- */ i n t tw_pthread_mutex_unlock (pthread_mutex_t * m) { int return_value; if ((return_value = pthread_mutex_unlock (m)) == -1) do_error ("pthread_mutex_unlock"); return (return_value); } i n t tw_pthread_mutex_lock (pthread_mutex_t * m) { int return_value; if ((return_value = pthread_mutex_lock (m)) == -1) do_error ("pthread_mutex_lock"); return (return_value); } i n t tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m) { int return_value; if ((return_value = pthread_cond_wait (c, m)) == -1) do_error ("pthread_cond_wait"); return (return_value); } i n t tw_pthread_cond_signal (pthread_cond_t * c) { int return_value; if ((return_value = pthread_cond_signal (c)) == -1) do_error ("pthread_cond_signal"); return (return_value); } / * * function just prints an error message and exits * * / v o i d do_error (char *msg) { perror (msg); exit (1); }