Using Asyncio and Batch APIs for Remote Services

Introduction to Batch APIs

In modern Python applications, it's common to access remote API using REST or other web-based technologies. Batch APIs are capable of processing multiple requests with a single call. You can use batch APIs to reduce the number of network calls to the remote service. This is ideal when you have to make lots of calls to a remote service that can be batched into a single request.

Suppose you had a REST API that returned the current price of a stock. Using a simple API that takes a single stock identifier and returns the current price, if you needed to get the price of a thousand stocks you would need to make a thousand API calls. A batch API that offered the same functionality would instead take a set of stock identifiers in the request and return the current price for all of the requested identifiers. Using the batch API you would be able to fetch all the prices you need in a single request. This reduces the network overhead thus reducing the latency of your application. It also potentially reduces the load on the remote server.

Batch APIs reduce network round trips

In this article, you will learn how to use a batching pattern with Python's asyncio package to batch many individual function calls into a smaller number of requests.

Motivation: Async Python Functions in Excel

This article came about from a user of the Python Excel add-in PyXLL asking a question about how to use a batch API to streamline their Excel spreadsheet.

PyXLL embeds Python into Excel and it enables calling Python functions directly in Excel spreadsheets. Each time a cell calculates using a Python function it calls that Python function. In this case, the function was an async function that makes a request to a REST server.

A sheet with thousands of cells making individual requests to a REST API was taking too long. The solution was to use the batching pattern!

Batching requests from Python in Excel

Background: AsyncIO and Concurrency

When making multiple requests to a remote server often you don't want to send a request and wait for a response before sending the next request. Usually sending multiple requests in parallel (at the same time) and waiting for all of the responses is much faster. You can achieve that in Python using multithreading or async programming. This section gives an overview of multi-threading and async programming. You will also look at why you would choose one over the other.

Multithreading

Multithreading is a way to perform multiple tasks concurrently. In the threading model, you start multiple threads and each thread executes its code at the same time. If your problem is CPU bound, breaking it down into tasks to be run in parallel using multithreading can help. A program is said to be CPU bound when the main performance bottleneck is the CPU processing time.

There are some subtleties to threading specific to Python that you won't go into in this article, but in theory, that's basically how it works!

The computer's operating system manages all threads and it ensures that each gets a share of CPU time. This adds complexity as each context switch takes time that could be spent doing something else. That complexity scales with the number of threads. When the bottleneck is waiting on IO (network requests, for example) then running multiple threads for each request and waiting for a network response is far from ideal. It also doesn't scale to thousands of requests. That's where async programming comes in.

Asynchronous Programming with asyncio

Asynchronous programming in Python is a different model of concurrency that doesn't use multiple threads. Instead, everything runs on a single thread and Python manages switching between active tasks. It is perfect for programs that use a lot of network requests or other IO-bound tasks like disk or database access.

An event loop manages a set of running tasks. When a task is waiting for something like a network request to complete it "awaits". While a task is awaiting the event loop can schedule other tasks to run. This allows another task to send another network request and then await, allowing another task to run and so on and so on. When the network request is ready the event loop can resume the task. This allows us to have multiple simultaneous requests in flight without the overhead of one thread per request.

The Python keyword "async" designates a function as an asynchronous function to be run on the event loop. The keyword "await" yields to the event loop and wait for another async function or task to complete. The Python package "asyncio" provides the primitives needed for managing asynchronous tasks.

Advantages of a Batch API

Above you learned you can make multiple requests concurrently. This can be much faster than waiting for each request to be returned before sending the next request. If you can send all the requests you need at the same time, why do you need a batch API?

Sending multiple requests requires more network traffic than sending a single request. If you can request all the data you need using a single request that is more efficient from a data transfer point of view.

It can also have other benefits. For example, if the remote server can reduce the amount of work it needs to do by fetching everything in one go then the time it needs to service that one batch request can actually be less than the total time needed to service the equivalent individual requests.

The Batching Pattern in Python

Now you understand what a Batch API is, and that you can make multiple requests concurrently using asynchronous programming in Python, what is the Batching Pattern and why do you need it?

