Мои контакты


понедельник, 27 мая 2019 г.

Что внутри asyncio

В этой статье мы предлагаем читателю совершить с нами в меру увлекательное путешествие в недра asyncio, чтобы разобраться, как в ней реализовано асинхронное выполнение кода. Мы оседлаем коллбэки и промчимся по циклу событий сквозь пару ключевых абстракций прямо в корутину. Если на вашей карте питона еще нет этих достопримечательностей, добро пожаловать под кат.


Краткая справка о раскинувшейся перед нами местности


asyncio — библиотека асинхронного ввода/вывода которая, согласно pep3153, была создана, чтобы предоставить стандартизованную базу для создания асинхронных фреймворков. pep3156 так же приписывает ей необходимость обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). Как мы можем сейчас наблюдать, эти цели были успешно достигнуты — появился новый фреймворк на основе asyncio: aiohttp, в Tornado AsyncioMainLoop является циклом событий по умолчанию с версии 5.0, в Twisted asyncioreactor доступен с версии 16.5.0, а для Gevent есть сторонняя библиотека aiogevent.

asyncio — это гибридная библиотека, использующая одновременно два подхода к реализации асинхронного выполнения кода: классический на коллбэка и относительно новый (по крайней мере для питона) на корутинах. В её основе лежат три основные абстракции, являющиеся аналогами абстракций, существующих в сторонних фреймворках:
  • Pluggable Event Loop: подключаемый цикл событий. Подключаемый значит, что он может быть в две строчки кода заменен на другой, реализующий такой же интерфейс. Сейчас есть реализации на cython поверх libuv (uvloop) и на языке Rust (asyncio-tokio).
  • Future: результат операции, который будет доступен в будущем. Нужен, чтобы получать в корутинах результат выполнения коллбэков.
  • Task: специальный подкласс класса Future для запуска корутин на цикле событий.

Поехали!


Цикл событий — основная составляющая библиотеки, по дорогам, пролегающим через него, данные доставляются в любые её компоненты. Он большой и сложный, поэтому сначала рассмотрим его урезанный вариант.

# ~/inside_asyncio/base_loop.py

import collections
import random

class Loop:
    def __init__(self):
        # Очередь для хранения коллбэков
        self.ready = collections.deque()

    def call_soon(self, callback, *args):
        # складывает кортэж из коллбэка и его аргументов в очередь
        self.ready.append((callback, args))

    def run_until_complete(self, callback, *args):
        # Этот метод выполняет всё работу по запуску коллбэков
        self.call_soon(callback, *args)
        # Перекресток вех дорог - основной цикл
        # он крутится пока очередь не опустеет
        while self.ready:
            ntodo = len(self.ready)
            # внутренний цикл итерируется столько раз 
            # сколько было коллбэков в очереди на момент его запуска
            for _ in range(ntodo):
                # на каждой интерации достаёт из очереди
                # один коллбэк и его параметры и запускает
                callback, args = self.ready.popleft()
                callback(*args)

def callback(loop):
    print('Рассказчик')
    loop.call_soon(print, 'Читатель')

loop = Loop()
loop.run_until_complete(c, loop)

Оседлав наш маленький коллбэк, мы отправляемся в путь через call_soon, попадаем в очередь и после краткого ожидания будем выведены на экран.

Эпизод про плохие коллбэки


Стоит упомянуть, что коллбэки это опасные лошадки — если они сбросят вас посреди дороги, интерпретатор питона не сможет помочь понять, где это произошло. Если не верите, покатайтесь той же дорогой на коллбэке maybe_print, приходящем к финишу примерно в половине случаев.

# ~/inside_asyncio/base_loop.py

def maybe_print(msg):
    if random.randint(0, 1):
        raise Exception(msg)
    else:
        print(msg)

def starting_point(loop):  # Место посадки
    print('Рассказчик')
    loop.call_soon(maybe_print, 'Читатель')

def main(loop):
    loop.call_soon(starting_point, loop)
    loop.call_soon(starting_point, loop)

loop = Loop()
loop.run_until_complete(main, loop)

