Python 3's asyncio module provides fundamental tools for implementing asynchronous I/O in Python. It was introduced in Python 3.4, and with each subsequent minor release, the module has evolved significantly.
This tutorial contains a general overview of the asynchronous paradigm, and how it's implemented in Python 3.7.
Blocking vs Non-Blocking I/O
The problem that asynchrony seeks to resolve is blocking I/O.
By default, when your program accesses data from an I/O source, it waits for that operation to complete before continuing to execute the program.
with open('myfile.txt', 'r') as file:
    data = file.read()
    # Until the data is read into memory, the program waits here
    print(data)
The program is blocked from continuing its flow of execution while a physical device is accessed, and data is transferred.
Network operations are another common source of blocking:
# pip install --user requests
import requests
req = requests.get('https://www.stackabuse.com/')
#
# Blocking occurs here, waiting for completion of an HTTPS request
#
print(req.text)
In many cases, the delay caused by blocking is negligible. However, blocking I/O scales very poorly. If you need to wait for 10^10 file reads or network transactions, performance will suffer.
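To make the scaling problem concrete, here is a minimal sketch in which time.sleep() stands in for a blocking read or request. The names fake_blocking_read and run_sequentially, and the 0.1-second delay, are illustrative assumptions rather than part of any library.

```python
import time

def fake_blocking_read(delay):
    """Hypothetical stand-in for a blocking file read or network request."""
    time.sleep(delay)  # the program can do nothing else while it waits
    return delay

def run_sequentially(count, delay=0.1):
    """Perform `count` blocking operations one after another and time them."""
    start = time.perf_counter()
    for _ in range(count):
        fake_blocking_read(delay)
    return time.perf_counter() - start

if __name__ == "__main__":
    for count in (1, 5, 10):
        print(f"{count} operations took {run_sequentially(count):.2f} seconds")
```

Because each operation waits for the previous one, the total time grows in direct proportion to the number of operations.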
Multiprocessing, Threading, and Asynchrony
Strategies for minimizing the delays of blocking I/O fall into three major categories: multiprocessing, threading, and asynchrony.
Multiprocessing
Multiprocessing is a form of parallel computing: instructions are executed in an overlapping time frame on multiple physical processors or cores. Each process spawned by the kernel incurs an overhead cost, including an independently-allocated chunk of memory (heap).
Python implements parallelism with the multiprocessing module.
The following is an example of a Python 3 program that spawns four child processes, each of which exhibits a random, independent delay. The output shows the process ID of each child, the system time before and after each delay, and the current and peak memory allocation at each step.
from multiprocessing import Process
import os, time, datetime, random, tracemalloc
tracemalloc.start()
children = 4    # number of child processes to spawn
maxdelay = 6    # maximum delay in seconds
def status():
    return ('Time: ' +
            str(datetime.datetime.now().time()) +
            '\t Malloc, Peak: ' +
            str(tracemalloc.get_traced_memory()))
def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")
if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        proc = Process(target=child, args=(i,))
        proc.start()
Output:
Parent PID: 16048
Time: 09:52:47.014906 Malloc, Peak: (228400, 240036) Process 0, PID: 16051, Delay: 1 seconds...
Time: 09:52:47.016517 Malloc, Peak: (231240, 240036) Process 1, PID: 16052, Delay: 4 seconds...
Time: 09:52:47.018786 Malloc, Peak: (231616, 240036) Process 2, PID: 16053, Delay: 3 seconds...
Time: 09:52:47.019398 Malloc, Peak: (232264, 240036) Process 3, PID: 16054, Delay: 2 seconds...
Time: 09:52:48.017104 Malloc, Peak: (228434, 240036) Process 0: Done.
Time: 09:52:49.021636 Malloc, Peak: (232298, 240036) Process 3: Done.
Time: 09:52:50.022087 Malloc, Peak: (231650, 240036) Process 2: Done.
Time: 09:52:51.020856 Malloc, Peak: (231274, 240036) Process 1: Done.
Threading
Threading is an alternative to multiprocessing, with benefits and downsides.
Threads are independently scheduled, and their execution may occur within an overlapping time period. Unlike multiprocessing, however, threads exist entirely in a single kernel process, and share a single allocated heap.
In CPython, threads are concurrent: multiple sequences of instructions are executed in overlapping time frames. But they are not parallel: Python code does not execute simultaneously on multiple physical cores.
The primary downsides to Python threading are memory safety and race conditions. All child threads of a parent process operate in the same shared memory space. Without additional protections, one thread may overwrite a shared value in memory without other threads being aware of it. Such data corruption would be disastrous.
To enforce thread safety, CPython uses a global interpreter lock (GIL). The GIL is a mutex that prevents multiple threads from executing Python bytecode at the same time. Effectively, this means that only one thread runs Python code at any given moment, although a thread releases the GIL while it sleeps or waits on blocking I/O.
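The GIL protects the interpreter's own internals; it does not make a read-modify-write sequence in your own code atomic. The following is a minimal sketch of guarding a shared value with threading.Lock. The names counter, counter_lock, increment, and run are hypothetical and chosen only for illustration.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    """Add 1 to the shared counter `times` times, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread at a time may read-modify-write
            counter += 1

def run(threads=4, times=10_000):
    """Reset the counter, run the worker threads, and return the final total."""
    global counter
    counter = 0
    workers = [threading.Thread(target=increment, args=(times,))
               for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()  # wait for every thread before reading the result
    return counter

if __name__ == "__main__":
    print(run())
```

With the lock in place, the final total always equals threads multiplied by times, because no two threads can update the counter at the same moment.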
Here's the threaded version of the multiprocessing example from the previous section. Notice that very little has changed: multiprocessing.Process is replaced with threading.Thread. As indicated in the output, everything happens in a single process, and the memory footprint is significantly smaller.
from threading import Thread
import os, time, datetime, random, tracemalloc
tracemalloc.start()
children = 4    # number of child threads to spawn
maxdelay = 6    # maximum delay in seconds
def status():
    return ('Time: ' +
            str(datetime.datetime.now().time()) +
            '\t Malloc, Peak: ' +
            str(tracemalloc.get_traced_memory()))
def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")
if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        thr = Thread(target=child, args=(i,))
        thr.start()
Output:
Parent PID: 19770
Time: 10:44:40.942558 Malloc, Peak: (9150, 9264) Process 0, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.942937 Malloc, Peak: (13989, 14103) Process 1, PID: 19770, Delay: 5 seconds...
Time: 10:44:40.943298 Malloc, Peak: (18734, 18848) Process 2, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.943746 Malloc, Peak: (23959, 24073) Process 3, PID: 19770, Delay: 2 seconds...
Time: 10:44:42.945896 Malloc, Peak: (26599, 26713) Process 3: Done.
Time: 10:44:43.945739 Malloc, Peak: (26741, 27223) Process 0: Done.
Time: 10:44:43.945942 Malloc, Peak: (26851, 27333) Process 2: Done.
Time: 10:44:45.948107 Malloc, Peak: (24639, 27475) Process 1: Done.
Asynchrony
Asynchrony is an alternative to threading for writing concurrent applications. Asynchronous events occur on independent schedules, "out of sync" with one another, entirely within a single thread.
Unlike threading, in asynchronous programs the programmer controls when and how voluntary preemption occurs, making it easier to isolate and avoid race conditions.
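As a rough illustration of that control, the sketch below runs two coroutines that hand over control only at an explicit await. The names worker and log are hypothetical, and asyncio.sleep(0) is used here simply as a way to yield to the event loop once.

```python
import asyncio

async def worker(name, log):
    """Record two steps, yielding to the event loop after each one."""
    for step in range(2):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)  # the only place a switch can happen

async def main():
    log = []
    # Both workers run in a single thread and take turns at each await.
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The log alternates between the two workers, and no switch can occur between an append and the await that follows it, which is what makes shared state easier to reason about than with threads.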
Introduction to the Python 3.7 asyncio Module
In Python 3.7, asynchronous operations are provided by the asyncio module.
High-Level vs Low-Level asyncio API
Asyncio components are divided into high-level APIs (for writing programs), and low-level APIs (for writing libraries or frameworks based on asyncio).
Every asyncio program can be written using only the high-level APIs. If you're not writing a framework or library, you never need to touch the low-level stuff.
With that said, let's look at the core high-level APIs, and discuss the core concepts.
Coroutines
In general, a coroutine (short for cooperative subroutine) is a function designed for cooperative multitasking: it voluntarily yields to other routines and processes, rather than being forcefully preempted by the kernel. The term "coroutine" was coined in 1958 by Melvin Conway (of "Conway's Law" fame), to describe code that actively facilitates the needs of other parts of a system.
In asyncio, this voluntary preemption is called awaiting.
Awaitables, Async, and Await
Any object which can be awaited (voluntarily preempted by a coroutine) is called an awaitable.
The await keyword suspends the execution of the current coroutine, and hands control to the specified awaitable until it completes.
In Python 3.7, the three awaitable objects are coroutine, task, and future.
An asyncio coroutine is any Python function whose definition is prefixed with the async keyword.
async def my_coro():
    pass
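One detail worth seeing in code: calling an async function does not run its body. It returns a coroutine object, which only executes once it is awaited. The sketch below assumes a hypothetical coroutine named answer.

```python
import asyncio
import inspect

async def answer():
    """A hypothetical coroutine that returns a value."""
    return 42

async def main():
    coro = answer()                           # nothing has run yet
    is_awaitable = inspect.isawaitable(coro)  # a coroutine object is awaitable
    result = await coro                       # now the body executes
    return is_awaitable, result

if __name__ == "__main__":
    print(asyncio.run(main()))
```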
An asyncio task is an object that wraps a coroutine, providing methods to control its execution, and query its status. A task may be created with asyncio.create_task(), or asyncio.gather().
An asyncio future is a low-level object that acts as a placeholder for data that hasn't yet been calculated or fetched. It can provide an empty structure to be filled with data later, and a callback mechanism that is triggered when the data is ready.
A task inherits all but two of the methods available to a future, so in Python 3.7 you never need to create a future object directly.
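Although application code rarely needs one, a short sketch can make the placeholder idea tangible. It uses the low-level calls loop.create_future() and loop.call_later(); the variable name placeholder and the 0.01-second delay are assumptions made for illustration.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    placeholder = loop.create_future()   # empty: no result yet
    # Schedule a callback that fills the placeholder shortly afterwards.
    loop.call_later(0.01, placeholder.set_result, "data is ready")
    was_done_before = placeholder.done()
    result = await placeholder           # suspends until set_result() is called
    return was_done_before, result

if __name__ == "__main__":
    print(asyncio.run(main()))
```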
Event Loops
In asyncio, an event loop controls the scheduling and communication of awaitable objects. An event loop is required to use awaitables. Every asyncio program has at least one event loop. It's possible to have multiple event loops, but multiple event loops are strongly discouraged in Python 3.7.
A reference to the currently-running loop object is obtained by calling asyncio.get_running_loop().
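Note that asyncio.get_running_loop() raises RuntimeError when no loop is running, so it is meant to be called from inside a coroutine or callback. A minimal sketch, using a hypothetical helper named loop_is_running:

```python
import asyncio

def loop_is_running():
    """Return True if called while an event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def main():
    return loop_is_running()

if __name__ == "__main__":
    print(loop_is_running())      # called outside any coroutine
    print(asyncio.run(main()))    # called while the loop is running
```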
Sleeping
The asyncio.sleep(delay) coroutine suspends the current coroutine for delay seconds, without blocking the event loop, so other tasks can run in the meantime. It's useful for simulating blocking I/O.
import asyncio
async def main():
    print("Sleep now.")
    await asyncio.sleep(1.5)
    print("OK, wake up!")
asyncio.run(main())
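To see that asyncio.sleep() pauses only the coroutine that awaits it, the following sketch runs two 0.2-second sleeps together using asyncio.gather(), which is described later in this tutorial. The coroutine name nap and the durations are illustrative assumptions.

```python
import asyncio
import time

async def nap(seconds):
    """Sleep without blocking the event loop, then report the duration."""
    await asyncio.sleep(seconds)
    return seconds

async def main():
    start = time.perf_counter()
    # Both naps wait at the same time, so the total is roughly one nap.
    await asyncio.gather(nap(0.2), nap(0.2))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"Elapsed: {asyncio.run(main()):.2f} seconds")
```

The elapsed time should be close to 0.2 seconds rather than 0.4, because the two sleeps overlap.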
Initiating the Main Event Loop
The canonical entry point to an asyncio program is asyncio.run(main()), where main() is a top-level coroutine.
import asyncio
async def my_coro(arg):
    "A coroutine."
    print(arg)
async def main():
    "The top-level coroutine."
    await my_coro(42)
asyncio.run(main())
Calling asyncio.run() implicitly creates and runs an event loop. The loop object has many useful methods, including loop.time(), which returns a float representing the current time, as measured by the loop's internal clock.
Note: The asyncio.run() function cannot be called from within an existing event loop. Therefore, you may see errors if you're running the program within a supervising environment, such as Anaconda or Jupyter, which is running an event loop of its own. The example programs in this section and the following sections should be run directly from the command line by executing the Python file.
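The restriction in the note can be reproduced in a few lines. This sketch deliberately calls asyncio.run() from inside a running coroutine and catches the resulting RuntimeError; the coroutine name inner is hypothetical, and the exact wording of the error message may differ between Python versions.

```python
import asyncio

async def inner():
    return "inner result"

async def main():
    coro = inner()
    try:
        asyncio.run(coro)        # a loop is already running here
    except RuntimeError as error:
        coro.close()             # tidy up the coroutine that never started
        return f"RuntimeError: {error}"
    return "no error"

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Inside a coroutine, the usual alternative is simply to await the other coroutine instead of starting a second loop.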
The following program prints lines of text, blocking for one second after each line until the last.
import asyncio
async def my_coro(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...")
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break
async def main():
    await my_coro(3.0)
asyncio.run(main())
Output:
Blocking...
Blocking...
Blocking...
Done.
Tasks
A task is an awaitable object that wraps a coroutine. To create and immediately schedule a task, you can call the following:
asyncio.create_task(coro(args...))
This will return a task object. Creating a task tells the loop, "go ahead and run this coroutine as soon as you can."
If you await a task, execution of the current coroutine is blocked until that task is complete.
import asyncio
async def my_coro(n):
    print(f"The answer is {n}.")
async def main():
    # By creating the task, it's scheduled to run
    # concurrently, at the event loop's discretion.
    mytask = asyncio.create_task(my_coro(42))
    # If we later await the task, execution stops there
    # until the task is complete. If the task is already
    # complete before it is awaited, nothing is awaited.
    await mytask
asyncio.run(main())
Output:
The answer is 42.
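The difference between awaiting a coroutine directly and wrapping it in a task shows up when there is more than one. In this sketch, the coroutine job and the delays of 0.1 and 0.01 seconds are assumptions chosen to make the ordering visible.

```python
import asyncio

async def job(name, delay, finished):
    """Wait for `delay` seconds, then record that this job finished."""
    await asyncio.sleep(delay)
    finished.append(name)

async def sequential():
    finished = []
    await job("slow", 0.1, finished)    # runs to completion first
    await job("fast", 0.01, finished)   # starts only after "slow" is done
    return finished

async def concurrent():
    finished = []
    slow = asyncio.create_task(job("slow", 0.1, finished))
    fast = asyncio.create_task(job("fast", 0.01, finished))
    await slow
    await fast                          # already complete by this point
    return finished

if __name__ == "__main__":
    print(asyncio.run(sequential()))
    print(asyncio.run(concurrent()))
```

When the coroutines are awaited one after another, "slow" finishes first simply because it started first. As tasks, both are in flight at once, so "fast" finishes ahead of "slow".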
Tasks have several useful methods for managing the wrapped coroutine. Notably, you can request that a task be canceled by calling the task's .cancel() method. The task will be scheduled for cancellation on the next cycle of the event loop. Cancellation is not guaranteed: the task may complete before that cycle, in which case the cancellation does not occur.
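A minimal cancellation sketch follows. The coroutine long_job and its 10-second sleep are hypothetical stand-ins for slow work; awaiting a canceled task raises asyncio.CancelledError in the awaiting coroutine.

```python
import asyncio

async def long_job():
    await asyncio.sleep(10)
    return "finished"

async def main():
    task = asyncio.create_task(long_job())
    await asyncio.sleep(0)      # give the task a chance to start
    task.cancel()               # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass                    # the task was canceled before it finished
    return task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))
```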
Gathering Awaitables
Awaitables can be gathered as a group by passing them as positional arguments to the built-in function asyncio.gather(*awaitables).
Calling asyncio.gather() returns an awaitable representing the gathered awaitables, so the call must be prefixed with await to obtain their results.
If any element of awaitables is a coroutine, it is immediately scheduled as a task.
Gathering is a convenient way to schedule multiple coroutines to run concurrently as tasks. It also associates the gathered tasks in some useful ways:
- When all gathered tasks are complete, their return values are returned as a single list, in the same order as the awaitables were passed in.
- Any gathered task may be canceled, without canceling the other tasks.
- The gather itself can be canceled, canceling all tasks.
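The ordering guarantee in the first point can be sketched as follows. The coroutine delayed and its delays are illustrative assumptions; the results come back in argument order even though the awaitables finish in a different order.

```python
import asyncio

async def delayed(value, delay):
    """Return `value` after waiting `delay` seconds."""
    await asyncio.sleep(delay)
    return value

async def main():
    # "second" finishes first, but the result list follows argument order.
    return await asyncio.gather(
        delayed("first", 0.03),
        delayed("second", 0.01),
        delayed("third", 0.02),
    )

if __name__ == "__main__":
    print(asyncio.run(main()))
```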
Example: Async Web Requests with aiohttp
The following example illustrates how these high-level asyncio APIs can be used. It is a modified version, updated for Python 3.7, of Scott Robinson's nifty asyncio example. His program leverages the aiohttp module to grab the top posts on Reddit, and output them to the console.
Make sure that you have the aiohttp module installed before you run the script below. You can install the module with the following pip command:
$ pip install --user aiohttp
import sys
import asyncio
import aiohttp
import json
import datetime
async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()
async def get_reddit_top(subreddit, client, numposts):
    data = await get_json(client, 'https://www.reddit.com/r/' +
        subreddit + '/top.json?sort=top&t=day&limit=' +
        str(numposts))
    print(f'\n/r/{subreddit}:')
    j = json.loads(data.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')')
async def main():
    print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
    print('---------------------------')
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession(loop=loop) as client:
        await asyncio.gather(
            get_reddit_top('python', client, 3),
            get_reddit_top('programming', client, 4),
            get_reddit_top('asyncio', client, 2),
            get_reddit_top('dailyprogrammer', client, 1)
            )
asyncio.run(main())
If you run the program multiple times, you'll see that the order of the output changes. That's because the JSON responses are displayed as they're received, which depends on the server's response time and the intermediate network latency. On a Linux system, you can observe this in action by running the script prefixed with (e.g.) watch -n 5, which will refresh the output every 5 seconds.
Other High-level APIs
Hopefully, this overview gives you a solid foundation of how, when, and why to use asyncio. Other high-level asyncio APIs, not covered here, include:
- stream, a set of high-level networking primitives for managing asynchronous TCP events.
- lock, event, condition, async analogs of the synchronization primitives provided in the threading module.
- subprocess, a set of tools for running async subprocesses, such as shell commands.
- queue, an asynchronous analog of the queue module.
- exception, for handling exceptions in async code.
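As a small taste of one of these, here is a sketch of a producer and a consumer sharing an asyncio.Queue. The coroutine names, the queue size of 2, and the use of None as a stop signal are assumptions made for this example, not conventions required by the module.

```python
import asyncio

async def producer(queue, items):
    """Put each item on the queue, then a sentinel to signal the end."""
    for item in items:
        await queue.put(item)      # waits here whenever the queue is full
    await queue.put(None)          # sentinel: tells the consumer to stop

async def consumer(queue):
    """Collect items from the queue until the sentinel arrives."""
    received = []
    while True:
        item = await queue.get()   # waits here whenever the queue is empty
        if item is None:
            break
        received.append(item)
    return received

async def main():
    queue = asyncio.Queue(maxsize=2)   # small bound so the producer must wait
    _, received = await asyncio.gather(
        producer(queue, [1, 2, 3, 4]),
        consumer(queue),
    )
    return received

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The queue is created inside main() so that it belongs to the event loop started by asyncio.run().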
Conclusion
Keep in mind that even if your program doesn't require asynchrony for performance reasons, you can still use asyncio if you prefer writing within the asynchronous paradigm. I hope this overview gives you a solid understanding of how, when, and why to begin using asyncio.