– Lösung zur Praktikumsaufgabe 2 –
Thema: Semaphore II
1. Der Prozess erzeugt zunächst eine Semaphore und initialisiert diese offen. Danach erzeugt er einen Sohn, schläft zwei Sekunden und führt dann P() über der Semaphore aus.
Die Semaphore wurde in der Zwischenzeit durch den Sohn gesperrt, das SEM_UNDO-Flag wurde allerdings gesetzt. Der Sohn schläft zehn Sekunden und beendet sich dann.
Der Semaphorausgleichswert sorgt nun dafür, dass der Vater die P()-Operation komplettiert, da das P() des Sohnes bei dessen Beendigung rückgängig gemacht wird. Der Vater öffnet zuletzt die Semaphore per V().
Im zweiten Durchgang erfolgt genau der gleiche Ablauf mit dem Unterschied, dass bei der P()-Operation des Sohnes nun das SEM_UNDO-Flag nicht gesetzt wird. Damit ist der Vater unendlich blockiert, die Beendigung des Sohnes ändert daran nichts.
Die einzelnen Etappen der Demonstration werden durch geeignete Kommentare veranschaulicht.
Listing 1: Lösung der Aufgabe 1 (sem-undo-demo.c)
/*
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
/*
 * Argument type for the optional fourth parameter of semctl().
 * main() below fills in .val and passes the union with SETVAL.
 */
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */
};
/*
 * Demonstrates the effect of the SEM_UNDO flag of semop().
 *
 * Round 1: a son closes the semaphore with SEM_UNDO set, sleeps and exits.
 *          The father's P() is expected to complete once the son has
 *          exited, because the son's P() is undone on exit.
 * Round 2: same sequence, but the son does not set SEM_UNDO. The father's
 *          P() stays blocked even after the son has exited.
 *
 * argc/argv are not evaluated. Every failing system call is reported via
 * perror() and terminates the calling process with EXIT_FAILURE.
 */
int main(int argc, char *argv[]) {
    int semid, sonid, ret;
    union semun mysemun;
    struct sembuf mysemop;

    semid = semget(IPC_PRIVATE, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (semid == -1) {
        perror("semget(), father");
        /* without a valid semid nothing below can work */
        exit(EXIT_FAILURE);
    }
    printf("Father: Semaphore created.\n");

    mysemun.val = 1;
    ret = semctl(semid, 0, SETVAL, mysemun);
    if (ret == -1) {
        perror("semctl(), SETVAL, father");
        exit(EXIT_FAILURE);
    }
    printf("Father: Semaphore initialized as open.\n");

    /*
     * Flush before fork(): otherwise output still sitting in the stdio
     * buffer (e.g. when stdout is redirected to a file) is inherited by
     * the son and written a second time when the son calls exit().
     */
    fflush(stdout);
    sonid = fork();
    if (sonid == -1) {
        perror("fork(), father");
        exit(EXIT_FAILURE);
    }
    if (sonid == 0) { /* Son */
        /* P with SEM_UNDO: the decrement is reverted when the son exits */
        mysemop.sem_num = 0;
        mysemop.sem_op = -1;
        mysemop.sem_flg = SEM_UNDO;
        ret = semop(semid, &mysemop, 1);
        if (ret == -1) {
            perror("Son: semop, P() failed.");
            exit(EXIT_FAILURE);
        }
        printf("Son: Semaphore closed, SEM_UNDO set.\n");
        printf("Son: sleeping for 10 seconds.\n");
        sleep(10);
        printf("Son: exiting. Father should wakeup on semaphore.\n");
        exit(EXIT_SUCCESS);
    }
    else { /* Father */
        /* give the son time to close the semaphore first */
        printf("Father: Father sleeps for 2 seconds.\n");
        sleep(2);
        /* P */
        printf("Father: P() - should block until son exits.\n");
        mysemop.sem_num = 0;
        mysemop.sem_op = -1;
        mysemop.sem_flg = 0;
        ret = semop(semid, &mysemop, 1);
        if (ret == -1) {
            perror("Father: semop, P() failed.");
            exit(EXIT_FAILURE);
        }
        printf("Father: P() completed\n");
    }

    /* Father only comes here */
    /* V: re-open the semaphore for the second round */
    mysemop.sem_num = 0;
    mysemop.sem_op = +1;
    mysemop.sem_flg = 0;
    ret = semop(semid, &mysemop, 1);
    if (ret == -1) {
        perror("Father: semop, V() failed.");
        exit(EXIT_FAILURE);
    }
    printf("Father: V() - semaphore open again.\n");

    fflush(stdout); /* see first fork() */
    sonid = fork();
    if (sonid == -1) {
        perror("fork(), father");
        exit(EXIT_FAILURE);
    }
    if (sonid == 0) { /* Son */
        /* P without SEM_UNDO: the decrement survives the son's exit */
        mysemop.sem_num = 0;
        mysemop.sem_op = -1;
        mysemop.sem_flg = 0;
        ret = semop(semid, &mysemop, 1);
        if (ret == -1) {
            perror("Son: semop, P() failed.");
            exit(EXIT_FAILURE);
        }
        printf("Son: Semaphore closed, SEM_UNDO _not_ set.\n");
        printf("Son: sleeping for 10 seconds.\n");
        sleep(10);
        printf("Son: exiting. Father will wait on semaphore forever.\n");
        exit(EXIT_SUCCESS);
    }
    else { /* Father */
        printf("Father: Father sleeps for 2 seconds.\n");
        sleep(2);
        /* P */
        printf("Father: P() \n");
        mysemop.sem_num = 0;
        mysemop.sem_op = -1;
        mysemop.sem_flg = 0;
        ret = semop(semid, &mysemop, 1);
        if (ret == -1) {
            perror("Father: semop, P() failed.");
            exit(EXIT_FAILURE);
        }
        /* Never reached without intervention */
        printf("Father: P() completed\n");
    }

    /* Father only comes here */
    /* the set has exactly one semaphore, so address it as number 0 */
    ret = semctl(semid, 0, IPC_RMID);
    if (ret == -1) {
        perror("semctl(), IPC_RMID, father");
        exit(EXIT_FAILURE);
    }
    printf("Father: Semaphore removed.\n");
    exit(EXIT_SUCCESS);
}
2. Das erste Listing zeigt eine Implementierung auf der Basis von System-V-Semaphoren und mehreren Prozessen. Die gemeinsam genutzte Datenbasis ist eine Datei RESSFILE.
Der Zähler für die Leseprozesse rc wird durch eine Semaphore implementiert. Nach Beendigung aller Lese- und Schreibprozesse prüft der Vaterprozess, ob RESSFILE den korrekten Endwert enthält (jeder Schreiber inkrementiert den Wert von RESSFILE ITERATIONS-mal um 1).
Listing 2: Lösung der Aufgabe 2 mittels System-V-Semaphoren und Prozessen (rw-problem-sem.c)
/*
implements the 'classic' solution to the readers-writers problem which
prioritizes readers
- uses processes and System V semaphores
- the shared resource is a file RESSFILE
- instead of a set containing two semaphores, _two sets_ with one semaphore each are used
- the shared counter 'rc' is implemented by another set of semaphores
Literature:
[1] P.J. Courtois, F. Heymans, D.L. Parnas: Concurrent Control with
"Readers" and "Writers", CACM 14(10), 1971, pp. 667-668
[2] Andrew S. Tanenbaum, Modern Operating Systems, 3rd ed., Pearson,
2009, pp. 165-166
[3] William Stallings, Operating Systems, 6th ed., Prentice Hall, 2007,
pp. 245ff
and many more
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <limits.h>
/* upper bound for readers+writers; also the size of the pid array in main() */
#define MAXPROCESSES 20000
/* name of the file that serves as the shared resource */
#define RESSFILE "zahl.dat"
/* number of accesses each reader/writer process performs */
#define ITERATIONS 100
/* value written to RESSFILE by main() before the processes are started */
#define INITVALUE 0UL
/* keys passed to semget() for the three one-element semaphore sets */
#define SEMKEY_MUTEX 0xDEADBEEF
#define SEMKEY_WRI 0xDEAFBABE
#define SEMKEY_RC 0xABCDABCD
/* The calling program must define union semun */
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */
};
/* some prototypes */
void fatal(char*);
void write_file(void);
void read_file(void);
/*
global resources
*/
int id_mutex, id_wri; /* semids of semaphores */
int id_rc; /* semid of semaphore abused as counter rc (!) */
/* number of reader/writer processes, taken from the command line in main() */
unsigned long readers, writers;
/*
 * Report an unrecoverable error and terminate the calling process.
 * msg is handed to perror() and must not be NULL; the process exits
 * with EXIT_FAILURE, so this function does not return.
 */
