Python Concurrency: Porting from a Queue to Multiprocessing

Earlier this week, I wrote a simple post about Python’s Queues and demonstrated how they can be used with a threading pool to download a set of PDFs from the United States Internal Revenue Service’s website. Today I decided to try “porting” that code over to Python’s multiprocessing module. As one of my readers pointed out, Python threads are constrained by the Global Interpreter Lock (GIL), which allows only one thread to execute Python bytecode at a time, so CPU-bound threaded code is effectively limited to a single core. The multiprocessing module skirts the GIL by spawning separate processes, each with its own interpreter, so it can take advantage of multiple cores (see the documentation if you’re curious; other projects, such as Stackless, take their own approaches to concurrency). Anyway, let’s get started.
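
To see why this matters, here’s a quick sketch of my own (it isn’t from the earlier post) that hands a CPU-bound task to a pool of four worker processes. With threads, the GIL would serialize this work; with processes, each worker gets its own interpreter:

import multiprocessing

def busy_sum(n):
    """A CPU-bound task: with threads, the GIL would serialize this work."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Four worker processes, each with its own interpreter and its own GIL
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(busy_sum, [10**6] * 4)
    print(results)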

Creating a multiprocessing Downloader App

Switching from a Queue to the multiprocessing module is pretty straightforward. Just for the heck of it, we’ll also use the requests library instead of urllib to download the files. Let’s look at the code:

import multiprocessing
import os
import requests

########################################################################
class MultiProcDownloader(object):
    """
    Downloads urls with Python's multiprocessing module
    """
    
    #----------------------------------------------------------------------
    def __init__(self, urls):
        """ Initialize class with list of urls """
        self.urls = urls
        
    #----------------------------------------------------------------------
    def run(self):
        """
        Downloads the urls and waits for the processes to finish
        """
        jobs = []
        for url in self.urls:
            process = multiprocessing.Process(target=self.worker, args=(url,))
            jobs.append(process)
            process.start()
        for job in jobs:
            job.join()
            
    #----------------------------------------------------------------------
    def worker(self, url):
        """
        The target method that each process uses to download the specified url
        """
        fname = os.path.basename(url)
        msg = "Starting download of %s" % fname
        print(msg, multiprocessing.current_process().name)
        r = requests.get(url)
        with open(fname, "wb") as f:
            f.write(r.content)
        
#----------------------------------------------------------------------
if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    downloader = MultiProcDownloader(urls)
    downloader.run()

You should get something like this printed out to stdout:

Starting download of f1040a.pdf Process-2
Starting download of f1040.pdf Process-1
Starting download of f1040es.pdf Process-4
Starting download of f1040sb.pdf Process-5
Starting download of f1040ez.pdf Process-3

Let’s break this code down a bit. Right off the bat, you’ll notice that we don’t subclass multiprocessing.Process the way we subclassed threading.Thread in the earlier article. Instead, we create a generic class that accepts a list of urls. After we instantiate the class, we call its run method, which iterates over the urls and creates a process for each of them. It also adds each process to a jobs list. We do that because we want to call each process’s join method, which, as you might expect, waits for the process to finish. If you like, you can pass a number to join as a timeout: join will then return once the timeout elapses, whether or not the process has actually finished. If you don’t, join will block indefinitely.
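
Here’s a small sketch of the timeout variant. The two-second timeout and the sleeping stand-in worker are my own invention, not part of the downloader above:

import multiprocessing
import time

def slow_worker():
    # Stand-in for a download that takes too long
    time.sleep(10)

if __name__ == "__main__":
    job = multiprocessing.Process(target=slow_worker)
    job.start()
    job.join(timeout=2)  # returns after two seconds even though the worker isn't done
    if job.is_alive():
        print("%s is still running" % job.name)
        job.terminate()  # kill the hung process rather than wait forever
        job.join()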

If a process hangs or you just get tired of waiting for it, you can call its terminate method to kill it. According to the documentation, the multiprocessing module also has its own Queue class that you can use in much the same way as the normal Queue, since it’s almost a clone of the original. If you want to dig deeper into all the possibilities of this cool module, I recommend you check out some of the links below.
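
Here’s a rough sketch of what that might look like: a fixed pool of two worker processes pulling urls off a multiprocessing.Queue. The None sentinel used to shut the workers down is my own convention, not something the module requires:

import multiprocessing
import os

import requests

def queue_worker(queue):
    """Pull urls off the shared queue until the None sentinel arrives."""
    while True:
        url = queue.get()
        if url is None:
            break
        fname = os.path.basename(url)
        print("Downloading %s in %s" % (fname, multiprocessing.current_process().name))
        r = requests.get(url)
        with open(fname, "wb") as f:
            f.write(r.content)

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf"]
    queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=queue_worker, args=(queue,))
               for _ in range(2)]
    for w in workers:
        w.start()
    for url in urls:
        queue.put(url)
    for _ in workers:
        queue.put(None)  # one sentinel per worker to shut them down
    for w in workers:
        w.join()

The nice thing about this approach compared to spawning one process per url is that the number of workers stays fixed no matter how many urls you feed in.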

Additional Resources

7 thoughts on “Python Concurrency: Porting from a Queue to Multiprocessing”

  1. For I/O-bound tasks, threads work just as well, since the GIL is released while a thread is blocking on I/O. But to run a large number of parallel I/O tasks you probably want to use asynchronous I/O instead, e.g. twistedmatrix.org. (A thread-pool sketch illustrating this appears after the comments.)

  2. Really? I was under the impression that all threads ran in the main process (i.e. all on one core) so they couldn’t support multi-core CPUs.

  3. Alright. I keep hearing conflicting things on this topic. I apologize if I was misleading. I always strive to be as accurate as I can be. Thanks for the explanation!

  4. Exactly. I’m writing a program that uses urllib2 to call an external RESTful API. With one thread, it takes about 3.5 minutes. Performance tops out around 9 threads, and execution time is 6 seconds (yes, six).

    Threads have historically been used for concurrency due to blocking I/O, which can be lots of things: disk reads/writes, waiting on user input, network I/O.

  5. Pingback: Python threading | kuangmingchen

  6. Pingback: Python Concurrency: An Intro to Threads - The Mouse Vs. The Python
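
To illustrate the first commenter’s point about I/O-bound work, here is a thread-based sketch of mine (not from the post) that uses multiprocessing.dummy, which exposes the same Pool API as multiprocessing but runs the workers as threads:

from multiprocessing.dummy import Pool  # thread-backed clone of the Pool API
import os

import requests

def download(url):
    """Blocking network I/O releases the GIL, so threads overlap nicely here."""
    fname = os.path.basename(url)
    r = requests.get(url)
    with open(fname, "wb") as f:
        f.write(r.content)
    return fname

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf"]
    with Pool(4) as pool:  # four threads, not four processes
        print(pool.map(download, urls))

Since each download spends most of its time blocked on the network, the threads overlap almost perfectly, which is exactly the commenter’s point.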