Put simply, the Batching Pattern collects together multiple requests into a single request and dispatches that single request in one go. In the rest of this article, you will see how you can use this to turn an implementation that uses lots of individual requests into one that batches requests together to make fewer calls to a remote server.

Example: Fetching Locations for Street Addresses

You'll use fetching locations of street addresses as your example. To do this you can use the REST API from https://www.geoapify.com/. There's a free tier you can sign up for testing, and it supports fetching multiple locations in bulk. To use the code below you will need to sign up and get an API key.

Here's the first attempt at some code to fetch locations for a number of street addresses:

# Import the modules you're going to use.
# You may need to 'pip install aiohttp'
from urllib.parse import urlencode
import asyncio
import aiohttp
import json

# Test data
STREET_ADDRESSES = [
    "1600 Pennsylvania Avenue, Washington DC, USA",
    "11 Wall Street New York, NY",
    "350 Fifth Avenue New York, NY 10118",
    "221B Baker St, London, England",
    "Tour Eiffel Champ de Mars, Paris",
    "4059 Mt Lee Dr.Hollywood, CA 90068",
    "Buckingham Palace, London, England",
    "Statue of Liberty, Liberty Island New York, NY 10004",
    "Manger Square, Bethlehem, West Bank",
    "2 Macquarie Street, Sydney"
]

# Constants for accessing the Geoapify API
GEOCODING_API = "https://api.geoapify.com/v1/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

async def get_location(address):
    """Return (latitude, longitude) from an address."""
    # Construct the URL to do the lookup for a single address
    query_string = urlencode({
        "apiKey": YOUR_API_KEY,
        "text": address,
        "limit": 1,
        "format": "json"
    })
    url = f"{GEOCODING_API}?{query_string}"

    # Make the request to the API
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()

            # Read the json string and return the latitude and longitude
            # from the first result (there will only be one)
            results = json.loads(data.decode())["results"]
            return results[0]["lat"], results[0]["lon"]

async def main():
    # Print the city for each IP address
    tasks = []
    for address in STREET_ADDRESSES:
        location = await get_location(address)
        print(f"{address} -> {location}")

# Because it's an async function you need to run it using the asyncio event loop
loop = asyncio.new_event_loop()
loop.run_until_complete(main())

You might have noticed that the above code is still calling the API for each address sequentially. Despite using async, the for loop is currently waiting for each request to complete before moving to the next address. To fix that you can use the asyncio function "gather". By gathering the tasks together and awaiting them all at the end you don't need to await them individually.

Your updated main function now looks like this:

async def main():
   # Get the location for each address
   tasks = []
   for address in STREET_ADDRESSES:
       tasks.append(get_location(address))

   # Wait for all tasks to complete
   locations = await asyncio.gather(*tasks)

   # Print them all once all requests have completed
   for address, location in zip(STREET_ADDRESSES, locations):
       print(f"{address} -> {location}")

You are still sending multiple requests to the server. Next, you will see how the Batching Pattern batches these requests together to reduce the number of requests, without modifying your main function.

Example: Fetching Multiple Locations using a Batch API

Using a batch API you can submit multiple requests in one go. If the server handling the request is able to process a batch more efficiently than individual requests it can be much faster to use a batch request when dealing with more than a handful of queries.

You'll use the batch version of the geocoding API used above. It's a little more complicated. Instead of submitting a single address as part of the URL you have to make a POST request. As it can take a little while to process a batch instead of returning the results immediately the server will first respond with a request id which you then query to check if the results are ready or not. This is a common pattern used when implementing a batch API.

The following function queries the API for the locations of a list of addresses. It does this using a single request to the batch API.

# Constants for accessing the Geoapify batch API
GEOCODING_BATCH_API = "https://api.geoapify.com/v1/batch/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

