Example 6-1 shows the use of the DECthreads pthread routines in a C program that performs a prime number search. The program finds a specified number of prime numbers, then sorts and displays these numbers. Several threads participate in the search: each thread takes a number (the next one to be checked), checks whether it is prime, records it if it is, and then takes another number, and so on.
This program shows the work crew model of programming (see Section 1.4.2). The workers (threads) increment a number (current_num) to get their next work assignment. As a whole, the worker threads are responsible for finding a specified number of prime numbers, at which point their work is completed.
The number of workers to be used and the requested number of prime numbers to be found are defined constants. A macro is used to check for error status and to print a given string and the associated error value. Data to be accessed by all threads (mutexes, condition variables, and so forth) are declared as global items.
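The error-checking macro described above can be sketched as follows. This is only an illustration, not the macro from Example 6-1: the names report_status and CHECK are hypothetical, and the sketch assumes POSIX behavior, in which a pthread routine returns its error number directly rather than setting errno. Under that assumption, formatting the returned status with strerror ties the message to the value the call actually returned.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not part of Example 6-1): report a failed pthread
 * call using the status value the call returned.
 * Returns 1 if an error was reported, 0 if status indicated success.
 */
static int report_status(int status, const char *what)
{
    if (status == 0)
        return 0;
    fprintf(stderr, "%s: %s (status %d)\n", what, strerror(status), status);
    return 1;
}

/* Macro form, wrapped in do/while so it behaves as a single statement. */
#define CHECK(status, string) \
    do { (void)report_status((status), (string)); } while (0)
```

A call site would look like CHECK(pthread_mutex_lock(&some_mutex), "Mutex_lock"), where some_mutex stands for any initialized mutex.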
Worker threads execute the prime search routine, which begins by synchronizing with the parent thread using a predicate and a condition variable. Always enclose a condition wait in a predicate loop, so that a thread cannot continue if it is awakened by a signal or broadcast that was not meant for it. The thread must hold the mutex associated with the condition variable when it calls the condition wait. The mutex is released within the call and acquired again before the call returns following a signal or broadcast. The same mutex must be used for all operations performed on a specific condition variable.
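The predicate loop described above can be sketched in isolation. The names gate_mutex, gate_cond, gate_closed, waiter, and run_gate_demo are hypothetical and are not taken from Example 6-1, and the statically initialized mutex and condition variable are a simplification. The sketch shows the waiter rechecking its predicate after every wakeup and the parent changing the predicate under the same mutex before it broadcasts.

```c
#include <pthread.h>
#include <stddef.h>

#define WAITERS 3   /* hypothetical crew size for this sketch */

static pthread_mutex_t gate_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  gate_cond  = PTHREAD_COND_INITIALIZER;
static int gate_closed = 1;   /* predicate: nonzero while workers must wait */
static int passed = 0;        /* workers that got through; guarded by gate_mutex */

static void *waiter(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&gate_mutex);
    while (gate_closed)       /* recheck the predicate after every wakeup */
        pthread_cond_wait(&gate_cond, &gate_mutex);
    passed++;                 /* the mutex is held again at this point */
    pthread_mutex_unlock(&gate_mutex);
    return NULL;
}

/* Starts the waiters, opens the gate, and returns how many passed (-1 on error). */
static int run_gate_demo(void)
{
    pthread_t t[WAITERS];
    int i;

    for (i = 0; i < WAITERS; i++)
        if (pthread_create(&t[i], NULL, waiter, NULL) != 0)
            return -1;

    pthread_mutex_lock(&gate_mutex);
    gate_closed = 0;          /* change the predicate under the same mutex */
    pthread_cond_broadcast(&gate_cond);
    pthread_mutex_unlock(&gate_mutex);

    for (i = 0; i < WAITERS; i++)
        if (pthread_join(t[i], NULL) != 0)
            return -1;
    return passed;
}
```

Because the predicate is tested before the first wait, a waiter that starts after the broadcast sees gate_closed equal to zero and never blocks.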
After the parent sets the predicate and broadcasts, the workers begin finding prime numbers until canceled by a fellow worker who has found the last requested prime number. Upon each iteration the workers increment the current number to be worked on and take the new value as their work item. A mutex is locked and unlocked around getting the next work item, in order to ensure that no two threads are working on the same item. This type of locking protocol should be performed on all global data to ensure its integrity.
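The locking protocol around the work item can be sketched as follows. The names item_mutex, next_item, take_next_odd, take_many, and run_take_demo, as well as the crew size and item count, are assumptions made for this illustration only. Each worker copies the new value into a local variable while it still holds the mutex, so no two workers receive the same number.

```c
#include <pthread.h>
#include <stddef.h>

#define CREW   4      /* hypothetical number of workers */
#define TAKES  1000   /* hypothetical items taken per worker */

static pthread_mutex_t item_mutex = PTHREAD_MUTEX_INITIALIZER;
static int next_item = -1;    /* guarded by item_mutex */

/* Advance the shared counter to the next odd number and return it. */
static int take_next_odd(void)
{
    int mine;

    pthread_mutex_lock(&item_mutex);
    next_item += 2;
    mine = next_item;         /* copy out while still holding the lock */
    pthread_mutex_unlock(&item_mutex);
    return mine;
}

/* Worker: take TAKES items and add them to the caller-supplied total. */
static void *take_many(void *arg)
{
    long *sum = arg;
    int i;

    for (i = 0; i < TAKES; i++)
        *sum += take_next_odd();
    return NULL;
}

/* Returns the sum of every item handed out, or -1 on error. */
static long run_take_demo(void)
{
    pthread_t t[CREW];
    long sums[CREW] = {0};
    long total = 0;
    int i;

    for (i = 0; i < CREW; i++)
        if (pthread_create(&t[i], NULL, take_many, &sums[i]) != 0)
            return -1;
    for (i = 0; i < CREW; i++) {
        if (pthread_join(t[i], NULL) != 0)
            return -1;
        total += sums[i];
    }
    return total;
}
```

If every odd number from 1 through 7999 is handed out exactly once, the total is the sum of the first 4000 odd numbers, which is 4000 squared.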
Each worker thread then determines if its current work item (number) is prime by trying to divide numbers into it. If the number proves to be nondivisible, it is put on the list of primes. Cancels are turned off while working with the list of primes to better control any cancels that do occur. The list of primes and its current count are protected by locks, which also protect the cancellation process of all other worker threads upon finding the last requested prime. While still under the prime list lock, the current worker checks to see if it has found the last requested prime, and, if so, unsets a predicate and cancels all other worker threads. Cancels are then reenabled. The canceling thread should fall out of the work loop as a result of the predicate that it unsets.
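The divisibility test can also be written as a standalone function. The sketch below is a variant, not the test used in Example 6-1: is_prime is a hypothetical name, the loop stops once the divisor exceeds the square root of the candidate instead of half of it, and values below 2 are rejected. The listing's test, as written, appears to accept 1, because no divisor is tried for that value.

```c
#include <stdbool.h>

/*
 * Hypothetical helper, not part of Example 6-1: trial division that stops
 * once divisor * divisor would exceed n.
 */
static bool is_prime(int n)
{
    int d;

    if (n < 2)
        return false;             /* 0, 1, and negatives are not prime */
    if (n % 2 == 0)
        return n == 2;            /* 2 is the only even prime */
    for (d = 3; d <= n / d; d += 2)
        if (n % d == 0)
            return false;         /* found an exact divisor */
    return true;
}
```

Writing the bound as d <= n / d avoids computing d * d, which could overflow for candidates near the top of the int range.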
The parent thread's flow of execution is as follows:
Setting up the environment means initializing mutexes and the one condition variable used in the example.
Creation of worker threads is straightforward and uses the default attributes. Worker threads immediately wait on a condition variable.
As the parent joins each of the returning worker threads, it receives an exit value from them that indicates whether a thread exited normally or not. In this case the exit values on all but one of the worker threads should be -1, indicating that they were canceled.
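The join step can be sketched on its own. The names spin_until_canceled, finish_normally, join_and_classify, and run_join_demo are hypothetical. POSIX names the exit value of a canceled thread PTHREAD_CANCELED, so the sketch compares against that macro rather than against a literal -1.

```c
#include <pthread.h>
#include <stddef.h>

static int marker;    /* its address serves as a recognizable normal exit value */

/* Loops on an explicit cancellation point until another thread cancels it. */
static void *spin_until_canceled(void *arg)
{
    (void)arg;
    for (;;)
        pthread_testcancel();
    return NULL;      /* not reached */
}

/* Returns its argument as the thread's exit value. */
static void *finish_normally(void *arg)
{
    return arg;
}

/* Returns 1 if the joined thread was canceled, 0 if it returned, -1 on error. */
static int join_and_classify(pthread_t t)
{
    void *exit_value;

    if (pthread_join(t, &exit_value) != 0)
        return -1;
    return exit_value == PTHREAD_CANCELED ? 1 : 0;
}

/* Cancels one thread, lets another finish, and encodes both results. */
static int run_join_demo(void)
{
    pthread_t a, b;
    int canceled, normal;

    if (pthread_create(&a, NULL, spin_until_canceled, NULL) != 0)
        return -1;
    if (pthread_create(&b, NULL, finish_normally, &marker) != 0)
        return -1;
    if (pthread_cancel(a) != 0)
        return -1;

    canceled = join_and_classify(a);   /* expected: 1 */
    normal   = join_and_classify(b);   /* expected: 0 */
    return canceled * 10 + normal;
}
```

A worker that returns its own worker number, as in Example 6-1, can be recognized in the same way by comparing the exit value against that number.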
The following pthread routines are used in Example 6-1:
/*
 *
 * DECthreads example program conducting a prime number search
 *
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * Constants used by the example.
 */
#define workers     5           /* Threads to perform prime check */
#define request     110         /* Number of primes to find */

/*
 * Macros
 */
#define check(status,string) \
    if (status != 0) perror (string)

/*
 * Global data
 */
pthread_mutex_t prime_list;     /* Mutex for use in accessing the prime list */
pthread_mutex_t current_mutex;  /* Mutex associated with current number */
pthread_mutex_t cond_mutex;     /* Mutex used for ensuring CV integrity */
pthread_cond_t  cond_var;       /* Condition variable for thread start */
int current_num= -1;            /* Next number to be checked, start odd */
int thread_hold=1;              /* Number associated with condition state */
int count=0;                    /* Count of prime numbers - index to primes */
int primes[request];            /* Store prime numbers - synchronize access */
pthread_t threads[workers];     /* Array of worker threads */
int orig_c_s;                   /* Original cancel state */
int work_c_s;                   /* Working cancel state */
int s;                          /* Cancel routine status */

/*
 * Cleanup handler: releases cond_mutex if a worker is canceled while
 * waiting on the condition variable.
 */
static void
unlock_cond (void *arg)
{
    int status;                 /* Hold status from pthread calls */

    status = pthread_mutex_unlock (&cond_mutex);
    check(status,"3:Mutex_unlock bad status\n");
}

/*
 * Worker thread routine.
 *
 * Worker threads start with this routine, which begins with a condition
 * wait designed to synchronize the workers and the parent. Each worker
 * thread then takes a turn taking a number for which it will determine
 * whether or not it is prime.
 *
 */
static void *
prime_search (void *arg)
{
    div_t div_results;          /* DIV results: quot and rem */
    int numerator;              /* Used for determining primeness */
    int denominator;            /* Used for determining primeness */
    int cut_off;                /* Number being checked div 2 */
    int notifiee;               /* Used during a cancellation */
    int prime;                  /* Flag used to indicate primeness */
    int my_number;              /* Worker thread identifier */
    int status;                 /* Hold status from pthread calls */
    int not_done=1;             /* Work loop predicate */

    my_number = (int)arg;

    /*
     * Synchronize threads and the parent using a condition variable, of
     * which the predicate (thread_hold) will be set by the parent.
     */
    status = pthread_mutex_lock (&cond_mutex);
    check(status,"1:Mutex_lock bad status\n");

    pthread_cleanup_push (unlock_cond, NULL);

    while (thread_hold) {
        status = pthread_cond_wait (&cond_var, &cond_mutex);
        check(status,"2:Cond_wait bad status\n");
    }

    /* Pop the handler and run it, which unlocks cond_mutex */
    pthread_cleanup_pop (1);

    /*
     * Perform checks on ever larger integers until the requested
     * number of primes is found.
     */
    while (not_done) {

        /* Cancellation point */
        pthread_testcancel ();

        /* Get next integer to be checked */
        status = pthread_mutex_lock (&current_mutex);
        check(status,"4:Mutex_lock bad status\n");
        current_num = current_num + 2;  /* Skip even numbers */
        numerator = current_num;
        status = pthread_mutex_unlock (&current_mutex);
        check(status,"5:Mutex_unlock bad status\n");

        /* Only divisors up to half of the number need to be tried */
        cut_off = numerator/2 + 1;
        prime = 1;

        /* Check for prime; exit if something evenly divides */
        for (denominator = 2;
             ((denominator < cut_off) && (prime));
             denominator++) {
            prime = numerator % denominator;
        }

        if (prime != 0) {

            /* Explicitly turn off all cancels */
            s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &orig_c_s);

            /*
             * Lock a mutex and add this prime number to the list. Also,
             * if this fulfills the request, cancel all other threads.
             */
            status = pthread_mutex_lock (&prime_list);
            check(status,"6:Mutex_lock bad status\n");

            if (count < request) {
                primes[count] = numerator;
                count++;
            }
            else if (count == request) {
                not_done = 0;
                count++;
                for (notifiee = 0; notifiee < workers; notifiee++) {
                    if (notifiee != my_number) {
                        status = pthread_cancel ( threads[notifiee] );
                        check(status,"12:Cancel bad status\n");
                    }
                }
            }

            status = pthread_mutex_unlock (&prime_list);
            check(status,"13:Mutex_unlock bad status\n");

            /* Reenable cancels */
            s = pthread_setcancelstate(orig_c_s, &work_c_s);
        }

        pthread_testcancel ();
    }

    pthread_exit ((void *)my_number);
    return (void *)0;
}

int
main (void)
{
    int worker_num;             /* Counter used when indexing workers */
    void *exit_value;           /* Individual worker's return status */
    int list;                   /* Used to print list of found primes */
    int status;                 /* Hold status from pthread calls */
    int index1;                 /* Used in sorting prime numbers */
    int index2;                 /* Used in sorting prime numbers */
    int temp;                   /* Used in a swap; part of sort */
    int not_done;               /* Indicates swap made in sort */

    /*
     * Create mutexes
     */
    status = pthread_mutex_init (&prime_list, NULL);
    check(status,"7:Mutex_init bad status\n");
    status = pthread_mutex_init (&cond_mutex, NULL);
    check(status,"8:Mutex_init bad status\n");
    status = pthread_mutex_init (&current_mutex, NULL);
    check(status,"9:Mutex_init bad status\n");

    /*
     * Create condition variable
     */
    status = pthread_cond_init (&cond_var, NULL);
    check(status,"10:Cond_init bad status\n");

    /*
     * Create the worker threads.
     */
    for (worker_num = 0; worker_num < workers; worker_num++) {
        status = pthread_create (
                        &threads[worker_num],
                        NULL,
                        prime_search,
                        (void *)worker_num);
        check(status,"11:Pthread_create bad status\n");
    }

    /*
     * Set the predicate thread_hold to zero, and broadcast on the
     * condition variable that the worker threads may proceed.
     */
    status = pthread_mutex_lock (&cond_mutex);
    check(status,"12:Mutex_lock bad status\n");

    thread_hold = 0;

    status = pthread_cond_broadcast (&cond_var);
    check(status,"16:Cond_broadcast bad status\n");

    status = pthread_mutex_unlock (&cond_mutex);
    check(status,"13:Mutex_unlock bad status\n");

    /*
     * Join each of the worker threads in order to obtain their
     * exit values, and to ensure each has completed.
     *
     * Under POSIX, pthread_join also releases the joined thread's
     * storage, and detaching a thread that has already been joined is
     * undefined, so no separate pthread_detach call is made here.
     */
    for (worker_num = 0; worker_num < workers; worker_num++) {
        status = pthread_join ( threads[worker_num], &exit_value );
        check(status,"14:Pthread_join bad status\n");

        /*
         * Upon normal termination the exit_value is equivalent to
         * worker_num.
         */
        if (exit_value == (void *)worker_num)
            printf("Thread terminated normally\n");
    }

    /*
     * Take the list of prime numbers found by the worker threads and
     * sort them from lowest value to highest. The worker threads work
     * concurrently; there is no guarantee that the prime numbers
     * will be found in order. Therefore, a sort is performed.
     *
     * This is a bubble sort: passes over the list repeat until one
     * pass completes without making a swap.
     */
    not_done = 1;
    while (not_done) {
        not_done = 0;
        for (index1 = 1; index1 < request; index1++) {
            index2 = index1 - 1;
            if (primes[index1] < primes[index2]) {
                temp = primes[index2];
                primes[index2] = primes[index1];
                primes[index1] = temp;
                not_done = 1;
            }
        }
    }

    /*
     * Print out the list of prime numbers that the worker threads
     * found.
     */
    printf ("The list of %d primes follows:\n", request);
    printf("%d",primes[0]);
    for (list = 1; list < request; list++) {
        printf (",\t%d", primes[list]);
    }
    printf ("\n");

    return 0;
}