void fatal(char *msg) {
    assert(NULL != msg);
    perror(msg);
    exit(EXIT_FAILURE);
}
/*
a non-fatal error has occurred -> warn, but continue
*/
void nonfatal(char *msg) {
    /* msg goes to perror() and must not be NULL; the caller keeps running */
    assert(NULL != msg);
    perror(msg);
}
/*
 * P operation: decrements semaphore 0 of the set semid by one via semop().
 * A failing semop() is treated as fatal.
 */
void P(int semid) {
    struct sembuf down;

    down.sem_flg = 0;
    down.sem_op = -1;
    down.sem_num = 0;
    if (semop(semid, &down, 1) != -1) {
        return;
    }
    fatal("P()");
}
/*
 * V operation: increments semaphore 0 of the set semid by one via semop().
 * A failing semop() is treated as fatal.
 */
void V(int semid) {
    struct sembuf up;

    up.sem_flg = 0;
    up.sem_op = 1;
    up.sem_num = 0;
    if (semop(semid, &up, 1) != -1) {
        return;
    }
    fatal("V()");
}
/*
 * Body of a writer process: performs ITERATIONS updates of the shared
 * file, each one bracketed by P/V on the 'wri' semaphore, then exits.
 * Does not return to the caller.
 */
void writer(void) {
    int round = 0;

    printf("Writer, PID %d, starts\n", getpid());
    while (round < ITERATIONS) {
        P(id_wri);
        write_file();
        V(id_wri);
        round++;
    }
    printf("Writer, PID %d, finished\n", getpid());
    exit(EXIT_SUCCESS);
}
/*
 * Body of a reader process: performs ITERATIONS read accesses to the
 * shared file following the readers-preference protocol, then exits.
 *
 * The reader counter 'rc' lives in the semaphore id_rc and is read and
 * written with semctl(GETVAL/SETVAL) while 'mutex' is held. The first
 * reader closes 'wri', the last one opens it again.
 * Does not return to the caller.
 */
