Хабрахабр

Свой асинхронный tcp-сервер за 15 минут с подробным разбором

Ранее я представил пару небольших постов о потенциальной роли Spring Boot 2 в реактивном программировании. После этого я получил ряд вопросов о том, как работают асинхронные операции в программировании в целом. Сегодня я хочу разобрать, что такое Non-blocking I/O и как применить это знание для создания небольшого tcp–сервера на python, который сможет обрабатывать множество открытых и тяжелых (долгих) соединений в один поток. Знание python не требуется: все будет предельно просто со множеством комментариев. Приглашаю всех желающих!
Мне, как и многим другим разработчикам, очень нравятся эксперименты, поэтому вся последующая статья будет состоять как раз из серии экспериментов и выводов, которые они несут. Предполагается, что вы недостаточно хорошо знакомы с тематикой, и будете охотно экспериментировать со мной. Исходники примеров можно найти на github.

Задача сервера будет заключаться в получении и печати данных из сокета и возвращения строки Hello from server!. Начнем с написания очень простого tcp–сервера. Примерно так это выглядит:

Синхронный tcp-сервер

import socket # Задаем адрес сервера
SERVER_ADDRESS = ('localhost', 8686) # Настраиваем сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(SERVER_ADDRESS)
server_socket.listen(10)
print('server is running, please, press ctrl+c to stop') # Слушаем запросы
while True: connection, address = server_socket.accept() print("new connection from ".format(address=address)) data = connection.recv(1024) print(str(data)) connection.send(bytes('Hello from server!', encoding='UTF-8')) connection.close()

Здесь все довольно просто. Если вы не знакомы с понятием сокета, то вот очень простая и практическая статья. Мы создаем сокет, ловим входящие соединения и обрабатываем их согласно заданной логике. Здесь стоит обратить внимание на сообщения. При создании нового соединения с клиентом мы пишем об этом в консоль.

Совершенно нормально, если что-то будет не совсем понятно в самом начале. Хочу сразу заметить, что не стоит серьезно вникать в листинги программ до полного прочтения статьи. Просто продолжайте чтение.

Поэтому на следующем этапе необходимо написать небольшой клиент для использования сервера: Нет особого смысла в сервере без клиента.

tcp-клиент

#!/usr/bin/env python
# -*- coding: utf-8 -*- import socket MAX_CONNECTIONS = 20
address_to_server = ('localhost', 8686) clients = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(MAX_CONNECTIONS)]
for client in clients: client.connect(address_to_server) for i in range(MAX_CONNECTIONS): clients[i].send(bytes("hello from client number " + str(i), encoding='UTF-8')) for client in clients: data = client.recv(1024) print(str(data))

Важной особенностью здесь является тот факт, что мы в начале устанавливаем максимально возможное количество соединений, а только потом используем их для передачи/хранения данных.

Первое, что мы видим: Давайте запустим сервер.

server is running, please, press ctrl+c to stop

Это означает, что мы успешно запустили наш сервер и он готов принимать входящие запросы. Запустим клиент и сразу увидим в консоли сервера (у вас порты могут быть другими):

server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 39196)
b'hello from client number 0'
new connection from ('127.0.0.1', 39198)
b'hello from client number 1'
...

Что и следовало ожидать. В бесконечном цикле мы получаем новое соединение и сразу же обрабатываем данные из него. В чем тут проблема? Ранее мы использовали опцию server_socket.listen(10) для настройки сервера. Она означает максимальный размер очереди из еще не принятых подключений. Но в этом нет совершенно никакого смысла, ведь мы принимаем по одному соединению. Что предпринять в такой ситуации? На самом деле, существует несколько выходов.

  1. Распараллелить с помощью потоков/процессов (для этого можно, например, использовать fork или пул). Подробнее здесь.
  2. Обрабатывать запросы не по мере их подключения к серверу, а по мере наполнения этих соединений нужным количеством данных. Проще говоря, мы можем открыть сразу максимальное количество ресурсов и читать из них столько, сколько сможем (сколько необходимо на это процессорного времени в идеальном случае).

Вторая идея кажется заманчивой. Всего один поток и обработка множества соединений. Давайте посмотрим, как это будет выглядеть. Не стоит пугаться обилия кода. Если что-то сразу не понятно, то это вполне нормально. Можно попробовать запустить у себя и подебажить:

Асинхронный сервер


