Python Concurrency: An Example of a Queue

Posted by Mike on August 1st, 2012 filed in Python

Python comes with a lot of cool concurrency tools builtin, such as threads, Queues, semaphores and multiprocessing. In this article, we’ll spend some time learning how to use Queues. A Queue can be used for first-in-first out or last-in-last-out stack-like implementations if you just use them directly. If you’d like to see that in action, see the Hellman article at the end of this post. We’re going to mix threads in and create a simple file downloader script to demonstrate how Queues work for cases where we want concurrency.

Creating a Downloading Application

This code is based loosely on Hellman’s article and the IBM article as they both show how to download URLs in various ways. This implementation actually downloads files. We’ll use the United States Infernal (oops, I mean Internal) Revenue Service’s tax forms for our example. Let’s pretend we’re a small business owner and we need to download a bunch of these forms for our employees. Here’s some code that will suit our needs:

import os
import Queue
import threading
import urllib2
 
########################################################################
class Downloader(threading.Thread):
    """Threaded File Downloader"""
 
    #----------------------------------------------------------------------
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
 
    #----------------------------------------------------------------------
    def run(self):
        while True:
            # gets the url from the queue
            url = self.queue.get()
 
            # download the file
            self.download_file(url)
 
            # send a signal to the queue that the job is done
            self.queue.task_done()
 
    #----------------------------------------------------------------------
    def download_file(self, url):
        """"""
        handle = urllib2.urlopen(url)
        fname = os.path.basename(url)
        with open(fname, "wb") as f:
            while True:
                chunk = handle.read(1024)
                if not chunk: break
                f.write(chunk)
 
#----------------------------------------------------------------------
def main(urls):
    """
    Run the program
    """
    queue = Queue.Queue()
 
    # create a thread pool and give them a queue
    for i in range(5):
        t = Downloader(queue)
        t.setDaemon(True)
        t.start()
 
    # give the queue some data
    for url in urls:
        queue.put(url)
 
    # wait for the queue to finish
    queue.join()
 
if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    main(urls)

Let’s break this down a bit. First of all, we need to look at the main function definition to see how this all flows. Here we see that it accepts a list of urls. The main function then creates a queue instance that it passes to 5 daemonized threads. The main difference between daemonized and non-daemon threads is that you have to keep track of non-daemon threads and close them yourself whereas with a daemon thread you basically just set them and forget them and when your app closes, they close too. Next we load up the queue (using its put method) with the urls we passed in. Finally we tell the queue to wait for the threads to do their processing via the join method. In the download class, we have the line “self.queue.get()” which blocks until the queue has something to return. That means the threads just sit idly waiting to pick something up. It also means that for a thread to “get” something from the queue, it must call the queue’s “get” method. Thus as we add or put items in the queue, the thread pool will pick up or “get” items and process them. This is also known as “dequeing”. Once all the items in the queue are processed, the script ends and exits. On my machine, it downloads all 5 documents in under a second.

Further Reading

Print Friendly

  • HJ

    Hi,

    Is there a benefit of calling threading.Thread.__init__(self), instead of super(Downloader, self).__init__()?

  • Kmaid

    Queues are wonderfully simple to use the issue is when you try and achive true concurrency and find this nasty thing called the global interpreter lock and have to start looking at things that work in Stackless or MultiProcessing.

  • driscollis

    I almost used requests instead of urllib2. Maybe I’ll do that for one of my next posts. I like to limit the number of 3rd party packages my readers have to download though.

  • driscollis

    I’m aware of the GIL. I hope to be able to figure out multiprocessing enough to write about it some time.

  • driscollis
  • Pingback: Thought this was cool: Python Concurrency: An Example of a Queue « The Mouse Vs. The Python « CWYAlpha()

  • Fabian Kochem

    Thanks for the article, I have a question though.

    in main(): “# create a thread pool and give them a queue”
    How is the Queue instance aware of the Downloader instances?
    Inside the for-loop you instanciate a Downloader instance per iteration and assign that instance to t, again, per iteration. After the loop, you ignore “t” and operate on the Queue instance instead.
    How does this work? Why is this a thread pool?

  • Fabian Kochem

    Okay that took me a while, my mistake was assuming that Queue “triggers” worker threads, where it’s really the other way round.

    What also led to my confusion was that overwriting “t” in every iteration seems like a newbie mistake, maybe it would be more explicit to create a container for all Downloader instances or create a factory function.

    But nevertheless, great article and thanks for the explanation!

  • Pingback: Visto nel Web – 38 « Ok, panico()

  • http://www.facebook.com/profile.php?id=505561526 Guillermo Siliceo Trueba

    Thanks for this great article.

  • Pingback: Python threading | kuangmingchen()

  • Pingback: Python Concurrency: An Intro to Threads | Hello Linux()

  • http://github.com/MestreLion MestreLion

    – Correct, the queue is not “aware” of the threads, but instead it is aware of the many queue.put(url) in the loop, so q.join() is blocking on *them*. And they are only “released” when items in queue are taken (queue.get()) and processed (queue.task_done()), which is done in the threads.
    – The threads (instantiated as t) are “ignored” because they work on their own: each one is blocked by the availabilty of new items in queue (queue.get()), and once they get an item they block the queue until they signal their queue.work_done().

    The important concept is: queues are independent of threads. They don’t need threads (you could use it as a stack in a single-threaded app). And vice-versa. Also, queues are not bound to (or associated with) any particular thread.

    So why Queues are so often used with threads? Because its objects (instances, methods, data, etc) are *safe* to use by multiple threads. They have built-in Locks and Acquires and whatever is needed to make them thread-safe. And that makes queue.put()/.get() preferable over, say, globaldict[workerid] = mytask / globallist.append(myresult)

  • http://github.com/MestreLion MestreLion

    For non-CPU bound tasks, such as network I/O, the GIL is irrelevant and threads are an awesome way to achieve concurrency. So the Downloader example in the article is a great choice for multithreading,

  • Tom Barrett

    What is the point in having a thread pool if the queue is blocked until a call to queue.task_done()? Surely this means that only one thread is running at a time. Please excuse me if I’m being stupid!