Ниже показан полный трейсбэк запуска предыдущего примера. Из-за того, что функция maybe_print была запущена циклом событий, а не напрямую из starting
point, трейсбэк заканчивается именно на нём, в методе run_until_complete. По такому трейсбэку невозможно определить, где в коде находится starting_point, что значительно усложнит отладку, если starting_point будут находится в нескольких местах кодовой базы.

$: python3 base_loop.py 
>> Рассказчик  # Доехал первый раз
>> Читатель    # Доехал первый раз
>> Рассказчик  # Доехал второй раз
>> Traceback (most recent call last):
>>   File "base_loop.py", line 42, in 
>>     loop.run_until_complete(main, loop)
>>   File "base_loop.py", line 17, in run_until_complete
>>     callback(*args)
>>   File "base_loop.py", line 29, in maybe_print
>>     raise Exception(msg)
>> Exception: Читатель  # не доехал второй раз

Непрерывный стек вызовов нужен не только для вывода полного трейсбэка, но и для реализации других возможностей языка. Например, на нём основана обработка исключений. Пример ниже не заработает, потому что к моменту запуска starting_point, функция main уже будет выполнена:

# ~/inside_asyncio/base_loop.py

def main(loop):
    try:
        loop.call_soon(starting_point, loop)
        loop.call_soon(starting_point, loop)
    except:
        pass

Loop().run_until_complete(main, loop)

Следующий пример тоже не заработает. Менеджер контекста в функции main откроет и закроет файл ещё до того, как будет запущена его обработка.

# ~/inside_asyncio/base_loop.py

def main(loop):
    with open('file.txt', 'rb') as f:
        loop.call_soon(process_file, f)

Loop().run_until_complete(main, loop)
# тут аналогия с путешествием достигла моего лимита, дальше без неё =(

Отсутствие непрерывного стека вызовов ограничивает использование привычных возможностей языка. Для частичного обхода этого недостатка в asyncio пришлось добавить много дополнительного кода, не относящeгося напрямую к решаемой ей задаче. Этот код, по большей части, отсутствует в примерах — они и без него довольно сложны.

Из цикла событий во внешний мир и обратно


Цикл событий сообщается с внешним миром через операционную систему посредством событий. Код, который умеет с ней работать, предоставляется модулем стандартной библиотеки под названием selectors. Он позволяет сказать операционной системе, что мы ждем какого-то события, а потом спросить, произошло ли оно. В примере ниже ожидаемым событием будет доступность сокета на чтение.

# ~/inside_asyncio/event_loop.py

import selectors
import socket
import collections
from future import Future
from handle import Handle
from task import Task


class EventLoop:
    def __init__(self):
        self.ready = collections.deque()
        # Добавляем селектор
        self.selector = selectors.DefaultSelector()

    def add_reader(self, sock, callback):
        # Регистрируем ожидание доступности сокета на чтение
        # параметры:
        # сокет,
        # константа содержащая битовую маску доступности сокета на чтение
        # кортеж с данными которые мы хотим ассоциировать с этим событием
        self.selector.register(
            sock, socket.EVENT_READ, (self._accept_conn, sock, callback)
        )

    def _accept_conn(self, sock, callback):
        # принимаем входящее соединение
        conn, addr = sock.accept()
        conn.setblocking(False)
        # регистрируем ожидание данных на сокете
        self.selector.register(
            conn, socket.EVENT_READ, (callback, conn)
        )

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        # основной цикл крутится пока очередь не пустая или мы ожидаем каких-то событий
        while self.ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self.ready.popleft()
                callback(*args)

            # второй подцикл итерируется по наступившим событиям
            for key, events in self.selector.select(timeout=0):
                # достает коллбэк и аргументы из кортежа с ассоциированными данными
                callback, *args = key.data
                # добавляет их в очередь на выполнение
                self.call_soon(callback, *args)

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))


def print_data(conn):
    print(conn.recv(1000))

def main(loop):
    # создаём сокет
    sock = socket.socket()
    # привязываем к локалхосту на 8086 порту
    sock.bind(('localhost', 8086))
    sock.listen(100)
    sock.setblocking(False)
    # добавляем коллбэк для чтения данных
    loop.add_reader(sock, print_data)

loop = EventLoop()
# запускаем цикл событий
loop.run_until_complete(main, loop)

