Thursday, September 27, 2012

Producer-Consumer Model with Python

In this article, we investigate the producer-consumer model in Python. The producer-consumer model describes a situation where one or more players (the producers) are responsible for placing data into a buffer, and one or more players (the consumers) are responsible for removing data from that buffer. The chief challenges in this arrangement are:
  1. Ensuring the producer(s) do not add more data than the buffer can hold.
  2. Ensuring the consumer(s) do not take data from an empty buffer.
  3. Ensuring the activities of all players are synchronized.
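The first two challenges map directly onto a bounded queue. The following is a minimal sketch, assuming Python 3 (where the Queue module is named queue); the maxsize of 2 and the variable names are arbitrary choices for illustration. The non-blocking calls raise exceptions so the behavior is easy to observe; the default blocking put() and get() would wait instead.

```python
import queue

# A buffer that holds at most two items (maxsize chosen only for illustration).
buf = queue.Queue(maxsize=2)

buf.put_nowait("a")
buf.put_nowait("b")

# Challenge 1: a producer cannot overfill the buffer.  put_nowait() raises
# queue.Full; a plain put() would block until a consumer made room.
try:
    buf.put_nowait("c")
    overfilled = True
except queue.Full:
    overfilled = False

# Consumers receive items in FIFO order.
first = buf.get_nowait()
second = buf.get_nowait()

# Challenge 2: a consumer cannot take from an empty buffer.  get_nowait()
# raises queue.Empty; a plain get() would block until a producer added data.
try:
    buf.get_nowait()
    underflowed = True
except queue.Empty:
    underflowed = False

print(overfilled, first, second, underflowed)  # False a b False
```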

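In Python, the combined use of the Queue and threading modules provides an elegant approach that abstracts away many of these details. In particular, the Queue.Queue class (queue.Queue in Python 3) synchronizes access to its data internally, freeing the user from having to write a locking scheme. It also handles the first two challenges: put() blocks while a queue created with a positive maxsize is full, and get() blocks while the queue is empty.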
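As a small illustration of that claim, the sketch below (assuming Python 3's queue module; NUM_PRODUCERS, ITEMS_EACH and produce are illustrative names) has four threads put 1,000 items each into one shared queue with no lock in user code. Every item arrives exactly once, and each producer's items keep their original order because the queue is FIFO.

```python
import queue
import threading

q = queue.Queue()
NUM_PRODUCERS = 4
ITEMS_EACH = 1000

def produce(pid):
    # No explicit lock: Queue.put() does its own locking internally.
    for n in range(ITEMS_EACH):
        q.put((pid, n))

threads = [threading.Thread(target=produce, args=(p,))
           for p in range(NUM_PRODUCERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All producers are done, so draining with get_nowait() sees every item.
received = []
while True:
    try:
        received.append(q.get_nowait())
    except queue.Empty:
        break

print(len(received))  # 4000
```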

The program below is slightly redundant in that we mark the consumer threads as daemons but then indirectly wait for them to complete. I will explain this in a second. First, let's talk about the purpose of daemon threads.

The Python documentation states that "the entire Python program exits when no alive non-daemon threads are left." A clearer phrasing is "the program exits when all non-daemon threads have completed." By default, all threads (including the 'main' thread) are non-daemon; more precisely, a new thread inherits the daemon flag of the thread that creates it, and the main thread is non-daemon. By marking the consumers as daemons, we allow the program to terminate even though the consumers have not completed. In many programs (think GUIs) this is the desired behavior. In this program, however, we also keep the 'main' thread from exiting until the producers have stopped and the queue has been drained, which effectively waits for the consumers to finish their work.
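The daemon rule can be observed directly. This sketch assumes Python 3.7+ (for the capture_output argument of subprocess.run); CHILD and child_exits_promptly are hypothetical names. It runs a tiny child interpreter whose main thread starts a thread that sleeps for an hour and then returns. With the daemon flag set, the child exits at once; without it, the interpreter waits for the sleeping thread, so the child is still alive when the arbitrarily chosen 3-second timeout expires and subprocess.run kills it.

```python
import subprocess
import sys
import textwrap

# Child program: start a long-sleeping thread, then let the main thread end.
# The command-line argument decides whether that thread is a daemon.
CHILD = textwrap.dedent("""
    import sys, threading, time
    t = threading.Thread(target=time.sleep, args=(3600,))
    t.daemon = (sys.argv[1] == 'daemon')
    t.start()
    print('main thread done, daemon=%s' % t.daemon)
""")

def child_exits_promptly(mode, timeout=3):
    """Run CHILD with `mode` and report whether it ended within `timeout` s."""
    try:
        subprocess.run([sys.executable, "-c", CHILD, mode],
                       capture_output=True, timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child here.
        return False

daemon_exited = child_exits_promptly("daemon")          # only a daemon is left
non_daemon_exited = child_exits_promptly("non-daemon")  # interpreter waits
print(daemon_exited, non_daemon_exited)  # True False
```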

The example program below lets the user spawn an arbitrary number of producer and consumer threads. Each producer thread generates random pairs of integers and places each pair in a queue. The consumers remove the pairs from the queue and compute their greatest common divisor.
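The consumers use Euclid's algorithm: repeatedly replace (a, b) with (b, a mod b) until b is 0, at which point a is the greatest common divisor. For example, gcd(48, 18) proceeds (48, 18) -> (18, 12) -> (12, 6) -> (6, 0), giving 6. The sketch below traces those steps and cross-checks the result against math.gcd (Python 3.5+); gcd_trace is a hypothetical helper written for this illustration, not part of the program.

```python
import math
import random

def gcd_trace(a, b):
    """Euclid's algorithm, returning the gcd and the (a, b) pairs visited."""
    steps = [(a, b)]
    while b != 0:
        a, b = b, a % b
        steps.append((a, b))
    return a, steps

g, steps = gcd_trace(48, 18)
print(g)      # 6
print(steps)  # [(48, 18), (18, 12), (12, 6), (6, 0)]

# Cross-check on random pairs of roughly the size the producers generate.
for _ in range(1000):
    x, y = random.randint(1, 2**63 - 1), random.randint(1, 2**63 - 1)
    assert gcd_trace(x, y)[0] == math.gcd(x, y)
```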

gcd_processor.py


#!/usr/bin/env python

from optparse import OptionParser
import random
import sys
import threading
import time

try:
    import Queue as queue  # Python 2
except ImportError:
    import queue           # Python 3

data_queue = queue.Queue()
stdout_mutex = threading.Lock()

def gcd(a, b):
    """Return the greatest common divisor of a and b (Euclid's algorithm)."""
    while b != 0:
        a, b = b, a % b
    return a

def consumer(idnum):
    """Take integer pairs off the queue forever, printing each pair's gcd."""
    while True:
        a, b = data_queue.get()  # blocks until a pair is available
        try:
            result = gcd(a, b)
            with stdout_mutex:
                print('\tconsumer %d: computed gcd(%d, %d) = %d' %
                      (idnum, a, b, result))
        finally:
            # Mark the pair as fully processed so that data_queue.join()
            # in the main thread can tell when all of the work is done.
            data_queue.task_done()
        time.sleep(0.1)

def producer(idnum, count):
    """Put count random integer pairs on the queue."""
    for i in range(count):
        a, b = random.randint(1, sys.maxsize), random.randint(1, sys.maxsize)
        with stdout_mutex:
            print('\tproducer %d: generated (%d, %d)' % (idnum, a, b))
        data_queue.put((a, b))
        time.sleep(0.1)


if __name__ == '__main__':
    usage = '''usage: %prog [options]

Demonstrate producer-consumer model.

Each producer generates a sequence of integer pairs.  The integer pair is
placed on a global queue.  Each consumer removes pairs from the queue and
computes the greatest common divisor for that pair.
'''
    version = '%prog 1.0'
    parser = OptionParser(usage=usage, version=version)
    parser.add_option('-c', '--num-consumers', dest='num_consumers',
            type='int', default=2,
            help='number of consumer threads')
    parser.add_option('-p', '--num-producers', dest='num_producers',
            type='int', default=4,
            help='number of producer threads')
    parser.add_option('-n', '--num-integer-pairs', dest='num_integer_pairs',
            type='int', default=10,
            help='the number of integer pairs each producer generates')
    (options, args) = parser.parse_args()

    print('[*] beginning main thread')
    # Consumers loop forever; as daemons they are abandoned when the
    # main thread exits.
    for i in range(options.num_consumers):
        t = threading.Thread(target=consumer, args=(i, ))
        t.daemon = True
        t.start()

    joinable = []
    for i in range(options.num_producers):
        t = threading.Thread(target=producer,
                args=(i, options.num_integer_pairs))
        joinable.append(t)
        t.start()

    # wait for all of the producer threads to finish
    for thread in joinable:
        thread.join()
    with stdout_mutex:
        print('[*] all producer threads finished')

    # Wait until the consumers have called task_done() for every pair.
    # This neither spins the CPU nor returns while a consumer is still
    # processing the last pair it removed from the queue.
    data_queue.join()
    with stdout_mutex:
        print('[*] all consumer threads finished')
        print('[*] main thread exited')

Below is a sample run of the program with two consumers and three producers, where each producer generates four integer pairs.

$ ./gcd_processor.py -c 2 -p 3 -n 4
[*] beginning main thread
 producer 0: generated (8650083748717510955, 7042817999880859144)
 producer 1: generated (8411030783605146193, 5649382909553114176)
 producer 2: generated (7598317520065638838, 1583073951150684026)
 consumer 1: computed gcd(8650083748717510955, 7042817999880859144) = 1
 producer 2: generated (9132283067627759945, 5550586898968097650)
 consumer 0: computed gcd(8411030783605146193, 5649382909553114176) = 1
 producer 0: generated (941876034000530679, 2254827834336877783)
 producer 1: generated (3831435553181605295, 6532409849491920585)
 producer 1: generated (89084273097138564, 7529330907292728475)
 producer 0: generated (810343105420840002, 7098464443848996296)
 producer 2: generated (1459983564619067730, 7336893082805612268)
 consumer 0: computed gcd(7598317520065638838, 1583073951150684026) = 2
 consumer 1: computed gcd(9132283067627759945, 5550586898968097650) = 5
 producer 0: generated (5748866744760111634, 4998219244374494451)
 consumer 0: computed gcd(941876034000530679, 2254827834336877783) = 1
 producer 1: generated (8257193796991062027, 218310341014437559)
 consumer 1: computed gcd(3831435553181605295, 6532409849491920585) = 5
 producer 2: generated (4387473705826720899, 3521353578419199591)
 consumer 0: computed gcd(89084273097138564, 7529330907292728475) = 1
 consumer 1: computed gcd(810343105420840002, 7098464443848996296) = 2
[*] all producer threads finished
 consumer 0: computed gcd(1459983564619067730, 7336893082805612268) = 6
 consumer 1: computed gcd(5748866744760111634, 4998219244374494451) = 1
 consumer 1: computed gcd(8257193796991062027, 218310341014437559) = 1
 consumer 0: computed gcd(4387473705826720899, 3521353578419199591) = 9
[*] all consumer threads finished
[*] main thread exited
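A common alternative way to stop the consumers, not used in gcd_processor.py, is the sentinel (or "poison pill") technique: keep the consumers as ordinary non-daemon threads and, once the producers have finished, put one sentinel value per consumer on the queue. Each consumer exits when it reads a sentinel, so the main thread can simply join() the consumers. This is a minimal sketch assuming Python 3's queue module; STOP, results and the fixed input pairs are hypothetical choices made for a reproducible example.

```python
import queue
import threading

STOP = object()      # hypothetical sentinel, compared by identity
NUM_CONSUMERS = 2

data_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a

def consumer():
    while True:
        item = data_queue.get()
        if item is STOP:  # each consumer consumes exactly one sentinel
            break
        a, b = item
        with results_lock:
            results.append((a, b, gcd(a, b)))

def producer(pairs):
    for pair in pairs:
        data_queue.put(pair)

# Non-daemon consumers: the program cannot exit until they return.
consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

producers = [threading.Thread(target=producer, args=([(12, 18), (35, 14)],)),
             threading.Thread(target=producer, args=([(17, 5), (100, 75)],))]
for t in producers:
    t.start()
for t in producers:
    t.join()

# Every pair is already queued, so FIFO order puts the sentinels after them.
for _ in consumers:
    data_queue.put(STOP)
for t in consumers:
    t.join()

print(sorted(results))
# [(12, 18, 6), (17, 5, 1), (35, 14, 7), (100, 75, 25)]
```

The trade-off is that the main thread must know how many consumers exist, but in return every consumer finishes its current item and exits cleanly instead of being abandoned at shutdown.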
