Python 3 – An Intro to asyncio

The asyncio module was added to Python in version 3.4 as a provisional package. What that means is that it is possible that asyncio receives backwards incompatible changes or could even be removed in a future release of Python. According to the documentation asyncio “provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives“. This chapter is not meant to cover everything you can do with asyncio, however you will learn how to use the module and why it is useful.

If you need something like asyncio in an older version of Python, then you might want to take a look at Twisted or gevent.


Definitions

The asyncio module provides a framework that revolves around the event loop. An event loop basically waits for something to happen and then acts on the event. It is responsible for handling such things as I/O and system events. Asyncio actually has several loop implementations available to it. The module will default to the one most likely to be the most efficient for the operating system it is running under; however you can explicitly choose the event loop if you so desire. An event loop basically says “when event A happens, react with function B”.

Think of a server as it waits for someone to come along and ask for a resource, such as a web page. If the website isn’t very popular, the server will be idle for a long time. But when it does get a hit, then the server needs to react. This reaction is known as event handling. When a user loads the web page, the server will check for and call one or more event handlers. Once those event handlers are done, they need to give control back to the event loop. To do this in Python, asyncio uses coroutines.

A coroutine is a special function that can give up control to its caller without losing its state. A coroutine is a consumer and an extension of a generator. One of their big benefits over threads is that they don’t use very much memory to execute. Note that when you call a coroutine function, it doesn’t actually execute. Instead it will return a coroutine object that you can pass to the event loop to have it executed either immediately or later on.

One other term you will likely run across when you are using the asyncio module is future. A future is basically an object that represents the result of work that hasn’t completed. Your event loop can watch future objects and wait for them to finish. When a future finishes, it is set to done. Asyncio also supports locks and semaphores.

The last piece of information I want to mention is the Task. A Task is a wrapper for a coroutine and a subclass of Future. You can even schedule a Task using the event loop.


async and await

The async and await keywords were added in Python 3.5 to define a native coroutine and make them a distinct type when compared with a generator based coroutine. If you’d like an in-depth description of async and await, you will want to check out PEP 492.

In Python 3.4, you would create a coroutine like this:


# Python 3.4 coroutine example
import asyncio

@asyncio.coroutine
def my_coro():
    yield from func()

This decorator still works in Python 3.5, but the types module received an update in the form of a coroutine function which will now tell you if what you’re interacting with is a native coroutine or not. Starting in Python 3.5, you can use async def to syntactically define a coroutine function. So the function above would end up looking like this:

import asyncio

async def my_coro():
    await func()

When you define a coroutine in this manner, you cannot use yield inside the coroutine function. Instead it must include a return or await statement that are used for returning values to the caller. Note that the await keyword can only be used inside an async def function.

The async / await keywords can be considered an API to be used for asynchronous programming. The asyncio module is just a framework that happens to use async / await for programming asynchronously. There is actually a project called curio that proves this concept as it is a separate implementation of an event loop thats uses async / await underneath the covers.


A Bad Coroutine Example

While it is certainly helpful to have a lot of background information into how all this works, sometimes you just want to see some examples so you can get a feel for the syntax and how to put things together. So with that in mind, let’s start out with a simple example!

A fairly common task that you will want to complete is downloading a file from some location whether that be an internal resource or a file on the Internet. Usually you will want to download more than one file. So let’s create a pair of coroutines that can do that:

import asyncio
import os
import urllib.request

async def download_coroutine(url):
    """
    A coroutine to download the specified url
    """
    request = urllib.request.urlopen(url)
    filename = os.path.basename(url)

    with open(filename, 'wb') as file_handle:
        while True:
            chunk = request.read(1024)
            if not chunk:
                break
            file_handle.write(chunk)
    msg = 'Finished downloading {filename}'.format(filename=filename)
    return msg

async def main(urls):
    """
    Creates a group of coroutines and waits for them to finish
    """
    coroutines = [download_coroutine(url) for url in urls]
    completed, pending = await asyncio.wait(coroutines)
    for item in completed:
        print(item.result())


if __name__ == '__main__':
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(urls))
    finally:
        event_loop.close()

In this code, we import the modules that we need and then create our first coroutine using the async syntax. This coroutine is called download_coroutine and it uses Python’s urllib to download whatever URL is passed to it. When it is done, it will return a message that says so.

The other coroutine is our main coroutine. It basically takes a list of one or more URLs and queues them up. We use asyncio’s wait function to wait for the coroutines to finish. Of course, to actually start the coroutines, they need to be added to the event loop. We do that at the very end where we get an event loop and then call its run_until_complete method. You will note that we pass in the main coroutine to the event loop. This starts running the main coroutine which queues up the second coroutine and gets it going. This is known as a chained coroutine.

The problem with this example is that it really isn’t a coroutine at all. The reason is that the download_coroutine function isn’t asynchronous. The problem here is that urllib is not asynchronous and further, I am not using await or yield from either. A better way to do this would be to use the aiohttp package. Let’s look at that next!