Гонец из внешнего мира оставляет своё сообщение или посылку в селекторе, а селектор передаёт её получателю. Теперь стало возможным читать из сокета, используя цикл событий. Если запустить этот код и подключиться с помощью netcat, то он будет добросовестно выводить всё, что в него будет отправлено.

$: nc localhost 8086         $: python3 event_loop.py
"Hi there!"                  b'"Hi there!"\n'
"Hello!"                     b'"Hello!"\n'
"Answer me, please!"         b'"Answer me, please!"\n'

В начале статьи говорилось, что asyncio — гибридная библиотека, в которой корутины работают поверх коллбэков. Для реализации этой функциональности используются две оставшиеся основные абстракции: Task и Future. Далее будет показан код этих абстракций, а затем, как, используя их цикл событий, выполняются корутины.

Future


Ниже представлен код класса Future. Он нужен для того, чтобы в корутине можно было дождаться завершения выполнения коллбэка и получить его результат.

# ~/inside_asyncio/future.py

import sys
from asyncio import events, CancelledError


class Future:
    # хранит состояние коллбэка результат выполнения которого представляет
    _state = 'PENDING'  # FINISHED, CANCELLED 
    # стек вызовов до того места где был создан экземпляр Future
    # нужен чтобы в случае возникновения исключения вывести понятный трейсбэк
    _source_traceback = None
    # список коллбэков которые должны быть вызваны когда изменится состояние ожидаемого коллбэка
    _callbacks = []
    # исключение если оно было возбуждено во время выполения ожидаемого коллбэка
    _exception = None
    # цикл событий чтобы знать где запускать коллбэки на смену состояния
    _loop = None
    # результат выполнения ожидаемого коллбэка
    _result = None

    def __init__(self, loop):
        self._loop = loop
        self._source_traceback = events.extract_stack(sys._getframe(1))

    def add_done_callback(self, callback):
        # добавляет коллбэки на смену состояния в список
        self._callbacks.append(callback)

    def _schedule_callbacks(self):
        # запускает коллбэки на смену состояния на цикле событий
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)
        self._callbacks[:] = []

# Один из следующих трёх методов должен быть вызван для изменения состояния Future
# когда ожидаемый коллбэк каким-либо образом завершит свое выполнение
    def set_exception(self, exception):
        # в случае возникновения исключения сохраняем его
        self._exception = exception
        # меняем состояние
        self._state = 'FINISHED'
        # запускаем коллбэки на смену состояния
        self._schedule_callbacks()

    def set_result(self, result):
        # если ожидаемый коллбэк завершился успешно сохраняем результат выполнения
        self._result = result
        self._state = 'FINISHED'
        self._schedule_callbacks()

    def cancel(self):
        # в случае отмены просто меняем состояние
        self._state = 'CANCELLED'
        self._schedule_callbacks()

    def result(self):
        # метод для получения результат
        # возбуждает исключение если ожидание завершения коллбэка было отменено
        if self._state == 'CANCELLED':
            raise CancelledError
        # или оно возникло во время выполнения ожидаемого коллбэка
        if self._exception is not None:
            raise self._exception
        # иначе возвращает результат
        return self._result

    def __await__(self):
        # магический метод, вызывается ключевым словом await
        # если находимся в состоянии ожидания йилдим себя
        if self._state == 'PENDING':
            yield self
        # иначе пытаемся вернуть результат
        return self.result()

Task


Это специальный подкласс класса Future. Он нужен для запуска корутины на коллбэчном цикле событий.

# ~/inside_asyncio/task.py

from asyncio import futures
from future import Future


class Task(Future):
    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        # сохраняет выполняемую корутину
        self._coro = coro

    def _step(self, exc=None):
        # метод вызываемы циклом событий, нужен чтобы крутить корутину
        try:
            if exc is None:
                # если не получено исключение отправляем в корутину None 
                # что заставляет её прокрутится на один шаг
                result = self._coro.send(None)
            else:
                # если получено исключение возбуждаем его в корутине
                self._coro.throw(exc)
        except StopIteration:
            result = None
        except Exception as exc:
            self.set_exception(exc)
        else:
            # если получили Future из корутины добавляем ей метод
            # wakeup как коллбэк на смену состояния
            if isinstance(result, Future):
                result.add_done_callback(self._wakeup)
            # иначе шедулим вызов метода step циклом событий еще раз
            elif result is None:
                self._loop.call_soon(self._step)

    def _wakeup(self, future):
        # метод с помощью которого Future возвращает поток выполнения в ожидающую её Task
        # с исключением
        try:
            future.result()
        except Exception as exc:
            self._step(exc)
        # или без в зависимости от успешности завершения Future
        else:
            self._step()