import select
import socket SERVER_ADDRESS = ('localhost', 8686) # Говорит о том, сколько дескрипторов единовременно могут быть открыты
MAX_CONNECTIONS = 10 # Откуда и куда записывать информацию
INPUTS = list()
OUTPUTS = list() def get_non_blocking_server_socket(): # Создаем сокет, который работает без блокирования основного потока server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Биндим сервер на нужный адрес и порт server.bind(SERVER_ADDRESS) # Установка максимального количество подключений server.listen(MAX_CONNECTIONS) return server def handle_readables(readables, server): """ Обработка появления событий на входах """ for resource in readables: # Если событие исходит от серверного сокета, то мы получаем новое подключение if resource is server: connection, client_address = resource.accept() connection.setblocking(0) INPUTS.append(connection) print("new connection from {address}".format(address=client_address)) # Если событие исходит не от серверного сокета, но сработало прерывание на наполнение входного буффера else: data = "" try: data = resource.recv(1024) # Если сокет был закрыт на другой стороне except ConnectionResetError: pass if data: # Вывод полученных данных на консоль print("getting data: {data}".format(data=str(data))) # Говорим о том, что мы будем еще и писать в данный сокет if resource not in OUTPUTS: OUTPUTS.append(resource) # Если данных нет, но событие сработало, то ОС нам отправляет флаг о полном прочтении ресурса и его закрытии else: # Очищаем данные о ресурсе и закрываем дескриптор clear_resource(resource) def clear_resource(resource): """ Метод очистки ресурсов использования сокета """ if resource in OUTPUTS: OUTPUTS.remove(resource) if resource in INPUTS: INPUTS.remove(resource) resource.close() print('closing connection ' + str(resource)) def handle_writables(writables): # Данное событие возникает когда в буффере на запись освобождается место for resource in writables: try: resource.send(bytes('Hello from server!', encoding='UTF-8')) except OSError: clear_resource(resource) if __name__ == '__main__': # Создаем серверный сокет без блокирования основного потока в ожидании подключения server_socket = get_non_blocking_server_socket() INPUTS.append(server_socket) print("server is running, please, press ctrl+c to stop") try: while INPUTS: readables, writables, exceptional = select.select(INPUTS, OUTPUTS, INPUTS) handle_readables(readables, server_socket) handle_writables(writables) except KeyboardInterrupt: clear_resource(server_socket) print("Server stopped! Thank you for using!")

Давайте запустим наш новый сервер и посмотрим на консоль:

Вывод асинхронного сервера

server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 56608)
new connection from ('127.0.0.1', 56610)
new connection from ('127.0.0.1', 56612)
new connection from ('127.0.0.1', 56614)
new connection from ('127.0.0.1', 56616)
new connection from ('127.0.0.1', 56618)
new connection from ('127.0.0.1', 56620)
new connection from ('127.0.0.1', 56622)
new connection from ('127.0.0.1', 56624)
getting data: b'hello from client number 0'
new connection from ('127.0.0.1', 56626)
getting data: b'hello from client number 1'
getting data: b'hello from client number 2'

Как можно понять по выводу, мы принимаем новые коннекты и данные почти параллельно. Более того, мы не ждем данных от нового подключения. Вместо этого мы устанавливаем новое.

Как это работает?

Дело в том, что все наши операции с ресурсами (а обращение к сокету относится к этой категории) происходят через системные вызовы операционной системы. Если говорить коротко, то системные вызовы представляют собой обращение к API операционной системы.

Рассмотрим, что происходит в первом случае и во втором.

Синхронный вызов

Давайте разберем рисунок:

Далее наша программа блокируется до появления нужного события. Первая стрелка показывает, что наше приложение обращается к операционной системе для получения данных из ресурса. Минус очевиден: если у нас один поток, то другие пользователи должны ждать обработку текущего.

Асинхронный вызов

Теперь посмотрим на рисунок, который иллюстрирует асинхронный вызов:

Но посмотрите, что происходит далее. Первая стрелка, как и в первом случае, делает запрос к ОС (операционной системе) на получение данных из ресурсов. Что же делать нам? Мы не ждем данных из ресурса и продолжаем работу. Простейшим ответом будет самостоятельно опрашивать нашу систему на наличие данных. Мы отдали распоряжение ОС и не ждем результат сразу. Таким образом, мы сможем утилизировать ресурсы и не блокировать наш поток.

Такое состояние, в котором мы постоянно смотрим на данные и ждем какого-то события, называется активным ожиданием. Но самом деле такая система не является практичной. Более правильным решением было бы оставить блокировку, но сделать ее «умной». Минус очевиден: мы впустую тратим процессорное время в случае, когда информация не обновилась. Вместо этого он ожидает любое изменение данных в нашей программе. Наш поток не просто ждет какого-то определенного события. Именно так и работает функция select в нашем асинхронном сервере:

Первое, что бросается в глаза, – метод работы. Теперь можно вернуться к реализации нашего асинхронного сервера и взглянуть на него с новым знанием. Такой подход в разработке ПО называется событийно-ориентированным (event-driven development). Если в первом случае наша программа выполнялась “сверху вниз”, то во втором случае мы оперируем событиями.

У него имеется масса недостатков. Сразу стоит отметить, что такой подход не является серебряной пулей. Во-вторых, у нас есть и всегда будут блокирующие вызовы, которые все портят. Во-первых, такой код сложнее поддерживать и менять. Дело в том, что такая функция тоже обращается к ОС, следовательно, наш поток выполнения блокируется и другие источники данных терпеливо ждут. Например, в программе выше мы воспользовались функцией print.

Заключение

Выбор подхода напрямую зависит от решаемой нами задачи. Позвольте задаче самой выбрать наиболее продуктивный подход. Например, популярный Javaвеб -сервер Tomcat использует потоки. Не менее популярный сервер Nginx использует асинхронный подход. Создатели популярного веб-сервера gunicorn на python пошли по пути prefork.

В следующий раз (уже скоро) я расскажу о прочих возможных неблокирующих ситуациях в жизни наших программ. Спасибо, что прочитали статью до конца! Буду рад вас видеть и в следующих постах.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»