void reader(void) {
    int c, ret;
    union semun rsemun;

    printf("Reader, PID %d, starts\n", getpid());
    for (c = 0; c < ITERATIONS; c++) {
        P(id_mutex);
        /* rc += 1 */
        ret = semctl(id_rc, 0, GETVAL);
        if (ret == -1) {
            fatal("semctl(), GETVAL, rc+=1");
        }
        rsemun.val = ret + 1;
        /* the result must be stored, otherwise the check below would
           look at the stale GETVAL result */
        ret = semctl(id_rc, 0, SETVAL, rsemun);
        if (ret == -1) {
            fatal("semctl(), SETVAL, rc+=1");
        }
        if (rsemun.val == 1) {
            /* first reader locks out the writers */
            P(id_wri);
        }
        V(id_mutex);

        read_file();

        P(id_mutex);
        /* rc -= 1 */
        ret = semctl(id_rc, 0, GETVAL);
        if (ret == -1) {
            fatal("semctl(), GETVAL, rc-=1");
        }
        rsemun.val = ret - 1;
        ret = semctl(id_rc, 0, SETVAL, rsemun);
        if (ret == -1) {
            fatal("semctl(), SETVAL, rc-=1");
        }
        if (rsemun.val == 0) {
            /* last reader admits the writers again */
            V(id_wri);
        }
        V(id_mutex);
    }
    printf("Reader, PID %d, finished\n", getpid());
    exit(EXIT_SUCCESS);
}
/*
must be called atomically by writers
*/
/*
 * Reads the unsigned number stored in RESSFILE, increments it by one and
 * writes it back to the start of the file. Any I/O failure is fatal.
 * The read-modify-write is not atomic by itself; the caller has to hold
 * the 'wri' semaphore.
 */
void write_file(void) {
    FILE *fin;
    int ret;
    unsigned long x;

    fin = fopen(RESSFILE, "r+");
    if (fin == NULL) {
        fatal("fopen for write");
    }
    ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
    if (ret != 1) {
        fatal("fscanf during write access");
    }
    x += 1;
    /* reposition to the start; also required between reading and writing
       on an update stream */
    rewind(fin);
    ret = fprintf(fin, "%lu\n", x);
    if (ret < 0) { /* fprintf reports failure with a negative value */
        fatal("fprintf");
    }
    printf("Writer, PID %i, wrote %lu\n", getpid(), x);
    ret = fclose(fin);
    if (ret == EOF) {
        fatal("fclose");
    }
}
/*
 * Opens RESSFILE read-only, parses one unsigned number from it and closes
 * the file again. The parsed value is not used any further. Any I/O
 * failure is fatal.
 */
void read_file(void) {
    unsigned long value;
    FILE *in = fopen(RESSFILE, "r");

    if (in == NULL) {
        fatal ("fopen for reading");
    }
    /* fscanf returns the number of converted items */
    if (fscanf(in, "%lu", &value) != 1) {
        fatal ("fscanf");
    }
    if (fclose(in) == EOF) {
        fatal ("fclose after reading");
    }
}
/*
 * Readers-writers demo with processes and System V semaphores.
 *
 * Usage: <prog> <readers> <writers>
 *
 * Resets RESSFILE to INITVALUE, creates and initialises the semaphore sets
 * 'mutex', 'wri' and 'rc', forks the requested number of reader and writer
 * processes, waits for all of them, removes the semaphores and finally
 * checks that RESSFILE contains writers*ITERATIONS.
 *
 * Exits with EXIT_FAILURE on usage errors or failing system calls.
 */