Цикл событий, умеющий работать с Future


# ~/inside_asyncio/future_event_loop.py

import selectors
from selectors import EVENT_READ, EVENT_WRITE
import socket
import collections
from future import Future
from task import Task

class EventLoop:
    def __init__(self):
        self._ready = collections.deque()
        self.selector = selectors.DefaultSelector()

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                callback(*args)

            for key, events in self.selector.select(timeout=0):
                callback, *args = key.data
                self.call_soon(callback, *args)

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

# Два метода умеющих работать с Future
    def sock_accept(self, sock, fut=None):
        # метод принимающий входящее соединение на сокете
        # создаёт Future если не получил её
        fut = fut if fut else Future(loop=self)
        try:
            # пытается принять входящее соединение
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            # если входящего соединия нет
            # регистрирует сам себя на ожидание
            # передав свежесозданную Future в качестве параметра
            self.selector.register(
                sock, EVENT_READ, (self.sock_accept, sock, fut)
            )
        except Exception as exc: 
            fut.set_exception(exc)
            self.selector.unregister(sock)
        else:
            # если соединение установлено
            # вызывает метод Future для сохранения результата
            fut.set_result((conn, address))
            self.selector.unregister(sock)
        return fut

    def sock_recv(self, sock, n, fut=None):
        # метод для получения данных из сокета
        # практичесски идентичен предыдущему за тем исключением,
        # что пытается не принять соединение, а получить данные из сокета
        fut = fut if fut else Future(loop=self)
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.selector.register(
                sock, EVENT_READ, (self.sock_recv, sock, n, fut)
            )
        except Exception as exc:
            fut.set_exception(exc)
            self.selector.unregister(sock)
        else:
            fut.set_result(data)
            self.selector.unregister(sock)
        return fut

            

async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # ожидаем входящего соединения
    conn, addr = await loop.sock_accept(sock)
    # получаем из него данные
    result = await loop.sock_recv(conn, 1000)
    print(result)

loop = EventLoop()
# заворачиваем корутину в Task 
task = Task(coro=main(loop), loop=loop)
# шедулим метод степ для запуска на цикле событий
loop.run_until_complete(task._step)

Двинемся дальше


Теперь проследим за тем, как корутина main будет выполняться:

class EventLoop:
    def run_until_complete(self, callback, *args):
        # task._step добавляется в очередь
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                # и практически сразу вызывается
                callback(*args)  # task._step()
clsss Task:
    def _step(self, exc=None):
        try:
            if exc is None:
                # отправляет None в корутину
                result = self._coro.send(None)
            else:
async def main(loop):
    # корутина прокручивается на один шаг
    # создаётся сокет
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # вызывается метод цикла событий для ожидания входящего соединения
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)
class EventLoop:
    def sock_accept(self, sock, fut=None):
        # создаёт экземпляр Future
        fut = fut if fut else Future(loop=self)
        try:
            # пытается принять входящее соединение
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            # так как соединения нет
            # регистрирует сам себя на ожидание
            # передав свежесозданную Future в качестве параметра
            self.selector.register(
                sock, EVENT_READ, (self.sock_accept, sock, fut)
            )
        except Exception as exc: 
            self.selector.unregister(sock)
        # возвращает Future в корутину
        return fut
async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # ключевое слово await вызывает метод __await__ полученной Future
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)
class Future:
    def __await__(self):
        # так как Future находится в состоянии ожидания она йилдит саму себя
        if self._state == 'PENDING':
            yield self
        return self.result()
class Task(Future):
    def _step(self, exc=None):
        try:
            if exc is None:
                # результат йилда пробрасывается напрямую в то место откуда ворутину пришел None
                result = self._coro.send(None)  # result = fut
        else:
            # получили Future из корутины добавляем ей метод
            # wakeup как коллбэк на смену состояния
            if isinstance(result, Future):
                result.add_done_callback(self._wakeup)
            elif result is None:
                self._loop.call_soon(self._step)