async def get_locations(addresses):
    """Return a dictionary of address -> (lat, lon)."""
    # Construct the URL to do the batch request
    query_string = urlencode({"apiKey": YOUR_API_KEY})
    url = f"{GEOCODING_BATCH_API}?{query_string}"

    # Build the JSON payload for the batch POST request
    data = json.dumps(addresses)

    # And use Content-Type: application/json in the headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    # Make the POST request to the API
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=data, headers=headers) as response:
            response_json = await response.read()
            response_data = json.loads(response_json)

    # The API can return a dict with a pending status if it needs more
    # time to complete. Poll the API until the result is ready.
    while isinstance(response_data, dict) \
    and response_data.get("status") == "pending":
        # Wait a bit before calling the API
        await asyncio.sleep(0.1)

        # Query the result to see if it's ready yet
        request_id = response_data.get("id")
        async with aiohttp.ClientSession() as session:
            async with session.get(url + f"&id={request_id}") as response:
                response_json = await response.read()
                response_data = json.loads(response_json)

    # Gather the results into a dictionary of address -> (lat, lon)
    locations = {}
    for result in response_data:
        address = result["query"]["text"]
        coords = result["lat"], result["lon"]
        locations[address] = coords

    return locations

Putting it Together: The Batching Pattern

Now you have a function that can call a batch API to find the location of a list of addresses in bulk. Your next task is to refactor "get_location" so that it can take advantage of the batch API without having to change your "main" function.

Why not change the "main" function? In this simple illustration, it would be trivial to change the main function to call get_locations. In real-world projects that sort of refactoring is often not so simple. Other times it's not even desirable to change the inputs a function takes and you often want to shield the end user of the function from the implementation details.

To come back to the original question that inspired this post, that was about calling Python functions from Excel using the Excel add-in PyXLL. In that case, the end user is an Excel user who may not know anything about Python. Having a single function to take one input and return one output fits their expectations as an Excel user. Exposing them to the concept of batches would confuse matters unnecessarily. It would also mean they would have had to structure their spreadsheet to call it in an efficient way. Handling the batching of requests behind the scenes while keeping the interface the end user sees is definitely an advantage in this case.

How it works

In pseudo-code, what we want to write is along these lines:

async def get_location(address)
    1. Put the address on a queue of requests.
    2. Start a background task that:
        i. Waits a short time for other requests to be enqueued.
        ii. Processes all queued requests as a batch.
        iii. Notifies the waiting 'get_location' functions.
    3. Wait for the result and return it.

Batching Requests

You can achieve this in Python using asyncio. You "get_location()" function can start a background task to process any queued requests. It will await until that background task has processed the batch containing your request and then return it. The background task should only be started once and so you will need to check if it's already running before starting it. If "get_location" is called multiple times, because it's an async function, it can run while the others are awaiting. Each subsequent call will add a request to the current queue.

To return the result back from the background task to the awaiting get_location functions you will use the asyncio primitive "Future". A Future is an awaitable object that when awaited on will block until a result is set.

Your "get_location()" function re-written to batch up requests, using a future to pass the result back, looks like this:

# State for batching requests
ADDRESSES_BATCH = []
BATCH_LOOP_RUNNING = False

async def get_location(address):
    """Return (latitude, longitude) from an address."""
    global BATCH_LOOP_RUNNING

    # Create a Future that will be set with the location once the
    # request has completed.
    loop = asyncio.get_event_loop()
    future = loop.create_future()

    # Add the ip address and future to the batch
    ADDRESSES_BATCH.append((address, future))

    # Start 'process_batches' running on the asyncio event loop if it's
    # not already running.
    # We've not written 'process_batches_loop' yet!
    if not BATCH_LOOP_RUNNING:
        BATCH_LOOP_RUNNING = True
        asyncio.create_task(process_batches_loop())

    # Wait for the batch your address is in to return
    await future

    # And return the result
    return future.result()

The code above creates an asyncio.Future object and adds that and the address to a list which will be processed as a batch. If the loop to process the batches is not running it starts it using "asyncio.create_task". The function "asyncio.create_task" schedules your "processes_batched_loop" on the asyncio event loop to be called when the other running tasks have awaited. You've not yet defined your function "process_batches_loop" but you will do that next. You await on the future, allowing other tasks running on the asyncio event loop to run, and once the result has been set you return it.

