Threaded programming in Python can be done with a
minimal amount of complexity by combining threads with
queues. This article explores using threads and
queues together to create simple yet effective patterns
for solving problems that require concurrency.
Introduction
With Python, there is no shortage of options for
concurrency; the standard library includes support for
threading, processes, and asynchronous I/O. In many cases,
Python has removed much of the difficulty of using these
various methods of concurrency by providing high-level
modules such as asyncore, threading, and subprocess.
Outside of the standard library, there are third-party solutions
such as Twisted, Stackless, and the processing module, to
name a few. This article focuses exclusively on threading in
Python, using practical examples. There are many great
resources online that document the threading API, but this
article attempts to provide practical examples of common
threading usage patterns.
It is important to first define the differences between
processes and threads. Threads differ from processes
in that they share state, memory, and resources. This simple
difference is both a strength and a weakness for threads. On
one hand, threads are lightweight and easy to communicate
with, but on the other hand, they bring up a whole host of
problems including deadlocks, race conditions, and sheer
complexity. Fortunately, due to both the GIL and the Queue
module, threading in Python is much less complex to
implement than in other languages.
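Because threads share memory, two threads can build up a single data structure directly, with no copying or message passing. The sketch below is not from the original article; it uses Python 3 syntax (where print is a function) and nothing beyond the standard library:

```python
import threading

# Threads share the same memory: every worker appends to this one list.
results = []

def worker(n):
    # "results" here is the same object in every thread - no copy is made
    results.append(n * n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 4, 9]
```

This convenience is exactly the double-edged sword described above: the moment two threads read and modify the same data, you need some form of synchronization.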
Hello Python threads
To follow along, I assume that you have Python 2.5 or
greater installed, as many examples use features of
the Python language that first appeared in
Python 2.5. To get started with threads in Python, we will
start with a simple "Hello World" example:
hello_threads_example
import threading
import datetime

class ThreadClass(threading.Thread):
    def run(self):
        now = datetime.datetime.now()
        print "%s says Hello World at time: %s" % (self.getName(), now)

for i in range(2):
    t = ThreadClass()
    t.start()
If you run this example, you get the following output:
# python hello_threads.py
Thread-1 says Hello World at time: 2008-05-13 13:22:50.252069
Thread-2 says Hello World at time: 2008-05-13 13:22:50.252576
Looking at this output, you can see that you received a Hello World
statement from two threads, each with a date stamp. If you look at
the actual code, there are two import statements; one
imports the datetime module and the other imports the
threading module. The class ThreadClass
inherits from threading.Thread and, because of
this, you need to define a run method that executes the code
you want to run inside the thread. The only thing of importance
to note in the run method is that self.getName()
is a method that returns the name of the thread.
The last three lines of code actually instantiate the class and
start the threads. If you notice, t.start() is
what actually starts each thread. The threading module was
designed with inheritance in mind, and was actually built on
top of the lower-level thread module. For most situations, it
is considered a best practice to inherit from
threading.Thread , as it creates a very natural API
for threaded programming.
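The listing above is Python 2. As a point of comparison, here is a sketch of the same example in Python 3 syntax, where print is a function and the thread's name is available as the name attribute (getName() still exists but is no longer the idiomatic spelling):

```python
import threading
import datetime

class ThreadClass(threading.Thread):
    def run(self):
        # self.name is the modern equivalent of self.getName()
        now = datetime.datetime.now()
        print("%s says Hello World at time: %s" % (self.name, now))

threads = [ThreadClass() for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The join() calls at the end simply wait for both threads to finish before the script exits.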
Using queues with threads
As I referred to earlier, threading can be complicated
when threads need to share data or resources. The threading
module does provide many synchronization primitives,
including semaphores, condition variables, events, and
locks. While these options exist, it is considered a best
practice to instead concentrate on using queues. Queues are
much easier to deal with, and make threaded programming
considerably safer, as they effectively funnel all access to
a resource through a single thread, and allow a cleaner and more
readable design pattern.
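To see why the primitives are fiddlier than queues, consider the simplest of them, a Lock. The hypothetical sketch below (Python 3 syntax, not from the original article) guards a shared counter; without the lock, the read-modify-write in `counter += 1` could interleave between threads and lose updates:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        # the with-block ensures only one thread at a time
        # performs the read-modify-write on counter
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```

Every piece of code that touches the shared data must remember to take the lock; a queue, by contrast, centralizes that discipline in one place.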
In the next example, you will first create a program that
will serially, or one after the other, grab the URL of each
website and print out the first 1024 bytes of the page.
This is a classic example of something that could be done
more quickly using threads. First, let's use the urllib2
module to grab these pages one at a time, and time the code:
URL fetch serial
import urllib2
import time

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

start = time.time()
#grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
    url = urllib2.urlopen(host)
    print url.read(1024)

print "Elapsed Time: %s" % (time.time() - start)
When you run this, you get a lot of output to standard out, as the
pages are being partially printed. But you get this at the
finish:
Elapsed Time: 2.40353488922
Let's look a little at this code. You import only two modules. First,
the urllib2 module is what does the heavy
lifting and grabs the Web pages. Second, you create a start
time value by calling time.time() , and then
call it again at the end and subtract the initial value to determine
how long the program took to execute. Finally, in looking
at the speed of the program, the result of two and a half
seconds isn't horrible, but if you had hundreds of Web
pages to retrieve, it would take approximately 50 seconds,
given the current average. Look at how creating a threaded
version speeds things up:
URL fetch threaded
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and prints first 1024 bytes of page
            url = urllib2.urlopen(host)
            print url.read(1024)

            #signals to queue job is done
            self.queue.task_done()

start = time.time()
def main():
    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    #wait on the queue until everything has been processed
    queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)
This example has a bit more code to explain, but it isn't that much
more complicated than the first threading example, thanks to
the use of the Queue module. This pattern is a very common
and recommended way to use threads with Python. The steps
are described as follows:
- Create an instance of Queue.Queue() and then populate it with data.
- Pass that populated instance into the thread class that you created
by inheriting from threading.Thread .
- Spawn a pool of daemon threads.
- Pull one item out of the queue at a time, and use that data inside
of the thread (the run method) to do the work.
- After the work is done, send a signal to the queue with
queue.task_done() that the task has been completed.
- Join on the queue, which really means to wait until
the queue is empty, and then exit the main program.
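The steps above can be condensed into a network-free sketch. This is not the article's listing; it uses Python 3 names (queue.Queue instead of Queue.Queue, the daemon attribute instead of setDaemon), and the "work" is just doubling a number so the pattern itself stands out:

```python
import queue
import threading

q = queue.Queue()
results = []

class Worker(threading.Thread):
    """Pulls items off the queue forever; daemon threads die with the program."""
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q

    def run(self):
        while True:
            item = self.q.get()        # pull one item at a time
            results.append(item * 2)   # do the work
            self.q.task_done()         # signal that this item is finished

# spawn a pool of daemon threads, passing them the queue instance
for _ in range(3):
    w = Worker(q)
    w.daemon = True
    w.start()

# populate the queue with data
for n in range(10):
    q.put(n)

# wait until every item has been fetched and marked done
q.join()
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Because each append happens before the matching task_done() call, the join() guarantees that all ten results exist before the final print runs.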
Just a note about this pattern: by setting daemon
threads to true, you allow the main thread, or program, to
exit if only daemon threads are alive. This creates a
simple way to control the flow of the program, because you
can then join on the queue, or wait until the queue is
empty, before exiting. The exact process is best described
in the documentation for the Queue module, as seen in the
Resources:
join()
"Blocks until all items in the queue have been gotten
and processed. The count of unfinished tasks goes up
whenever an item is added to the queue. The count goes
down whenever a consumer thread calls task_done() to
indicate that the item was retrieved and all work on it
is complete. When the count of unfinished tasks drops to
zero, join() unblocks."
Working with multiple queues
Because the pattern demonstrated above is so effective,
it is relatively simple to extend it by chaining additional
thread pools with queues. In the above example, you simply
printed out the first portion of a Web page. This next
example instead returns the whole Web page that each thread
grabs, and then places it into another queue. A second
pool of threads then pulls from that queue and
does work on each Web page. The work performed in this
example involves parsing the Web page using a third-party
Python module called Beautiful Soup. Using just a couple of
lines of code, with this module, you will extract the title
tag and print it out for each page you visit.
Multiple queues data mining websites
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Web page parse"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs chunk from out queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():
    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    #spawn a second pool of threads to parse the pages
    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()

    #wait on both queues until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)
If you run this version of the script, you get the following output:
# python url_fetch_threaded_part2.py
[<title>Google</title>]
[<title>Yahoo!</title>]
[<title>Apple</title>]
[<title>IBM United States</title>]
[<title>Amazon.com: Online Shopping for Electronics, Apparel,
Computers, Books, DVDs & more</title>]
Elapsed Time: 3.75387597084
In looking at the code, you can see that we added another
instance of a queue, and then passed that queue into the
first thread pool class, ThreadUrl . Next, you
almost copy the exact same structure for the next thread
pool class, DatamineThread . In the run method
of this class, each thread grabs a Web page (the chunk) off of the
queue, and then processes the chunk with
Beautiful Soup. In this case, you use Beautiful Soup to
simply extract the title tag from each page and print it
out. This example could quite easily be turned into
something more useful, as you have the core of a basic
search engine or data mining tool. One idea is to extract
the links from each page using Beautiful Soup and then
follow them.
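Beautiful Soup is a third-party module; for the narrower task of pulling links out of a chunk, the standard library's HTMLParser can stand in. The sketch below is an illustration, not the article's code, written in Python 3 syntax (where the module lives at html.parser); a DatamineThread could call feed() on each chunk it pulls off the queue:

```python
from html.parser import HTMLParser

class LinkParser(HTMLParser):
    """Collects the href of every <a> tag fed to it."""
    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        # attrs arrives as a list of (name, value) pairs
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(value)

# a tiny hand-written page standing in for a fetched chunk
page = ('<html><body><a href="http://ibm.com">IBM</a>'
        '<a href="http://apple.com">Apple</a></body></html>')

parser = LinkParser()
parser.feed(page)
print(parser.links)  # ['http://ibm.com', 'http://apple.com']
```

Feeding the collected links back into the first queue is all it would take to turn the pipeline into a simple crawler.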
Summary
This article explored threads in Python and demonstrated
the best practice of using queues to alleviate complexity
and subtle errors, and to promote readable code. While this
basic pattern is relatively simple, it can be used to solve
a wide range of problems by chaining queues and thread
pools together. In the final section, you began to explore
creating a more complex processing pipeline that can serve
as a model for future projects. There are quite a few
excellent resources on both concurrency in general and
threads in the
Resources section.
In closing, it is important to point out that threads are
not the solution to every problem, and that processes can be
quite suitable for many situations. The standard library
subprocess module, in particular, can be much simpler to deal
with if you only require forking many processes and
listening for a response. Please consult the
Resources
section for the official documentation on this.
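As a hedged sketch of that closing point, here is what "forking many processes and listening for a response" can look like with subprocess. The child command here is an assumption for illustration (each child just prints a square via the running interpreter, sys.executable):

```python
import subprocess
import sys

# Fork a few worker processes; each child computes one square and prints it.
procs = [
    subprocess.Popen([sys.executable, "-c", "print(%d * %d)" % (n, n)],
                     stdout=subprocess.PIPE)
    for n in range(3)
]

# Listen for each child's response on its stdout pipe.
outputs = [p.communicate()[0].decode().strip() for p in procs]
print(outputs)  # ['0', '1', '4']
```

No locks, no queues, no shared memory: each child runs in its own address space, which is exactly why processes can be the simpler tool.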