A Better Coroutine Example

The aiohttp package is designed for creating asynchronous HTTP clients and servers. You can install it with pip like this:

pip install aiohttp

Once that’s installed, let’s update our code to use aiohttp so that we can download the files:

import aiohttp
import asyncio
import async_timeout
import os


async def download_coroutine(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()


async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]

    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

You will notice here that we import a couple of new items: aiohttp and async_timeout. The latter is a actually one of the aiohttp’s dependencies and allows us to create a timeout context manager. Let’s start at the bottom of the code and work our way up. In the bottom conditional statement, we start our asynchronous event loop and call our main function. In the main function, we create a ClientSession object that we pass on to our download coroutine function for each of the urls we want to download. In the download_coroutine, we create an async_timeout.timeout() context manager that basically creates a timer of X seconds. When the seconds run out, the context manager ends or times out. In this case, the timeout is 10 seconds. Next we call our session’s get() method which gives us a response object. Now we get to the part that is a bit magical. When you use the content attribute of the response object, it returns an instance of aiohttp.StreamReader which allows us to download the file in chunks of whatever size we’d like. As we read the file, we write it out to local disk. Finally we call the response’s release() method, which will finish the response processing.

According to aiohttp’s documentation, because the response object was created in a context manager, it technically calls release() implicitly. But in Python, explicit is usually better and there is a note in the documentation that we shouldn’t rely on the connection just going away, so I believe that it’s better to just release it in this case.

There is one part that is still blocking here and that is the portion of the code that actually writes to disk. While we are writing the file, we are still blocking. There is another library called aiofiles that we could use to try and make the file writing asynchronous too, but I will leave that update to the reader.


Scheduling Calls

You can also schedule calls to regular functions using the asyncio event loop. The first method we’ll look at is call_soon. The call_soon method will basically call your callback or event handler as soon as it can. It works as a FIFO queue, so if some of the callbacks take a while to run, then the others will be delayed until the previous ones have finished. Let’s look at an example:

import asyncio
import functools


def event_handler(loop, stop=False):
    print('Event handler called')
    if stop:
        print('stopping the loop')
        loop.stop()
    
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.call_soon(functools.partial(event_handler, loop))
        print('starting event loop')
        loop.call_soon(functools.partial(event_handler, loop, stop=True))
        
        loop.run_forever()
    finally:
        print('closing event loop')
        loop.close() 

The majority of asyncio’s functions do not accept keywords, so we will need the functools module if we need to pass keywords to our event handler. Our regular function will print some text out to stdout whenever it is called. If you happen to set its stop argument to True, it will also stop the event loop.

The first time we call it, we do not stop the loop. The second time we call it, we do stop the loop. The reason we want to stop the loop is that we’ve told it to run_forever, which will put the event loop into an infinite loop. Once the loop is stopped, we can close it. If you run this code, you should see the following output:

starting event loop
Event handler called
Event handler called
stopping the loop
closing event loop

There is a related function called call_soon_threadsafe. As the name implies, it works the same way as call_soon, but it’s thread-safe.

If you want to actually delay a call until some time in the future, you can do so using the call_later function. In this case, we could change our call_soon signature to the following:

loop.call_later(1, event_handler, loop)

This will delay calling our event handler for one second, then it will call it and pass the loop in as its first parameter.

If you want to schedule a specific time in the future, then you will need to grab the loop’s time rather than the computer’s time. You can do so like this:

current_time = loop.time()

Once you have that, then you can just use the call_at function and pass it the time that you want it to call your event handler. So let’s say we want to call our event handler five minutes from now. Here’s how you might do it:

loop.call_at(current_time + 300, event_handler, loop)

In this example, we use the current time that we grabbed and append 300 seconds or five minutes to it. By so doing, we delay calling our event handler for five minutes! Pretty neat!


Tasks

Tasks are a subclass of a Future and a wrapper around a coroutine. They give you the ability to keep track of when they finish processing. Because they are a type of Future, other coroutines can wait for a task and you can also grab the result of a task when it’s done processing. Let’s take a look at a simple example:

import asyncio


async def my_task(seconds):
    """
    A task to do for a number of seconds
    """
    print('This task is taking {} seconds to complete'.format(
        seconds))
    await asyncio.sleep(seconds)
    return 'task finished'
    
    
if __name__ == '__main__':
    my_event_loop = asyncio.get_event_loop()
    try:
        print('task creation started')
        task_obj = my_event_loop.create_task(my_task(seconds=2))
        my_event_loop.run_until_complete(task_obj)
    finally:
        my_event_loop.close()
        
    print("The task's result was: {}".format(task_obj.result()))

Here we create an asynchronous function that accepts the number of seconds it will take for the function to run. This simulates a long running process. Then we create our event loop and then create a task object by calling the event loop object’s create_task function. The create_task function accepts the function that we want to turn into a task. Then we tell the event loop to run until the task completes. At the very end, we get the result of the task since it has finished.