Processing the Batch

The "process_batches_loop" function waits a short time to allow other functions to add requests to the "ADDRESSES_BATCH" list. It then submits all the queued requests as a single call to the REST API. Once the results are returned from the REST API it unpacks the results and sets the results on the futures, allowing each awaiting "get_location" function to complete.

async def process_batches_loop():
    global ADDRESSES_BATCH, BATCH_LOOP_RUNNING

    # Loop while BATCH_LOOP_RUNNING is True
    while BATCH_LOOP_RUNNING:
        # Wait for more to be added to the batch
        await asyncio.sleep(0.1)

        # If nothing has been added to the batch then continue
        # to the start of the loop as there's nothing to do.
        if not ADDRESSES_BATCH:
            continue

        # Get the current items from the batch and reset the batch
        batch = ADDRESSES_BATCH
        ADDRESSES_BATCH = []

        # Get the locations of the current batch
        addresses = [address for (address, future) in batch]
        locations = await get_locations(addresses)

        # Set the results on the futures from this batch.
        # This allows each awaiting 'get_location' function to continue.
        for address, future in batch:
            coords = locations.get(address)
            future.set_result(coords)

You have now achieved the original goal. You have a function "get_location" that looks to the caller like your original function. It takes a single address and returns a single location. Behind the scenes, it batches these individual requests together and submits them to a batch API. Batch APIs can offer better performance compared with APIs that only process individual requests and now your function can take advantage of that, without any change to how the function is called.

The time spent waiting for requests to be added to the batch should be tuned to match how the function is being used. If the function is likely to be called many times at almost the same time, for example, multiple cells being calculated at the same time in Excel, then a short delay can be used. In other situations, for example, if the call results from some user input that might take a few seconds, then a longer delay would be necessary. Logging the time each item is added to the batch along with the time each batch is processed would help us understand the optimal time to wait.

Room for Improvement

There is plenty of room for improvement in the code presented here. I hope this has given you some ideas to take this forward and use in your own projects! The code was written in a relatively simple way to try to make the intention behind it clear, but before you use this in a real-world application there are some things you will need to consider.

  1. Error checking. This is probably the most important thing to add. What happens if the loop processing batches fail? Your code should handle any errors that might occur gracefully, or at the very least log them so that you can track what's happened.
  2. Unnecessary looping. The loop to process batches as written continues looping even if there is nothing to do. You could modify this to await on an "asyncio.Event" object until you've queued at least one item. Alternatively, you could exit the loop when there are no more items to process and restart it when needed.
  3. Stopping the loop when your program ends. The loop will continue looping as long as BATCH_LOOP_RUNNING is True. When your program ends you should think about how to gracefully end the loop. This could be simply setting BATCH_LOOP_RUNNING to False and then awaiting on the task for it to complete. The function "asyncio.create_task" returns a Task object which you could store as a global variable.

Wrapping Up

In this article,  you have learned what a batch API is and why it can be advantageous to use one. You looked at concurrency in Python, comparing multithreading to asynchronous programming. Finally, you demonstrated how you can use a batching pattern to turn a function that processes individual requests into one that uses a batch API.

The complete code used in this article is available here https://gist.github.com/tonyroberts/a539885e12892dc43935860f75cdfe7e.

REST APIs are one example of a batch API. You can apply the same technique to database queries, or any other type of function where it's more efficient to queue data in bulk.

Batching requests behind the scenes and hiding the details from the user-facing function or API is useful when you want to keep things simple for the end user of the function. It can also be a way to retrofit a batch API to an existing code base where refactoring would be difficult.

The motivation in the use case this article was based on was calling a Python function from Excel, without exposing the Excel user to the details of managing batch calls. The user calls a simple function to perform an individual request. If they build a sheet that makes multiple requests across different cells then this solution automatically batches everything together behind the scenes. The Excel add-in PyXLL enables integrating Python into Excel, making it possible to call Python functions as Excel worksheet functions.

For more information about the PyXLL add-in please visit https://www.pyxll.com.

Copyright © 2022 Mouse Vs Python | Powered by Pythonlibrary