int main(int argc, char* argv[]) {
    pid_t son[MAXPROCESSES], tmppid;
    unsigned long c;
    int ret;
    int have_value = 0; /* set once the final value was read successfully */
    union semun mysem;
    FILE *fin;
    unsigned long x = 0;

    /* parse command line parameters */
    if (argc != 3) {
        printf("Usage: %s <readers> <writers>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    readers = strtoul(argv[1], NULL, 10);
    writers = strtoul(argv[2], NULL, 10);
    if ((writers == ULONG_MAX) || (readers == ULONG_MAX)) {
        printf("Could not convert number of readers of writers.\n");
        exit(EXIT_FAILURE);
    }
    /* written without computing readers+writers so that huge inputs
       cannot wrap around and slip past the limit */
    if (readers > MAXPROCESSES || writers > MAXPROCESSES - readers) {
        printf("Maximum Number of processes (%i) exceeded.\n", MAXPROCESSES);
        exit(EXIT_FAILURE);
    }

    /* create and reset shared file */
    fin = fopen(RESSFILE, "w");
    if (fin == NULL) {
        fatal("main, fopen for write");
    }
    if (fprintf(fin, "%lu\n", INITVALUE) < 0) {
        fatal("main, fprintf");
    }
    if (fclose(fin) == EOF) {
        fatal("main, fclose");
    }

    /* create named semaphore set 'mutex' */
    id_mutex = semget(SEMKEY_MUTEX, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (id_mutex == -1) {
        fatal("semget mutex");
    }
    /* create named semaphore set 'wri' */
    id_wri = semget(SEMKEY_WRI, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (id_wri == -1) {
        fatal("semget wri");
    }
    /* create named semaphore set 'rc' */
    id_rc = semget(SEMKEY_RC, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (id_rc == -1) {
        fatal("semget rc");
    }

    /* init mutex and wri as "open" */
    mysem.val = 1;
    ret = semctl(id_mutex, 0, SETVAL, mysem);
    if (ret == -1) {
        fatal("init mutex");
        /* XXX remove semaphore set */
    }
    ret = semctl(id_wri, 0, SETVAL, mysem);
    if (ret == -1) {
        fatal("init wri");
        /* XXX remove semaphore set */
    }
    /* The reader counter has to start at 0. The set is addressed by a
       fixed key, so it may still exist with a leftover value from an
       earlier run that did not reach the cleanup below. */
    mysem.val = 0;
    ret = semctl(id_rc, 0, SETVAL, mysem);
    if (ret == -1) {
        fatal("init rc");
        /* XXX remove semaphore set */
    }

    /* create processes, readers first */
    for (c = 0; c < readers; c++) {
        tmppid = fork();
        if (tmppid == -1) {
            fatal("fork a reader");
        }
        if (tmppid != 0) { /* Father */
            son[c] = tmppid;
        } else {
            reader();
            assert(0); /* may not be reached, because reader() terminates */
        }
    }
    /* only father reaches this */
    /* create writers processes */
    for ( ; c < readers + writers; c++) {
        tmppid = fork();
        if (tmppid == -1) {
            fatal("fork a writer");
        }
        if (tmppid != 0) { /* Father */
            son[c] = tmppid;
        } else {
            writer();
            assert(0); /* may not be reached, because writer() terminates */
        }
    }
    /* only father reaches this */
    /* wait for children to terminate; ret receives the raw wait status */
    for (c = 0; c < readers + writers; c++) {
        tmppid = wait(&ret);
        printf("Son with PID %ld returned %d.\n", (long) tmppid, ret);
    }

    /* destroy the semaphores */
    ret = semctl(id_wri, 0, IPC_RMID);
    if (ret == -1) {
        fatal("delete semaphore wri");
    }
    ret = semctl(id_mutex, 0, IPC_RMID);
    if (ret == -1) {
        fatal("delete semaphore mutex");
    }
    ret = semctl(id_rc, 0, IPC_RMID);
    if (ret == -1) {
        fatal("delete semaphore rc");
    }

    /* open shared file and check its final value */
    fin = fopen(RESSFILE, "r");
    if (fin == NULL) {
        nonfatal("main, fopen for check");
    } else {
        ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
        if (ret != 1) {
            nonfatal ("main, fscanf while checking");
        } else {
            have_value = 1;
        }
        fclose(fin);
    }
    if (!have_value) {
        /* nothing to compare against; do not report a bogus value */
        printf("Could not read final value from %s.\n", RESSFILE);
        exit(EXIT_FAILURE);
    }
    if (x == writers*ITERATIONS) {
        printf("Correct value %lu in %s reached.\nDeleting.", x, RESSFILE);
        ret = unlink(RESSFILE);
        if (ret == -1) {
            fatal("unlink");
        }
    } else {
        printf("Incorrect value %lu in %s .\n", x, RESSFILE);
        printf("Leaving file for inspection.\n");
    }
    exit(EXIT_SUCCESS);
}
Überzeugen Sie sich, dass Schreibprozesse behindert werden (sie werden erst am Schluss bearbeitet), wenn sehr viele Leseprozesse gestartet werden.
Eine zweite Lösung (Listing 3) arbeitet mittels Pthreads und Mutexen. Der Code reflektiert viel besser den eigentlichen Algorithmus und ist damit weitaus besser lesbar.
Listing 3: Lösung der Aufgabe 2 mittels Pthreads und Mutexen (rw-problem-pthreads.c)
/*
implements the 'classic' solution to the readers-writers problem which
prioritizes readers
- uses pthreads
- stack size of the individual threads is set to STACKSIZE (small value),
to permit a large number of threads
- the shared resource is a file RESSFILE
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <fcntl.h>
#include <limits.h>
/* upper bound for readers+writers; also the size of the thread array in main() */
#define MAXTHREADS 20000
/* stack size in bytes requested for every created thread */
#define STACKSIZE 65536
/* name of the file that serves as the shared resource */
#define RESSFILE "zahl.dat"
/* number of accesses each reader/writer thread performs */
#define ITERATIONS 100
/* value written to RESSFILE by main() before the threads are started */
#define INITVALUE 0L
/* some helper functions */
void fatal(char*);
void write_file(void);
void read_file(void);
/*
global resources
*/
/* protects the reader counter rc */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* held while the file is written, or while at least one reader is active */
pthread_mutex_t wri = PTHREAD_MUTEX_INITIALIZER;
int rc; /* reader counter */
/* number of reader/writer threads, taken from the command line in main() */
unsigned long readers, writers;
/*
 * Report an unrecoverable error and terminate the whole process.
 * msg is handed to perror() together with the current errno and must not
 * be NULL; the process exits with EXIT_FAILURE.
 */
void fatal(char *msg) {
    assert(NULL != msg);
    perror(msg);
    exit(EXIT_FAILURE);
}
/*
a non-fatal error has occurred -> warn, but continue
*/
void nonfatal(char *msg) {
    /* msg goes to perror() and must not be NULL; the caller keeps running */
    assert(NULL != msg);
    perror(msg);
}
/*
 * Thread function of a writer: performs ITERATIONS updates of the shared
 * file, each one while holding the 'wri' mutex. The argument is unused;
 * the thread terminates with a NULL result.
 */
void* writer(void* in) {
    int left = ITERATIONS;

    printf("Writer, TID %x, starts\n", (unsigned int) pthread_self());
    while (left-- > 0) {
        pthread_mutex_lock(&wri);
        write_file();
        pthread_mutex_unlock(&wri);
    }
    printf("Writer, TID %x, finished\n", (unsigned int) pthread_self());
    pthread_exit(NULL);
}
void* reader(void* in) {
int c;
printf("Reader, TID %x, starts\n", (unsigned int) pthread_self());
for (c=0; c < ITERATIONS; c++) { pthread_mutex_lock(&mutex);
rc += 1;
if (rc == 1) {
pthread_mutex_lock(&wri);
}
pthread_mutex_unlock(&mutex);
read_file();
pthread_mutex_lock(&mutex);
rc -= 1;
if (rc == 0) {
pthread_mutex_unlock(&wri);
}
pthread_mutex_unlock(&mutex);
}
printf("Reader, TID %x, finished\n", (unsigned int) pthread_self());
pthread_exit(NULL);
} /*
must be called atomically by writers
*/
/*
 * Reads the unsigned number stored in RESSFILE, increments it by one and
 * writes it back to the start of the file. Any I/O failure is fatal.
 * The read-modify-write is not atomic by itself; the caller has to hold
 * the 'wri' mutex.
 */
void write_file(void) {
    FILE *fin;
    int ret;
    unsigned long x;

    fin = fopen(RESSFILE, "r+");
    if (fin == NULL) {
        fatal("fopen for write");
    }
    ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
    if (ret != 1) {
        fatal("fscanf during write access");
    }
    x += 1;
    /* reposition to the start; also required between reading and writing
       on an update stream */
    rewind(fin);
    ret = fprintf(fin, "%lu\n", x);
    if (ret < 0) { /* only a failed fprintf is fatal */
        fatal("fprintf");
    }
    ret = fclose(fin);
    if (ret == EOF) {
        fatal("fclose");
    }
}
/*
 * Opens RESSFILE read-only, parses one unsigned number from it and closes
 * the file again. The parsed value is not used any further. Any I/O
 * failure is fatal.
 */
void read_file(void) {
    unsigned long value;
    FILE *in = fopen(RESSFILE, "r");

    if (in == NULL) {
        fatal ("fopen for reading");
    }
    /* fscanf returns the number of converted items */
    if (fscanf(in, "%lu", &value) != 1) {
        fatal ("fscanf");
    }
    if (fclose(in) == EOF) {
        fatal ("fclose after reading");
    }
}
/*
 * Readers-writers demo with pthreads and mutexes.
 *
 * Usage: <prog> <readers> <writers>
 *
 * Resets RESSFILE to INITVALUE, starts the requested number of reader and
 * writer threads with a stack size of STACKSIZE, joins all of them and
 * finally checks that RESSFILE contains writers*ITERATIONS.
 *
 * Exits with EXIT_FAILURE on usage errors or when a thread cannot be
 * created.
 */
int main(int argc, char* argv[]) {
    pthread_t son[MAXTHREADS];
    pthread_attr_t tattr;
    unsigned long c;
    int ret;
    int have_value = 0; /* set once the final value was read successfully */
    FILE *fin;
    unsigned long x = 0;

    /* parse command line parameters */
    if (argc != 3) {
        printf("Usage: %s <readers> <writers>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    readers = strtoul(argv[1], NULL, 10);
    writers = strtoul(argv[2], NULL, 10);
    if ((writers == ULONG_MAX) || (readers == ULONG_MAX)) {
        printf("Could not convert number of readers of writers.\n");
        exit(EXIT_FAILURE);
    }
    /* written without computing readers+writers so that huge inputs
       cannot wrap around and slip past the limit */
    if (readers > MAXTHREADS || writers > MAXTHREADS - readers) {
        printf("Maximum Number of threads (%i) exceeded.\n", MAXTHREADS);
        exit(EXIT_FAILURE);
    }
    printf("Creating %lu readers and %lu writers.\n", readers, writers);

    /* create and reset shared file */
    fin = fopen(RESSFILE, "w");
    if (fin == NULL) {
        fatal("main, fopen for write");
    }
    if (fprintf(fin, "%ld\n", INITVALUE) < 0) {
        fatal("main, fprintf");
    }
    if (fclose(fin) == EOF) {
        fatal("main, fclose");
    }

    /* reduce stack size of threads to be created */
    ret = pthread_attr_init(&tattr);
    if (ret != 0) {
        printf("pthread_attr_init() failed, error=%d\n", ret);
        exit(EXIT_FAILURE);
    }
    ret = pthread_attr_setstacksize(&tattr, STACKSIZE);
    if (ret != 0) {
        /* not fatal: the threads are then created with the default size */
        printf("pthread_attr_setstacksize() failed, error=%d\n", ret);
    }

    /*
     * pthread functions return the error number instead of setting errno,
     * so failures are reported with the returned code rather than through
     * perror().
     */
    /* create threads, readers first */
    for (c = 0; c < readers; c++) {
        ret = pthread_create(&son[c], &tattr, &reader, NULL);
        if (ret != 0) {
            printf("pthread_create(), reader failed, error=%d\n", ret);
            exit(EXIT_FAILURE);
        }
    }
    /* create writer threads */
    for ( ; c < readers + writers; c++) {
        ret = pthread_create(&son[c], &tattr, &writer, NULL);
        if (ret != 0) {
            printf("pthread_create, writer failed, error=%d\n", ret);
            exit(EXIT_FAILURE);
        }
    }

    /* wait for threads to terminate; report the result of the join itself */
    for (c = 0; c < readers + writers; c++) {
        ret = pthread_join(son[c], (void**) NULL);
        printf("Thread no.%lu, TID %x, returned %d.\n", c, (unsigned int) son[c], ret);
    }

    /* open shared file and check its final value */
    fin = fopen(RESSFILE, "r");
    if (fin == NULL) {
        nonfatal("main, fopen() for check");
    } else {
        ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
        if (ret != 1) {
            nonfatal ("main, fscanf() while checking");
        } else {
            have_value = 1;
        }
        fclose(fin);
    }
    if (!have_value) {
        /* nothing to compare against; do not report a bogus value */
        printf("Could not read final value from %s.\n", RESSFILE);
        exit(EXIT_FAILURE);
    }
    if (x == writers*ITERATIONS) {
        printf("Correct value %lu in %s reached.\nDeleting.", x, RESSFILE);
        ret = unlink(RESSFILE);
        if (ret == -1) {
            fatal("unlink");
        }
    } else {
        printf("Incorrect value %lu in %s .\n", x, RESSFILE);
        printf("Leaving file for inspection.\n");
    }
    exit(EXIT_SUCCESS);
}
Bei der Arbeit mit Pthreads zeigt sich noch ein weiteres Problem. Wenn man die Stackgröße jedes Threads unverändert läßt, dann kann man nur eine verhältnismäßig kleine Anzahl Threads (ca. 380) erzeugen, da Speichermangel resultiert.
Listing 4 zeigt ein Programm, mit dessen Hilfe dieses Phänomen genauer untersucht werden kann. Es setzt mittels pthread_attr_setstacksize die Stackgröße für zu erzeugende Threads auf einen einzugebenden Wert und versucht dann, ein Maximum von Threads zu erzeugen. Bei Eingabe von 0 wird die Stackgröße nicht verändert, sondern ermittelt und ausgegeben. Achtung: es gibt eine systemweite Untergrenze (PTHREAD_STACK_MIN) für die Stackgröße, deren Unterschreitung durch das System mit einem Fehler quittiert wird.
Listing 4: Programm zur Ermittlung der maximalen Threadanzahl in Abhängigkeit von der Stackgröße (howmanythreads.c)
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
/*
 * Thread function: emits a single 'o' on stdout as a sign of life and
 * terminates with a NULL result. The argument is unused.
 */
void* threadf(void *in) {
    fputs("o", stdout);
    pthread_exit(NULL);
}
/*
 * Creates threads in an endless loop until pthread_create() fails and
 * reports how many calls succeeded before the failure.
 *
 * Usage: <prog> <stacksize in bytes>
 *   stacksize != 0: requested as stack size for all threads; if the
 *                   request is rejected the default size is used
 *   stacksize == 0: the default is left untouched and printed
 *
 * The program always ends through exit(EXIT_FAILURE), either on a usage
 * error or when thread creation finally fails.
 */
int main (int argc, char* argv[]) {
    int ret;
    unsigned long c;
    unsigned long requested;
    char *end;
    pthread_t tid;
    pthread_attr_t tattr;
    size_t size;

    /* PTHREAD_STACK_MIN need not have type int, so convert explicitly */
    printf("PTHREAD_STACK_MIN=%ld\n", (long) PTHREAD_STACK_MIN);
    if (argc != 2) {
        printf("Usage: %s <stacksize in bytes>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    /* reject empty input, trailing garbage and out-of-range values;
       without the end pointer "abc" would silently be treated as 0 */
    requested = strtoul(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || requested == ULONG_MAX) {
        printf("Integer conversion of %s failed.\n", argv[1]);
        exit(EXIT_FAILURE);
    }
    size = (size_t) requested;

    pthread_attr_init(&tattr);
    if (size != 0) {
        ret = pthread_attr_setstacksize(&tattr, size);
        if (ret != 0) {
            printf("pthread_attr_setstacksize() failed, error=%d\n", ret);
            printf("(using default stack size)\n");
        }
        else {
            printf("Set the stacksize for all threads to %zu bytes.\n", size );
        }
    }
    else { /* size == 0 */
        ret = pthread_attr_getstacksize(&tattr, &size);
        if (ret != 0) {
            printf("pthread_attr_getstacksize() failed, error=%d\n", ret);
        }
        printf("Left stacksize at default, which is %zu\n", size);
    }

    /* the threads are neither joined nor detached */
    for (c = 0; ; c++) {
        ret = pthread_create(&tid, &tattr, &threadf, NULL);
        if (ret != 0) {
            printf("\n%luth pthread_create() failed, error=%d\n", c, ret);
            /* NOTE(review): pthread_create() reports its error through the
               return value; the text printed by perror() relies on errno
               having been set as well - confirm for the target platform */
            perror("");
            exit(EXIT_FAILURE);
        }
    }
}
Einige Probeläufe dokumentiert (die Ausgaben der Threads wurden gekürzt):
robge@ipaetz2:~$ ./howmanythreads 0
PTHREAD_STACK_MIN=16384
Left stacksize at default, which is 8388608
ooo...
382th pthread_create() failed, error=11
Resource temporarily unavailable
robge@ipaetz2:~$ ./howmanythreads 1
PTHREAD_STACK_MIN=16384
pthread_attr_setstacksize() failed, error=22
(using default stack size)
ooo...
Resource temporarily unavailable
robge@ipaetz2:~$ ./howmanythreads 1048576
PTHREAD_STACK_MIN=16384
Set the stacksize for all threads to 1048576 bytes.
ooo...
3054th pthread_create() failed, error=11
Resource temporarily unavailable
robge@ipaetz2:~$ ./howmanythreads 16384
PTHREAD_STACK_MIN=16384
Set the stacksize for all threads to 16384 bytes.
ooo...
32756th pthread_create() failed, error=12
Cannot allocate memory
Die Default-Stackgröße beträgt exakt 8 MiB, damit sind auf der betrachteten Maschine 381 Threads erzeugbar. Reduziert man den Stack auf 1 MiB, so ist man bereits in der Lage, 3053 Threads zu erzeugen. Wird nur das absolute Minimum an Stack reserviert, verfügt man über die gewaltige Zahl von 32755 Threads. Multipliziert man Threadanzahl mit Stackgröße, erhält man (etwa) 3 GiB, was der Adressraumgröße eines Nutzerprozesses in der genutzten Linuxvariante entspricht. Pthreads nutzt die konzeptionellen Limits der Architektur also ziemlich gut aus.
Die letzte Lösungsvariante nutzt wiederum System-V-Semaphore, der Zähler der Leseprozesse wird als Variable in einem Shared-Memory-Segment angelegt.
Listing 5: Lösung der Aufgabe 2 mittels Semaphoren und Zähler in Shared-Memory-Segment (rw-problem-shm.c)
/*
implements the 'classic' solution to the readers-writers problem which
prioritizes readers
- uses processes and System V semaphores
- the shared resource is a file RESSFILE
- instead of a set containing two semaphores, _two sets_ with one semaphore each are used
- the shared counter 'rc' is implemented as a shared variable in a shared memory segment
Literature:
[1] P.J. Courtois, F. Heymans, D.L. Parnas: Concurrent Control with
"Readers" and "Writers", CACM 14(10), 1971, pp. 667-668
[2] Andrew S. Tanenbaum, Modern Operating Systems, 3rd ed., Pearson,
2009, pp. 165-166
[3] William Stallings, Operating Systems, 6th ed., Prentice Hall, 2007,
pp. 245ff
and many more
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <fcntl.h>
#include <limits.h>
/* upper bound for readers+writers; also the size of the pid array in main() */
#define MAXPROCESSES 20000
/* name of the file that serves as the shared resource */
#define RESSFILE "zahl.dat"
/* number of accesses each reader/writer process performs */
#define ITERATIONS 100
/* value written to RESSFILE by main() before the processes are started */
#define INITVALUE 0UL
/* keys passed to semget() for the two one-element semaphore sets */
#define SEMKEY_MUTEX 0xDEADBEEF
#define SEMKEY_WRI 0xDEAFBABE
/* The calling program must define union semun */
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */
};
/* some prototypes */
void fatal(char*);
void write_file(void);
void read_file(void);
/*
global resources
*/
int id_mutex, id_wri; /* semids of semaphores */
int *rc; /* reader counter in shm segment */
/* number of reader/writer processes, taken from the command line in main() */
unsigned long readers, writers;
/*
 * Report an unrecoverable error and terminate the calling process.
 * msg is handed to perror() and must not be NULL; the process exits
 * with EXIT_FAILURE, so this function does not return.
 */
void fatal(char *msg) {
    assert (msg != NULL);
    perror(msg);
    exit(EXIT_FAILURE);
} /* this closing brace was missing, which left the function unterminated */
/*
a non-fatal error has occurred -> warn, but continue
*/
void nonfatal(char *msg) {
    /* msg goes to perror() and must not be NULL; the caller keeps running */
    assert(NULL != msg);
    perror(msg);
}
/*
 * P operation: decrements semaphore 0 of the set semid by one via semop().
 * A failing semop() is treated as fatal.
 */
void P(int semid) {
    struct sembuf down;

    down.sem_flg = 0;
    down.sem_op = -1;
    down.sem_num = 0;
    if (semop(semid, &down, 1) != -1) {
        return;
    }
    fatal("P()");
}
/*
 * V operation: increments semaphore 0 of the set semid by one via semop().
 * A failing semop() is treated as fatal.
 */
void V(int semid) {
    struct sembuf up;

    up.sem_flg = 0;
    up.sem_op = 1;
    up.sem_num = 0;
    if (semop(semid, &up, 1) != -1) {
        return;
    }
    fatal("V()");
}
/*
 * Body of a writer process: performs ITERATIONS updates of the shared
 * file, each one bracketed by P/V on the 'wri' semaphore, then exits.
 * Does not return to the caller.
 */
void writer(void) {
    int round = 0;

    printf("Writer, PID %d, starts\n", getpid());
    while (round < ITERATIONS) {
        P(id_wri);
        write_file();
        V(id_wri);
        round++;
    }
    printf("Writer, PID %d, finished\n", getpid());
    exit(EXIT_SUCCESS);
}
void reader(void) {
int c;
printf("Reader, PID %d, starts\n", getpid());
for (c=0; c < ITERATIONS; c++) { P(id_mutex);
*rc += 1;
if (*rc == 1) { P(id_wri);
}
printf("rc = %d\n", *rc);
V(id_mutex);
read_file();
P(id_mutex);
*rc -= 1;
if (*rc == 0) { V(id_wri);
}
V(id_mutex);
}
printf("Reader, PID %d, finished\n", getpid());
exit(EXIT_SUCCESS);
} /*
must be called atomically by writers
*/
/*
 * Reads the unsigned number stored in RESSFILE, increments it by one and
 * writes it back to the start of the file. Any I/O failure is fatal.
 * The read-modify-write is not atomic by itself; the caller has to hold
 * the 'wri' semaphore.
 */
void write_file(void) {
    FILE *fin;
    int ret;
    unsigned long x;

    fin = fopen(RESSFILE, "r+");
    if (fin == NULL) {
        fatal("fopen for write");
    }
    ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
    if (ret != 1) {
        fatal("fscanf during write access");
    }
    x += 1;
    /* reposition to the start; also required between reading and writing
       on an update stream */
    rewind(fin);
    /* x is unsigned long, so the matching conversion is %lu */
    ret = fprintf(fin, "%lu\n", x);
    if (ret < 0) { /* fprintf reports failure with a negative value */
        fatal("fprintf");
    }
    ret = fclose(fin);
    if (ret == EOF) {
        fatal("fclose");
    }
}
/*
 * Opens RESSFILE read-only, parses one unsigned number from it and closes
 * the file again. The parsed value is not used any further. Any I/O
 * failure is fatal.
 */
void read_file(void) {
    unsigned long value;
    FILE *in = fopen(RESSFILE, "r");

    if (in == NULL) {
        fatal ("fopen for reading");
    }
    /* fscanf returns the number of converted items */
    if (fscanf(in, "%lu", &value) != 1) {
        fatal ("fscanf");
    }
    if (fclose(in) == EOF) {
        fatal ("fclose after reading");
    }
}
/*
 * Readers-writers demo with processes, System V semaphores and a reader
 * counter in a shared memory segment.
 *
 * Usage: <prog> <readers> <writers>
 *
 * Resets RESSFILE to INITVALUE, creates the shared memory segment and the
 * semaphore sets 'mutex' and 'wri', forks the requested number of reader
 * and writer processes, waits for all of them, releases the IPC objects
 * and finally checks that RESSFILE contains writers*ITERATIONS.
 *
 * Exits with EXIT_FAILURE on usage errors or failing system calls.
 */
int main(int argc, char* argv[]) {
    pid_t son[MAXPROCESSES], tmppid;
    unsigned long c;
    int ret, id_shm;
    int have_value = 0; /* set once the final value was read successfully */
    union semun mysem;
    FILE *fin;
    unsigned long x = 0;

    /* parse command line parameters */
    if (argc != 3) {
        printf("Usage: %s <readers> <writers>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    readers = strtoul(argv[1], NULL, 10);
    writers = strtoul(argv[2], NULL, 10);
    if ((writers == ULONG_MAX) || (readers == ULONG_MAX)) {
        printf("Could not convert number of readers of writers.\n");
        exit(EXIT_FAILURE);
    }
    /* written without computing readers+writers so that huge inputs
       cannot wrap around and slip past the limit */
    if (readers > MAXPROCESSES || writers > MAXPROCESSES - readers) {
        printf("Maximum Number of processes (%i) exceeded.\n", MAXPROCESSES);
        exit(EXIT_FAILURE);
    }

    /* create and reset shared file */
    fin = fopen(RESSFILE, "w");
    if (fin == NULL) {
        fatal("main, fopen for write");
    }
    if (fprintf(fin, "%lu\n", INITVALUE) < 0) {
        fatal("main, fprintf");
    }
    if (fclose(fin) == EOF) {
        fatal("main, fclose");
    }

    /* create and attach an unnamed shared memory segment */
    id_shm = shmget(IPC_PRIVATE, 4096, IPC_CREAT|0600);
    if (id_shm == -1) {
        fatal("shmget");
    }
    rc = (int*) shmat(id_shm, NULL, 0);
    if (rc == (void*) -1) {
        fatal("shmat");
    }
    *rc = 0; /* make the start value of the reader counter explicit */

    /* create named semaphore set 'mutex' */
    id_mutex = semget(SEMKEY_MUTEX, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (id_mutex == -1) {
        fatal("semget mutex");
    }
    /* create named semaphore set 'wri' */
    id_wri = semget(SEMKEY_WRI, 1, IPC_CREAT|S_IRUSR|S_IWUSR);
    if (id_wri == -1) {
        fatal("semget wri");
    }

    /* init mutex and wri as "open" */
    mysem.val = 1;
    ret = semctl(id_mutex, 0, SETVAL, mysem);
    if (ret == -1) {
        fatal("init mutex");
        /* XXX remove semaphore set */
    }
    ret = semctl(id_wri, 0, SETVAL, mysem);
    if (ret == -1) {
        fatal("init wri");
        /* XXX remove semaphore set */
    }

    /* create processes, readers first */
    for (c = 0; c < readers; c++) {
        tmppid = fork();
        if (tmppid == -1) {
            fatal("fork a reader");
        }
        if (tmppid != 0) { /* Father */
            son[c] = tmppid;
        } else {
            reader();
            assert(0); /* may not be reached, because reader() terminates */
        }
    }
    /* only father reaches this */
    /* create writers processes */
    for ( ; c < readers + writers; c++) {
        tmppid = fork();
        if (tmppid == -1) {
            fatal("fork a writer");
        }
        if (tmppid != 0) { /* Father */
            son[c] = tmppid;
        } else {
            writer();
            assert(0); /* may not be reached, because writer() terminates */
        }
    }
    /* only father reaches this */
    /* wait for children to terminate; ret receives the raw wait status */
    for (c = 0; c < readers + writers; c++) {
        tmppid = wait(&ret);
        printf("Son with PID %ld returned %d.\n", (long) tmppid, ret);
    }

    /* detach and delete shared memory segment */
    ret = shmdt((void*) rc);
    if (ret == -1) {
        fatal("shmdt");
    }
    ret = shmctl(id_shm, IPC_RMID, NULL);
    if (ret == -1) {
        fatal("delete shared memory segment");
    }

    /* destroy the semaphores */
    ret = semctl(id_wri, 0, IPC_RMID);
    if (ret == -1) {
        fatal("delete semaphore wri");
    }
    ret = semctl(id_mutex, 0, IPC_RMID);
    if (ret == -1) {
        fatal("delete semaphore mutex");
    }

    /* open shared file and check its final value */
    fin = fopen(RESSFILE, "r");
    if (fin == NULL) {
        nonfatal("main, fopen for check");
    } else {
        ret = fscanf(fin, "%lu", &x); /* returns # of converted items */
        if (ret != 1) {
            nonfatal ("main, fscanf while checking");
        } else {
            have_value = 1;
        }
        fclose(fin);
    }
    if (!have_value) {
        /* nothing to compare against; do not report a bogus value */
        printf("Could not read final value from %s.\n", RESSFILE);
        exit(EXIT_FAILURE);
    }
    if (x == writers*ITERATIONS) {
        printf("Correct value %lu in %s reached.\nDeleting.", x, RESSFILE);
        ret = unlink(RESSFILE);
        if (ret == -1) {
            fatal("unlink");
        }
    } else {
        printf("Incorrect value %lu in %s .\n", x, RESSFILE);
        printf("Leaving file for inspection.\n");
    }
    exit(EXIT_SUCCESS);
}