Tasks can also be canceled very easily by using their cancel method. Just call it when you want to end a task. Should a task get canceled when it is waiting for another operation, the task will raise a CancelledError.


Wrapping Up

At this point, you should know enough to start working with the asyncio library on your own. The asyncio library is very powerful and allows you to do a lot of really cool and interesting tasks. The Python documentation is a great place to start learning the asyncio library.

UPDATE: This article was recently translated into Russian here.


Related Reading

32 thoughts on “Python 3 – An Intro to asyncio”

  1. Pingback: Python 3 Concurrency: The concurrent.futures Module – scribdbook

  2. Note: your `download_coroutine` is a co-routine in name only. The function body is entirely synchronous, uncooperative code. The function won’t yield to let other co-routines do work, and thus the downloading takes place entirely sequentially. Not really a good example of a coroutine, I’m afraid. Note that `urllib` is not asynchronous, and even if it was, nowhere does *your use of it* use `await` or `yield from`.

  3. I find it quite common that trivial examples like the download coroutine miss the point of being asyncronous by using a blocking io library. The files will download in sequence, negating any benefit of being asyncronous since the urllib will block.
    I’ve improved the example to use aiohttp, which is an async replacement for urllib to actually download the files in parallel. The key part being the await on retrieving a chunk from the response.

    consider using the asyncio time function for the task example as well for the same reason.

    async def download_coroutine(url):
    “””
    A coroutine to download the specified url
    “””
    async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
    filename = os.path.basename(url)
    print(“Starting download for file: {}”.format(filename))
    with open(filename, ‘wb’) as file_handle:
    while True:
    chunk = await response.content.read(1024)
    if not chunk:
    break
    file_handle.write(chunk)
    print(“Finished downloading {}”.format(filename))
    msg = ‘Downloaded {filename}’.format(filename=filename)
    return msg

  4. Thanks for the comment. I should have realized that when I was writing it. I will have to come up with a new example and update this article accordingly. Thanks!

  5. Yeah, in retrospect this totally makes sense. I encountered this same issue when working with twisted a couple of weeks ago. I’ll thought about updating the example with aiohttp, but I’d rather create a good example without using a new library. But I do plan to write up an article devoted to just aiohttp. Thanks for the feedback!

  6. ya, looks much better. Thanks for getting to this.
    One minor typo “The aiohttp package is designed for creating synchronous HTTP clients and servers” should probably be asynchronous?

  7. Thanks, much better! I realised that the file I/O (writing the chunks out to a file) is still blocking; I’ve updated my SO answer to use aiofiles, which uses threads to offload blocking write calls to.

  8. I decided to just point out that you could make the code even better by using something like aiofiles. Thanks for the pointers. If you’re interested, I would be happy to have you as a part of my PyDev of the Week series.

  9. You never explain what the timeout context manager is. From what I can gather it waits 10 seconds and if the download isn’t done it cancels it? Another question: if I want to only download a max amount of files at once how would I limit the created coroutines?

  10. Pingback: An Intro to aiohttp – scribdbook

  11. async def my_task(seconds):
    print('This task is taking {} seconds to complete'.format(seconds))
    asyncio.sleep(seconds)
    return 'task finished'

    This doesn’t cause the coroutine to sleep. Replacing the 2nd line of the function with await asyncio.sleep(seconds) makes it sleep.

  12. In your “A Better Coroutine Example”, your main() coro currently does:

    “`
    async with aiohttp.ClientSession(loop=loop) as session:
    for url in urls:
    await download_coroutine(session, url)
    “`

    , which makes it wait for each download_coroutine to complete before scheduling (and awaiting for) the next one. That results in sequential downloads (url1 download will never start prior to url0’s download completed).

    Is that your intent? If so, it doesn’t illustrate well the asyncio feature imo.

    Instead, you could do:

    async with aiohttp.ClientSession(loop=loop) as session:
    tasks = [download_coroutine(session, url) for url in urls]
    await asyncio.gather(*tasks)

    to have it schedule and await for all these downloads tasks. As one will be blocked on IO, the scheduler will start the download of another one, etc..

  13. Pingback: Python3, Using some shared state in 2 async methods - Addshore

  14. Pingback: Bad performance when using Python 2.7 requests lib (http) – PythonCharm

  15. Pingback: RuntimeError: This event loop is already running in python – inneka.com

  16. Pingback: python - Python simple socket de cliente/servidor utilizando asyncio

  17. Pingback: An intro to aiohttp - The Mouse Vs. The Python

  18. Pingback: Python 3 Concurrency - The concurrent.futures Module - The Mouse Vs. The Python

  19. Pingback: RuntimeError: This event loop is already running in python - iZZiSwift

  20. Pingback: Why nginx is faster than Apache, and why you needn't necessarily care — Django deployment

Comments are closed.