# тут выполнение данной корутины останавливается - крутящие её эксземпляры Task и Future
# ждут входящего соединения
# если бы в очереди были другие коллбэки цикл событий бы переключился на их выполнение
class EventLoop:
    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                callback(*args)

            for key, events in self.selector.select(timeout=0):
                # пришло входящее соединение
                callback, *args = key.data
                self.call_soon(callback, *args)  # loop.sock_accept(sock, fut)
class EventLoop:
    def sock_accept(self, sock, fut=None):
        fut = fut if fut else Future(loop=self)
        try:
            # принимаем входящее соединение
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
        --------------------------------
        else:
            # устанавливаем результат Future
            fut.set_result((conn, address))
            self.selector.unregister(sock)
        return fut
class Future:
    def set_result(self, result):
        # устанавливает результат
        self._result = result
        # меняет состояние
        self._state = 'FINISHED'
        # вызывает коллбэки на смену состояния
        self._schedule_callbacks()

    def _schedule_callbacks(self):
        for callback in self._callbacks:
            # у нас только один коллбэк на смену состояния task.wakeup
            self._loop.call_soon(callback, self)  # (task.wakeup, fut)
        self._callbacks[:] = []
class EventLoop:
    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                # на следующей итерации главного цикла 
                # будет вызван метод task.wakeup 
                callback(*args)  # task.wakeup(fut)
class Task(Future):
    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            self._step(exc)
        else:
            # так как Future завершилась успешно он вызовет метод task._step
            self._step()

    def _step(self, exc=None):
        try:
            if exc is None:
                # который отправит в корутину ещё один None
                result = self._coro.send(None)
            else:
async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # ключевое слово await вызывает метод __awai__ второй раз
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)
class Future:
    def __await__(self):
        if self._state == 'PENDING':
            yield self
        # так как Future завершена возвращаем результат
        return self.result()
async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # результат возвращенный из Future помещается в переменные conn и addr
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)

Вот таким нехитрым способом asyncio выполняет корутины.

Итоги


Цель создания asyncio была успешно достигнута. Она не только решила проблему совместимости, но и вызвала огромный рост интереса к конкурентному программированию в сообществе. Новые статьи и библиотеки начали появляться, словно грибы после дождя. Кроме того, asyncio повлияла и на сам язык: в него были добавлены нативные корутины и новые ключевые слова async/await. В предыдущий раз новое ключевое слово добавлялось в далеком 2003 году, это было ключевое слово yield.

Одной из целей создания asyncio было обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). Из этой цели логически вытекает выбор инструментов: если бы не было требования совместимости, возможно, корутинам была бы отдана главная роль. Из-за того, что при программировании на коллбэках невозможно сохранить непрерывный стэк вызовов, на границе между ними и корутинами пришлось создать дополнительную систему, обеспечивающую поддержку опирающихся на него возможностей языка.

Теперь главный вопрос. Зачем всё это знать простому пользователю библиотеки, который следует рекомендациям из документации и использует лишь корутины и высокоуровневый API?

Вот кусок документации класса StreamWriter:


Его экземпляр возвращается функцией asyncio.open_connection и является async/await API поверх API на коллбэках. И эти коллбэки из него торчат. Функции write и writelines синхронные, они пытаются писать в сокет, а если не получается, то сбрасывают данные в нижележащий буфер и добавляют коллбэки на запись. Корутина drain нужна для того, чтобы обеспечить возможность дождаться, пока количество данных в буфере не опустится до заданного значения.

Если забыть вызвать drain между вызовами write, то внутренний буфер может разрастись до неприличных размеров. Однако, если помнить об этом, то остается пара неприятных моментов. Первый: если коллбэк на запись «сломается», то корутина, использующая этот API никак об этом не узнает и, соответственно, не сможет обработать. Второй: если корутина «сломается», то коллбэк на запись никак об этом не узнает и продолжит писать данные из буфера.

Таким образом, даже используя только корутины, будьте готовы к тому, что коллбэки напомнят о себе.

О том, как работать с базами данных из асинхронного кода, вы можете прочитать в этой статье нашего корпоративного блога Antida software.

Автор: Александр Меренков, Python developer @ Antida software.

Комментариев нет:

Отправить комментарий