Saturday, March 15, 2008

Practical threaded programming with Python

Threaded programming in Python can be done with a minimal amount of complexity by combining threads with Queues. This article explores using threads and queues together to create simple yet effective patterns for solving problems that require concurrency.

Introduction

With Python, there is no shortage of options for concurrency, the standard library includes support for threading, processes, and asynchronous I/O. In many cases Python has removed much of the difficulty in using these various methods of concurrency by creating high-level modules such as asynchronous, threading, and subprocess. Outside of the standard library, there are third solutions such as twisted, stackless, and the processing module, to name a few. This article focuses exclusively on threading in Python, using practicle examples. There are many great resources online that document the threading API, but this article attempts to provide practicle examples of common threading usage patterns.

It is important to first define the differences between processes and threads. Threads are different than processes in that they share state, memory, and resources. This simple difference is both a strength and a weakness for threads. On one hand, threads are lightweight and easy to communicate with, but on the other hand, they bring up a whole host of problems including deadlocks, race conditions, and sheer complexity. Fortunately, due to both the GIL and the queuing module, threading in Python is much less complex to implement than in other languages.

Hello Python threads

To follow along, I assume that you have Python 2.5 or greater installed, as many examples will be using newer features of the Python language that only appear in at least Python2.5. To get started with threads in Python, we will start with a simple "Hello World" example:


hello_threads_example
 
                
        
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()
      

 

If you run this example, you get the following output:

      # python hello_threads.py 
      Thread-1 says Hello World at time: 2008-05-13 13:22:50.252069
      Thread-2 says Hello World at time: 2008-05-13 13:22:50.252576
      

 

Looking at this output, you can see that you received a Hello World statement from two threads with date stamps. If you look at the actual code, there are two import statements; one imports the datetime module and the other imports the threading module. The class ThreadClass inherits from threading.Thread and because of this, you need to define a run method that executes the code you run inside of the thread. The only thing of importance to note in the run method is that self.getName() is a method that will identify the name of the thread.

The last three lines of code actually call the class and start the threads. If you notice, t.start() is what actually starts the threads. The threading module was designed with inheritance in mind, and was actually built on top of a lower-level thread module. For most situations, it would be considered a best practice to inherit from threading.Thread, as it creates a very natural API for threaded programming.

Using queues with threads

As I referred to earlier, threading can be complicated when threads need to share data or resources. The threading module does provide many synchronization primatives, including semaphores, condition variables, events, and locks. While these options exist, it is considered a best practice to instead concentrate on using queues. Queues are much easier to deal with, and make threaded programming considerably safer, as they effectively funnel all access to a resource to a single thread, and allow a cleaner and more readible design pattern.

In the next example, you will first create a program that will serially, or one after the other, grab a URL of a website, and print out the first 1024 bytes of the page. This is a classic example of something that could be done quicker using threads. First, let's use the urllib2 module to grab these pages one at a time, and time the code:


URL fetch serial
 
                
        import urllib2
        import time
        
        hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]
        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() - start)
      

 

When you run this, you get a lot of output to standard out, as the pages are being partially printed. But you get this at the finish:

        Elapsed Time: 2.40353488922  
        

 

Let's look a little at this code. You import only two modules. First, the urllib2 module is what does the heavy lifting and grabs the Web pages. Second, you create a start time value by calling time.time(), and then call it again and subtract the initial value to determine how long the program takes to execute. Finally, in looking at the speed of the program, the result of "Two and a half seconds" isn't horrible, but if you had hundreds of Web pages to retrieve, it would take approximately 50 seconds, given the current average. Look at how creating a threaded version speeds things up:


URL fetch threaded
 
                
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"]
          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def __init__(self, queue):
              threading.Thread.__init__(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() - start)
      

 

This example has a bit more code to explain, but it isn't that much more complicated than the first threading example, thanks to the use of the queuing module. This pattern is a very common and recommended way to use threads with Python. The steps are described as follows:

  1. Create an instance of Queue.Queue() and then populate it with data.
  2. Pass that instance of populated data into the threading class that you created from inheriting from threading.Thread.
  3. Spawn a pool of daemon threads.
  4. Pull one item out of the queue at a time, and use that data inside of the thread, the run method, to do the work.
  5. After the work is done, send a signal to the queue with queue.task_done() that the task has been completed.
  6. Join on the queue, which really means to wait until the queue is empty, and then exit the main program.

Just a note about this pattern: By setting daemonic threads to true, it allows the main thread, or program, to exit if only daemonic threads are alive. This creates a simple way to control the flow of the program, because you can then join on the queue, or wait until the queue is empty, before exiting. The exact process is best described in the documentation for the queue module, as seen in the Resources:

join()
"Blocks until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

Working with multiple queues

Because the pattern demonstrated above is so effective, it is relatively simple to extend it by chaining additional thread pools with queues. In the above example, you simply printed out the first portion of a Web page. This next example instead returns the whole Web page that each thread grabs, and then places it into another queue. Then set up another pool of threads that join on the second queue, and then do work on the Web page. The work performed in this example involves parsing the Web page using a third-party Python module called Beautiful Soup. Using just a couple of lines of code, with this module, you will extract the title tag and print it out for each page you visit.


Multiple queues data mining websites
 
                
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)


 

If you run this version of the script, you get the following output:

  # python url_fetch_threaded_part2.py 

  [<title>Google</title>]
  [<title>Yahoo!</title>]
  [<title>Apple</title>]
  [<title>IBM United States</title>]
  [<title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>]
  Elapsed Time: 3.75387597084

  

In looking at the code, you can see that we added another instance of a queue, and then passed that queue into the first thread pool class, ThreadURL. Next , you almost copy the exact same structure for the next thread pool class, DatamineThread. In the run method of this class, grab the Web page, chunk, from off of the queue in each thread, and then process this chunk with Beautiful Soup. In this case, you use Beautiful Soup to simply extract the title tags from each page and print them out. This example could quite easily be turned into something more useful, as you have the core for a basic search engine or data mining tool. One idea is to extract the links from each page using Beautiful Soup and then follow them.



 

Summary

This article explored threads in Python and demonstrated the best practice of using queues to allieviate complexity and subtle errors, and to promote readable code. While this basic pattern is relatively simple, it can be used to solve a wide number of problems by chaining queues and thread pools together. In the final section, you began to explore creating a more complex processing pipeline that can serve as a model for future projects. There are quite a few excellent resources on both concurrency in general and threads in the Resources section.

In closing, it is important to point out that threads are not the solution to every problem, and that processes can be quite suitable for many situations. The standard library subprocess module in particular can be much simpler to deal with if you only require forking many processes and listening for a response. Please consult the Resources section for the official documentation on this.

1 comment:

Blogger said...

Did you know that you can earn dollars by locking special sections of your blog / site?
Simply join AdWorkMedia and use their content locking plugin.