
More Concurrent Programming Topics

Over the past two months we’ve introduced threads and mutexes, the locking mechanism used to prevent race conditions in threaded applications. In this month’s column, we’ll look at two more concurrent programming techniques used to synchronize the execution of code in threads: semaphores and condition variables. This discussion should deepen your understanding of locks and how they are used in concurrent programming.

Recall that a mutex is a lock that only one thread can “hold” at any given time. If a thread tries to acquire the lock while another thread “holds” it, the requesting thread is blocked until the lock is released. This mechanism is quite helpful when you want only one thread executing certain code at a given time (for example, when a common variable’s value is read and then written).

However, imagine a situation where you have multiple resources that are being used concurrently. This occurs all the time: ten cashiers at Fry’s Electronics, five copiers at Kinko’s, three servers at Ben and Jerry’s and so on. In each of these cases, customers “compete” for the resources available, and whenever all the resources are busy, the remaining unserved customers must wait.

You may be wondering how these examples relate to programming. It’s true that you are unlikely to write a computer program that tells people standing in line that they cannot be served until a cashier, copier, or ice-cream server is ready to serve them.

However, picture a “virtual” line, like when you call a ticket agent to buy a plane ticket. If all the ticket agents are serving other customers when you call, you will be put on hold until “the next available ticket agent” can take your call. Somehow, the resources (ticket agents) must be distributed to the consumers (you and anyone else who wishes to fly) in such a way that all ticket agents are taking calls and no ticket agent is assigned more than one customer at a time.

If you try to solve this problem using only mutexes, you will probably have great difficulty. Luckily for us, there are better ways to solve this type of problem.

Introducing Semaphores

A “semaphore” is a counter that can be used to keep track of resources in threaded applications. It basically represents a non-negative value that can be signaled or waited on. If a thread signals a semaphore, the value of the semaphore increases by one. If a thread waits on a semaphore, two things can happen. If the value of the semaphore is greater than zero, that value is decreased by one, and the call to wait returns. However, if the value of the semaphore is zero, the thread that waited on it is blocked until the value is greater than zero. The value of the semaphore can be thought of as the number of unused resources. When this number reaches zero, a thread that wishes to use a resource must wait until one is available.

To create a semaphore in your threaded program, simply call the function sem_init(). Its prototype, as listed in <semaphore.h>, is:


int sem_init (sem_t* sem, int pshared, unsigned int value);

All semaphore functions return zero on success and -1 on failure, with errno set to indicate the error. See the manual page for more information on the error codes that can be returned. sem_init() takes a pointer to a sem_t as its first argument. This is the semaphore to be initialized. Normally you would declare the semaphore statically and then pass the address of the variable to this function for initialization. The second argument, pshared, says whether the semaphore is shared across processes. Pass zero for a semaphore used only by the threads of a single process, which is all we need in this column. Older Linux thread libraries (LinuxThreads) did not support process-shared semaphores and rejected non-zero values; the NPTL implementation in current glibc does support them. The last argument, value, is the initial value of the semaphore. So, for example, if you had five ticket agents and a semaphore to represent them, you would initialize it with a call like:


#define NUM_AGENTS 5
sem_t agent_semaphore;
sem_init (&agent_semaphore, 0, NUM_AGENTS);
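Since sem_init() can fail, it is worth checking its return value. The following is a minimal sketch of that check, not a definitive implementation. The helper name init_agent_semaphore() is hypothetical; sem_getvalue() and sem_destroy() are standard POSIX calls that this column does not otherwise cover.

```c
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>

#define NUM_AGENTS 5

/* Hypothetical helper: initialize the semaphore for the ticket agents
   and return its starting value, or -1 if any call fails. */
int init_agent_semaphore(sem_t *sem)
{
    int value;

    /* pshared is 0: the semaphore is used only by threads of this process. */
    if (sem_init(sem, 0, NUM_AGENTS) != 0) {
        fprintf(stderr, "sem_init: %s\n", strerror(errno));
        return -1;
    }

    /* Read the counter back to confirm the initial value. */
    if (sem_getvalue(sem, &value) != 0) {
        fprintf(stderr, "sem_getvalue: %s\n", strerror(errno));
        sem_destroy(sem);
        return -1;
    }
    return value;
}
```

A caller would declare a sem_t, pass its address to the helper, and call sem_destroy() on it once no thread uses the semaphore any longer.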

To signal the semaphore, you call sem_post():


int sem_post (sem_t* sem);

This will increase the value of the semaphore by one. To wait on the semaphore, you call sem_wait():


int sem_wait (sem_t* sem);

This will return immediately if the value of the semaphore is greater than zero and block the thread otherwise.
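To see the three calls working together, here is a small sketch in which twenty caller threads compete for five agents. The names caller(), run_callers(), busy_agents and max_busy are made up for this illustration, and the call itself is simulated by simply yielding the processor. The sketch records the largest number of agents that were ever busy at once; because every caller waits on the semaphore first, that number should never exceed NUM_AGENTS.

```c
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>

#define NUM_AGENTS 5
#define NUM_CALLERS 20

static sem_t agent_semaphore;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
static int busy_agents = 0;  /* agents currently on a call */
static int max_busy = 0;     /* largest value busy_agents ever reached */

static void *caller(void *unused)
{
    (void)unused;

    /* Block until an agent is free, then claim that agent. */
    sem_wait(&agent_semaphore);

    pthread_mutex_lock(&stats_lock);
    busy_agents++;
    if (busy_agents > max_busy)
        max_busy = busy_agents;
    pthread_mutex_unlock(&stats_lock);

    sched_yield();  /* stands in for the call itself */

    pthread_mutex_lock(&stats_lock);
    busy_agents--;
    pthread_mutex_unlock(&stats_lock);

    /* Hang up: return the agent to the pool. */
    sem_post(&agent_semaphore);
    return NULL;
}

/* Runs all callers and returns the peak number of busy agents,
   or -1 if the semaphore could not be initialized. */
int run_callers(void)
{
    pthread_t threads[NUM_CALLERS];
    int created = 0;
    int i;

    busy_agents = 0;
    max_busy = 0;
    if (sem_init(&agent_semaphore, 0, NUM_AGENTS) != 0)
        return -1;

    for (i = 0; i < NUM_CALLERS; i++) {
        if (pthread_create(&threads[created], NULL, caller, NULL) == 0)
            created++;
    }
    for (i = 0; i < created; i++)
        pthread_join(threads[i], NULL);

    sem_destroy(&agent_semaphore);
    return max_busy;
}
```

The mutex here protects only the bookkeeping counters. The limit on concurrent calls comes entirely from the semaphore.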

Condition Variables

Sometimes semaphores do not conceptually represent all the types of synchronization you might want to use when programming with threads. For an example, we’ll again turn to the ticket agency. Once the agency has successfully sold the total number of tickets available, someone might have the job of closing the books concerning the venue, changing the status of the venue to “Sold Out,” etc. However, we only want this bookkeeping to occur when all of the tickets are sold. Although we could use a semaphore to do this, it would be clearer conceptually if we had a way to tie the condition (total sold = total available) to a lock that will only wake a thread when this is true. To do this, Linux provides “condition variables,” or “conditions” for short.

A condition variable is tied to a mutex and can be used to block one or more threads until another thread signals the condition. Depending on the call used, signaling wakes either one of the waiting threads or all of them. Whenever you want to use a condition variable, you must have a mutex for the condition. Unlike semaphores, condition variables have no memory. When one thread signals the condition, only the threads that are currently waiting on it are woken up. Any thread that isn’t waiting on it at that moment does not receive the signal. By tying a mutex to the condition, you can ensure that a race condition does not occur between the thread waiting on the condition and the thread signaling the condition. (Again, see last month’s column if you want to brush up on race conditions.)

The function to initialize a condition variable (as defined in <pthread.h>) is as follows:


int pthread_cond_init (pthread_cond_t* cond, pthread_condattr_t* cond_attr);

Alternately, you may use the static initializer like this:


pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

to get the default attributes for the condition. To wait on a condition, simply call the wait function:


int pthread_cond_wait (pthread_cond_t* cond, pthread_mutex_t* mutex);

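Note, as I described above, that each condition variable must be used with a mutex. You must acquire the mutex (with the pthread_mutex_lock() function) before calling pthread_cond_wait(). The wait call atomically releases the mutex while the thread sleeps and re-acquires it before returning, so the thread holds the mutex again once it wakes. To signal a single thread that is waiting on the condition, use the following signal function: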


int pthread_cond_signal (pthread_cond_t* cond);

This will wake one of the waiting threads and allow it to continue execution. Which thread that is remains unspecified, and there is no way to control it, so when any of several waiters may be the one that needs to proceed, the function you want instead of pthread_cond_signal() is often the broadcast function:


int pthread_cond_broadcast (pthread_cond_t* cond);

This function wakes up all threads that are currently waiting on the condition and allows them to continue execution. As is true with the other thread functions, these all return zero on success and non-zero on failure. Make sure to see the manual page for information about failure codes for these functions.
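Because condition variables have no memory, the usual pattern is to pair the condition with an ordinary variable (the predicate) protected by the same mutex, and to re-check that variable in a loop around pthread_cond_wait(). The sketch below illustrates the pattern; the names gate_open, waiter() and open_gate_demo() are invented for the example. A waiter that starts late simply finds gate_open already set and never sleeps, so the broadcast cannot be missed.

```c
#include <pthread.h>

#define NUM_WAITERS 3

static pthread_mutex_t gate_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gate_cond = PTHREAD_COND_INITIALIZER;
static int gate_open = 0;  /* the predicate, protected by gate_lock */
static int passed = 0;     /* number of threads that got through */

static void *waiter(void *unused)
{
    (void)unused;

    pthread_mutex_lock(&gate_lock);
    /* Loop: a wakeup only means the predicate may have changed. */
    while (!gate_open)
        pthread_cond_wait(&gate_cond, &gate_lock);
    passed++;
    pthread_mutex_unlock(&gate_lock);
    return NULL;
}

/* Starts the waiters, opens the gate, and returns how many
   threads passed through it. */
int open_gate_demo(void)
{
    pthread_t threads[NUM_WAITERS];
    int created = 0;
    int i;

    gate_open = 0;
    passed = 0;

    for (i = 0; i < NUM_WAITERS; i++) {
        if (pthread_create(&threads[created], NULL, waiter, NULL) == 0)
            created++;
    }

    /* Change the predicate and signal while holding the mutex. */
    pthread_mutex_lock(&gate_lock);
    gate_open = 1;
    pthread_cond_broadcast(&gate_cond);
    pthread_mutex_unlock(&gate_lock);

    for (i = 0; i < created; i++)
        pthread_join(threads[i], NULL);

    return passed;
}
```

Replacing the broadcast with pthread_cond_signal() here could leave some waiters asleep, since a single signal is only guaranteed to wake one of them.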

The Ticket Agency Again

Let’s now look at a complete version of the ticket agency example we’ve been using, employing all the techniques we’ve described so far. Using locks, semaphores and condition variables, we will construct a program that can: 1) handle callers and distribute them to ticket agents without allocating two callers to any agent simultaneously; 2) keep all ticket agents responding to calls whenever there are customers waiting to buy tickets; 3) ensure that no more than the total number of tickets available are sold; 4) notify a bookkeeper that will perform certain actions once all the tickets are sold (and not before). Of course, this program will only be the framework (i.e., the synchronization techniques necessary to make sure each of the parts cooperates as desired).

First, let’s start with the callers. The rule for callers is simple to state and splits into two cases. Case 1: If there is at least one ticket agent available when a call comes in, the call should be answered immediately by a ticket agent. Case 2: If there are no ticket agents available when a call comes in, the caller will be placed on hold until a ticket agent is available.

To code this, we will assume that we have a queue data structure with functions to place an integer on a queue and to remove the first integer put on the queue (the functions queue_add() and queue_remove() respectively). Thus, we will simply keep a queue of callers. These callers will be our “resource” in the synchronization sense of the word, and the ticket agents will be the consumer of the resources (callers). Therefore, we will have a semaphore to keep track of the number of callers who are waiting to be helped. This will ensure that two ticket agents don’t attempt to help the same customer.
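The queue itself is left unspecified here. As one possible sketch, assuming a fixed capacity is acceptable, it could be a small ring buffer of integers. The type layout, QUEUE_CAPACITY and queue_new() are assumptions made for this illustration; only the names queue, queue_add() and queue_remove() come from the column. The queue is declared as a pointer type so that it can be passed by value, which is how the listing uses it.

```c
#include <stdlib.h>

#define QUEUE_CAPACITY 64  /* assumed bound; must be at least NUM_LINES */

/* Hypothetical layout: a fixed-size ring buffer of integers. */
typedef struct int_queue {
    int items[QUEUE_CAPACITY];
    int head;   /* index of the oldest element */
    int count;  /* number of elements stored */
} *queue;

/* Allocate an empty queue; returns NULL if memory runs out. */
queue queue_new(void)
{
    return calloc(1, sizeof(struct int_queue));
}

/* Append value at the tail; returns 0 on success, -1 if full. */
int queue_add(queue q, int value)
{
    if (q->count == QUEUE_CAPACITY)
        return -1;
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = value;
    q->count++;
    return 0;
}

/* Remove and return the oldest value; returns -1 if empty. */
int queue_remove(queue q)
{
    int value;

    if (q->count == 0)
        return -1;
    value = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAPACITY;
    q->count--;
    return value;
}
```

This queue does no locking of its own, so every call must be made with the queue’s mutex held. With this particular sketch, the program would also have to create the queue with queue_new() before starting any threads.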

By using a mutex for the queue of callers, we can ensure that when two customers are waiting on one of two agents to help them, each customer will be helped. We will use another semaphore for the communication from the agent to the line signifying that the call is over and should be hung up. Each line will have a semaphore that it will wait on once it queues the caller. The agent that takes that call will signal the corresponding semaphore when the call is over before taking another call.

To handle bookkeeping after all the tickets have been sold, we simply use a condition variable on the condition (total_sold < NUM_TICKETS). While this is true, the bookkeeper just waits. We could just as easily have used a while() loop with no body, but this would take a lot of processor time constantly executing the instructions necessary to test the condition. For now, let’s look at the code in Listing One.

The code in Listing One is a demonstration of the synchronization features described in this and last month’s articles. It is a program that is probably similar to those used when you are on hold waiting for tech support or to buy airline tickets. After studying this code, other possible solutions for this specific problem will probably occur to you. For example, we could use a semaphore for the bookkeeper instead of a condition variable.
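As a rough sketch of that alternative, the bookkeeper can wait on a semaphore that starts at zero and is posted exactly once, by whichever agent sells the last ticket. This is a simplified stand-in for the listing rather than a drop-in replacement: the agents here sell tickets in a tight loop with no callers, and the names sold_out, books_closed and run_semaphore_bookkeeper() are invented for the example.

```c
#include <pthread.h>
#include <semaphore.h>

#define NUM_TICKETS 100
#define NUM_AGENTS 5

static int total_sold = 0;
static pthread_mutex_t tickets_sold_lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t sold_out;        /* starts at 0; posted once when sold out */
static int books_closed = 0;  /* set by the bookkeeper when it runs */

static void *agent(void *unused)
{
    (void)unused;

    for (;;) {
        int sold_now;

        pthread_mutex_lock(&tickets_sold_lock);
        if (total_sold == NUM_TICKETS) {
            pthread_mutex_unlock(&tickets_sold_lock);
            break;  /* nothing left to sell */
        }
        sold_now = ++total_sold;
        pthread_mutex_unlock(&tickets_sold_lock);

        /* Only the agent that sold the last ticket posts. */
        if (sold_now == NUM_TICKETS)
            sem_post(&sold_out);
    }
    return NULL;
}

static void *bookkeeper(void *unused)
{
    (void)unused;

    /* The semaphore remembers the post, so no predicate loop is needed. */
    sem_wait(&sold_out);
    books_closed = 1;  /* stands in for do_bookkeeping() */
    return NULL;
}

/* Returns the number of tickets sold once the books are closed,
   or -1 if something went wrong. */
int run_semaphore_bookkeeper(void)
{
    pthread_t agents[NUM_AGENTS];
    pthread_t keeper;
    int created = 0;
    int i;

    total_sold = 0;
    books_closed = 0;
    if (sem_init(&sold_out, 0, 0) != 0)
        return -1;
    if (pthread_create(&keeper, NULL, bookkeeper, NULL) != 0)
        return -1;

    for (i = 0; i < NUM_AGENTS; i++) {
        if (pthread_create(&agents[created], NULL, agent, NULL) == 0)
            created++;
    }
    if (created == 0)
        sem_post(&sold_out);  /* no agents: release the bookkeeper anyway */

    for (i = 0; i < created; i++)
        pthread_join(agents[i], NULL);
    pthread_join(keeper, NULL);

    sem_destroy(&sold_out);
    return (books_closed && created > 0) ? total_sold : -1;
}
```

Because a semaphore keeps its count, the bookkeeper is released even if the last ticket is sold before the bookkeeper thread has started waiting.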

As I alluded to earlier, in general, a semaphore can be used to achieve the same functionality as a condition variable. In fact, simple mutexes can be used to mimic the behavior of semaphores and condition variables (and vice versa). Given any one of the synchronization techniques I’ve described so far, you can build any of the others. These different mechanisms exist to allow programmers to use the method of synchronization best suited for the problem at hand.
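To make one of these constructions concrete, here is a sketch of a counting semaphore built from a mutex and a condition variable. It is meant as an illustration of the idea, not as a replacement for sem_t. The type counting_sem and the csem_ function names are hypothetical.

```c
#include <pthread.h>

/* Hypothetical counting semaphore built from a mutex and a condition. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t nonzero;  /* signaled whenever value is increased */
    unsigned int value;      /* number of unused resources */
} counting_sem;

/* Returns 0 on success, -1 on failure. */
int csem_init(counting_sem *s, unsigned int value)
{
    s->value = value;
    if (pthread_mutex_init(&s->lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&s->nonzero, NULL) != 0) {
        pthread_mutex_destroy(&s->lock);
        return -1;
    }
    return 0;
}

/* Block until value is greater than zero, then take one unit. */
void csem_wait(counting_sem *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->value == 0)
        pthread_cond_wait(&s->nonzero, &s->lock);
    s->value--;
    pthread_mutex_unlock(&s->lock);
}

/* Take one unit without blocking; returns 0 if taken, -1 if none left. */
int csem_trywait(counting_sem *s)
{
    int result = -1;

    pthread_mutex_lock(&s->lock);
    if (s->value > 0) {
        s->value--;
        result = 0;
    }
    pthread_mutex_unlock(&s->lock);
    return result;
}

/* Return one unit and wake one thread that may be waiting for it. */
void csem_post(counting_sem *s)
{
    pthread_mutex_lock(&s->lock);
    s->value++;
    pthread_cond_signal(&s->nonzero);
    pthread_mutex_unlock(&s->lock);
}
```

The counter supplies the memory that a bare condition variable lacks: a post made while nobody is waiting is still recorded in value, so the next call to csem_wait() returns at once.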

The Right Tool For the Right Job

Semaphores and condition variables are two more tools you can use to synchronize your threaded applications. It’s important to pick the correct tool when writing your programs, not only to make it easier for you to write, but also to aid those who will later need to read and understand your code. Hopefully, by this time, you’ve got a pretty good sense of the options available (and when to use specific options) for writing threaded code.

Enjoy experimenting with concurrent programming, and as always, happy hacking!




Listing One: ticket_agent () with Semaphores and Conditions


#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define NUM_TICKETS 100
#define NUM_AGENTS 5
#define NUM_LINES 10

int total_sold = 0;
pthread_mutex_t tickets_sold_lock = PTHREAD_MUTEX_INITIALIZER;

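/* The queue type, queue_add() and queue_remove(), and the helpers
   wait_for_caller(), sell_ticket_to_customer(),
   tell_customer_sold_out() and do_bookkeeping() are assumed to be
   defined elsewhere. */
queue line_of_callers;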

/* A lock for the queue of callers. */
pthread_mutex_t line_lock = PTHREAD_MUTEX_INITIALIZER;

/* Semaphore representing the current number of callers in line. */
sem_t num_in_line;

/* Semaphores for communication from agents to phone lines to indicate
   when a call is finished. Basically used for the agent to tell
   the line to "hang up" the call. */
sem_t lines[NUM_LINES];

/* Mutex and condition variable for the bookkeeper to use to
wait for all the tickets to be sold. */
pthread_mutex_t finish_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t finish_cond = PTHREAD_COND_INITIALIZER;

void* call_line (void* data)
{
    /* An identifier for which line this is */
    int my_num = *((int*)data);

    while (1)
    {
        /* Dummy function that simulates waiting for this line
           to ring. */
        wait_for_caller ();

        /* Get the lock for the line to make sure two call lines
           don't add a caller to the line at the same time. */
        pthread_mutex_lock (&line_lock);
        queue_add (line_of_callers, my_num);
        pthread_mutex_unlock (&line_lock);

        /* Signal the semaphore for the total number of callers in
           line so that one of the agents will pick up. */
        sem_post (&num_in_line);

        /* Wait on the signal from the ticket agent so that we know
           that the call has been handled and can therefore
           handle another call on this line. */
        sem_wait (&lines[my_num]);
    }

    return NULL;
}

void* ticket_agent (void* foo)
{
    int currently_helping;

    while (1)
    {
        /* Wait until there is a caller in the line. */
        sem_wait (&num_in_line);

        /* Get the lock for the line and take the first caller
           in line off of the list. */
        pthread_mutex_lock (&line_lock);
        currently_helping = queue_remove (line_of_callers);
        pthread_mutex_unlock (&line_lock);
        printf ("Helping caller on line %d.\n", currently_helping);

        pthread_mutex_lock (&tickets_sold_lock);
        if (total_sold < NUM_TICKETS)
        {
            /* If we still have tickets left, sell the ticket
               to the customer. */
            if (sell_ticket_to_customer ())
            {
                /* Add one to the total number sold. If we are
                   all sold out now, tell the bookkeeper to clean
                   up through the condition variable. */
                pthread_mutex_lock (&finish_mutex);
                total_sold++;
                if (total_sold == NUM_TICKETS)
                    pthread_cond_broadcast (&finish_cond);
                pthread_mutex_unlock (&finish_mutex);
            }
        }
        else
        {
            /* Otherwise, tell them we are sold out. */
            tell_customer_sold_out ();
        }
        pthread_mutex_unlock (&tickets_sold_lock);

        /* Signal the semaphore telling the line that this
           call is over. */
        sem_post (&lines[currently_helping]);
        printf ("Done with caller on line %d.\n", currently_helping);
    }

    return NULL;
}

void* bookkeeper (void* foo)
{
    /* Use the condition wait to make sure that we do not do any
       bookkeeping until the total_sold is equal to the total
       number of tickets. */
    pthread_mutex_lock (&finish_mutex);
    while (total_sold < NUM_TICKETS)
        pthread_cond_wait (&finish_cond, &finish_mutex);

    do_bookkeeping ();

    pthread_mutex_unlock (&finish_mutex);

    return NULL;
}

int main ()
{
    pthread_t agents[NUM_AGENTS];
    pthread_t phone_lines[NUM_LINES];
    pthread_t cleanup;
    /* Each line gets its own identifier. Passing &i instead would
       let the loop change the value before the thread reads it. */
    static int line_ids[NUM_LINES];
    void* return_val;
    int i;

    srand (time (0));

    /* Initialize the semaphores. */
    sem_init (&num_in_line, 0, 0);
    for (i = 0; i < NUM_LINES; i++)
        sem_init (&(lines[i]), 0, 0);

    /* Create the ticket agents. */
    for (i = 0; i < NUM_AGENTS; i++)
        pthread_create (&agents[i], NULL, ticket_agent, NULL);

    /* Create the lines for callers. */
    for (i = 0; i < NUM_LINES; i++)
    {
        line_ids[i] = i;
        pthread_create (&phone_lines[i], NULL, call_line, &line_ids[i]);
    }

    /* Create the bookkeeper. */
    pthread_create (&cleanup, NULL, bookkeeper, NULL);

    /* We only need to wait on the bookkeeper because we know
       that once the bookkeeper is done, we are done with
       the program and can exit. The other threads will loop
       forever handling calls. */
    pthread_join (cleanup, &return_val);

    printf ("Sold all the tickets: %d\n", total_sold);
    return 0;
}



Benjamin Chelf is an author and engineer at CodeSourcery, LLC. He can be reached at chelf@codesourcery.com.
