Обучающие Серии

Подпишись на обновления блогa, чтобы не пропустить следующий пост!

Оглавление

Введение

В прошлой части мы познакомились с TCP-сокетами, как инструментом передачи данных по сети, основами их использования на стороне сервера и клиента. В этот раз речь пойдет про обработку запросов. Существует множество способов организации обработки запросов сервером: один клиент - один процесс, один клиент - один поток, много клиентов - один поток с асинхронным I/O, а также гибридные подходы. Мы рассмотрим основные моменты каждого из подходов, их достоинства и недостатки.

Картинка для привлечения внимания (логотип Python и кусочек редактора кода)

Один клиент - один процесс

Начнем с самого простого способа. Однопоточный сервер постоянно ожидает клиентские подключения (accept_client_conn). Получив новое соединение, сервер вычитывает запрос (read_request), обрабатывает его (handle_request) и отправляет клиенту ответ (write_response):

# python3
import socket
import sys
import time

def run_server(port=53210):
  serv_sock = create_serv_sock(port)
  cid = 0
  while True:
    client_sock = accept_client_conn(serv_sock, cid)
    serve_client(client_sock, cid)
    cid += 1

def serve_client(client_sock, cid):
  request = read_request(client_sock)
  if request is None:
    print(f'Client #{cid} unexpectedly disconnected')
  else:
    response = handle_request(request)
    write_response(client_sock, response, cid)


def create_serv_sock(serv_port):
  serv_sock = socket.socket(socket.AF_INET,
                            socket.SOCK_STREAM,
                            proto=0)
  serv_sock.bind(('', serv_port))
  serv_sock.listen()
  return serv_sock

def accept_client_conn(serv_sock, cid):
    client_sock, client_addr = serv_sock.accept()
    print(f'Client #{cid} connected '
          f'{client_addr[0]}:{client_addr[1]}')
    return client_sock

def read_request(client_sock, delimiter=b'!'):
  request = bytearray()
  try:
    while True:
      chunk = client_sock.recv(4)
      if not chunk:
        # Клиент преждевременно отключился.
        return None

      request += chunk
      if delimiter in request:
        return request

  except ConnectionResetError:
    # Соединение было неожиданно разорвано.
    return None
  except:
    raise

def handle_request(request):
  time.sleep(5)
  return request[::-1]

def write_response(client_sock, response, cid):
  client_sock.sendall(response)
  client_sock.close()
  print(f'Client #{cid} has been served')


if __name__ == '__main__':
    run_server(port=int(sys.argv[1]))

(смотреть на github)

Разумно было бы полагать, что у такой простоты есть своя цена. Наш сервер работает в одном процессе (и одном потоке). Обработка клиентов происходит синхронно. Таким образом, пока сервер занят одним клиентом, остальные клиенты вынуждены ждать в очереди (на видео в левом нижнем углу необходимо обратить внимание на количество процессов и потоков nTH команды python в выводе top):

Один процесс на всех клиентов, синхронная обработка.

Очевидно, что это далеко не самый эффектинвый способ огранизовать обработку запросов. Попробуем его улучшить, назначив каждому клиенту выделенный процесс:

# python3
...
import os

def run_server(port=53210):
  serv_sock = create_serv_sock(port)
  active_children = set()
  cid = 0
  while True:
    client_sock = accept_client_conn(serv_sock, cid)
    child_pid = serve_client(client_sock, cid)
    active_children.add(child_pid)
    reap_children(active_children)
    cid += 1

def serve_client(client_sock, cid):
  child_pid = os.fork()
  if child_pid:
    # Родительский процесс, не делаем ничего
    client_sock.close()
    return child_pid

  # Дочерний процесс:
  #  - читаем запрос
  #  - обрабатываем
  #  - записываем ответ
  #  - закрываем сокет
  #  - завершаем процесс (os._exit())
  request = read_request(client_sock)
  if request is None:
    print(f'Client #{cid} unexpectedly disconnected')
  else:
    response = handle_request(request)
    write_response(client_sock, response, cid)
  os._exit(0)

def reap_children(active_children):
  for child_pid in active_children.copy():
    child_pid, _ = os.waitpid(child_pid, os.WNOHANG)
    if child_pid:
      active_children.discard(child_pid)

# create_serv_sock, accept_client_conn, \
# read_request, handle_request, write_response
#  ... см. пример выше

(смотреть на github)

Структура нашего сервера осталась практически неизменной. В то же время, он показывает новые свойства. Входящие запросы не блокируют сервер и могут быть обслужены конкурентно. Функция обработки запросов serve_client() использует fork() для запуска новых процессов на каждый входящий запрос. Таким образом, главный процесс сервера ответственен только за прием входящих соединений. Сразу после того, как был создан клиентский сокет, он передается в serve_client(), которая, уже в дочернем процессе, вычитывает запрос, производит его обработку, отсылает ответ, после чего закрывает сокет и завершает дочерний процесс используя функцию os._exit().

Демонстрацию работы нового сервера можно посмотреть на следующем видео (традиционно обращаем внимание на вкладку top в левом нижнем углу):

Один клиент - один процесс.

Запуск процессов всегда сопряжен с дополнительной сложностью по контролю за дочерними процессами, а также за общими ресурсами. Код нашего сервера сильно упрощен, и в нем отсутвтует обработка ошибок и сигналов. В то же время, хотелось бы обратить внимание на часть функции serve_client(), которая выполняется в родительском (главном) процессе сервера. После запуска дочернего процесса, родительский процесс немедленно закрывает клиентский сокет и сохраняет pid дочернего процесса. Для чего нужны эти лишние действия?

Для начала разберемся с сокетом. Как мы уже знаем из прошлой статьи, сокеты представлены файловыми дескрипторами. Особенность fork() состоит в том, что как родительский, так и дочерний процессы могут иметь доступ к файловым дескрипторам, открытым до вызова fork(). Чтобы избежать использования лишних ресурсов и конкурентного доступа к клиентскому сокету, главный процесс сервера должен закрыть его на своей стороне.

Вторая особенность fork() заключается в том, что даже завершившиеся сами по себе дочерние процессы (см. вызов os._exit() в serve_client()) нуждаются в явной обработке этого действия родительским процессом. До тех пор, пока родительский процесс не вызовет os.waitpid(child_pid) часть ресурсов (хоть и небольшая) дочернего процесса не будет освобождена операционной системой, а соответствующий дочерний процесс будет в состоянии зомби. Для простоты, в нашем примере главный процесс производит освобождение reap_children() завершившихся дочерних процессов после подключения каждого следующего клиента. Правильной же организацией была бы периодическая попытка вызывать reap_children() при наличии активных дочерних процессов не реже, чем раз в несколько секунд.

Когда используется подход на основе fork()? Исторически, это был один из первых способ решения проблемы конкурентной обработки запросов. Знаменитый режим prefork Apache HTTP Server - яркий пример такого использования. В то же время, в современном коде все еще имеет смысл использовать этот подход, когда сложно добиться потокобезопасности. При создании процесса вызовом fork() происходит копирование (хоть и отложенное) адресного пространства родительского процесса в дочерний. Модификации переменных в дочерних процессах не имеют эффекта в родительском процессе. Таким образом, каждый из процессов-обработчиков выполняется в изолированном окружении без риска попасть в состояние гонки. Также, благодаря изоляции, проблемы при обработке одного запроса не распространяются на остальные процессы-обработчики.

Когда не стоит использовать подход на основе fork()? Запуск процессов, в частности накладные расходы, связанные с копированием адресного пространства и последующего переключение контекста между процессами, является основным недостатком подхода. У изоляции своя цена и она достаточно высока. При большом числе запросов в секунду доля процессорного времени, затрачиваемая на накладные расходы, может оказаться чуть ли не выше, чем доля, приходящаяся на полезную нагрузку. Существуют и оптимизации, когда используется пул процессов-обработчиков (см. документацию Apache HTTP Server prefork выше), призванный устранить расходы на копирование адресного пространства и ограничить число переключений контекста за счет фиксированного числа процессов-обработчиков в пуле. Очевидно, что при пуле из K процессов-обработчиков не возможно конкурентно обрабатывать более K запросов. Типичный размер пула это десятки, реже сотни, процессов.

Один клиент - один поток

Естественным развитием идеи, описанной выше, является обработка запросов в отдельных потоках:

# python3
...
import threading

def run_server(port=53210):
  serv_sock = create_serv_sock(port)
  cid = 0
  while True:
    client_sock = accept_client_conn(serv_sock, cid)
    t = threading.Thread(target=serve_client,
                         args=(client_sock, cid))
    t.start()  # Запуск нового потока
    cid += 1

def serve_client(client_sock, cid):
  # Реализация совпадает с версией из синхронной обработки.
  request = read_request(client_sock)
  if request is None:
    print(f'Client #{cid} unexpectedly disconnected')
  else:
    response = handle_request(request)
    write_response(client_sock, response, cid)

# create_serv_sock, accept_client_conn, \
# read_request, handle_request, write_response
#  ... см. пример выше

(смотреть на github)

И снова структура сервера практически не изменилась. Главный поток (соответсвтующий главному процессу сервера) как и прежде занимается лишь приемом входящих подключений. Создав очередной клиентский сокет (accept_client_conn), он запускает дополнительный поток, входной точкой для которого является простая версия serve_client(). Выполнив обработку запроса, дополнительный поток немедленно завершается (традиционно обращаем внимание на вкладку top в левом нижнем углу, колонка nTH показывает количесто одновременны запущенных потоков команды python):

Один клиент - один поток.

Использование потока вместо процесса на каждое новое подключение является более эффективным способом организации обработки запросов. В то же время, переключение контекста все еще присутствует, хоть и не включает в себя самую дорогостоящую фазу - переключение адресного пространства. Как обычно, мы наблюдаем очередной компромисс. Увеличив производительность, мы потеряли преимущества изоляции. Все потоки-обработчики имеют доступ к одному и тому же адресному пространству главного процесса. При необходимости иметь разделяемое состояние, код нашего сервера должен быть написан в потоко-безопасной манере.

Накладные расходы, связанные с запуском нового потока на каждое входящее соединение могут быть устранены с помощью введения пула потоков. Как и в случае с пулом процессов, мы можем при старте сервера запустить K потоков и назначать свободный поток из пула каждому новому соединению (пример из конфигурации Apache HTTP Server). Несмотря на то, что мы снова не сможем иметь более K обслуживаемых запросов в один момент времени, размер пула потоков может быть на один-два порядка (то есть сотни, реже тысячи, потоков) выше размера пула процессов при том же объеме накладных расходов на переключение контекста.

Пул потоков достаточно распространенный способ реализации обработки запросов в современном коде. В то же время, современные сервисы могут иметь тысячи и десятки тысяч одновременных подключений, поэтому ситуация с ожиданием клиентов в очереди все еще может возникнуть. Но еще более распространенная ситуация для современных веб-сервисов - это наличие большого количества обращений к сторонним API в процессе построения ответа на очередной запрос. Например, наш сервер может быть под нагрузкой в 100 запросов в секунду и иметь пул из 100 потоков-обработчиков. В среднем, обработка одного запроса занимает 0.01 сек процессорного времени и 0.2 сек ожидания I/O от сторонних сервисов. Таким образом, каждую секунду нам необходимо 0.01 x 100 = 1 сек процессорного времени и 0.2 x 100 = 20 сек ожидания. На 8-ядерной машине мы можем ожидать до 8 сек процессорного времени в секунду, то есть наш процессор загружен лишь на 1/8, а при 100 потоках мы можем иметь до 100 сек ожидания каждую секунду, то есть мы загружены не более, чем на 1/5 от емкости. Получается, что наша конфигурация имеет достаточный запас прочности чтобы справиться даже с пятикратным ростом нагрузки. Но, как только один или несколько сторонных сервисов начинают обрабатывать запросы медленее, чем в среднем 1 сек на запрос, суммарное время ожидания каждую секунду может легко превысить 100 сек. И, несмотря на то, что наш процессор будет все еще загружен лишь на 12.5%, очередь запросов начнет расти. Динамическое увеличение размера пула потоков с последующим его скоращением могло бы быть потенциальным решением возникшей проблемы. Но при росте числа потоков выше некоторого предела, накладные нагрузки на переключение контекста становятся ощутимыми. Таким образом, пул потоков становится неэффективным при наличии интенсивной работы с I/O.

Много клиентов - один поток с асинхронным I/O

Принципиально отличным способом обработки запросов является подход, основанный на асинхронном вводе-выводе. Основная идея заключается в том, что чтение или запись в сокет (и другой ввод-вывод) может являться неблокирующим. Тогда один и тот же процесс (и поток) может инициировать чтение из сокета первого клиента и, пока оно выполняется операционной системой на заднем плане, перейти к обработке запроса от следующего клиента. Таким образом, возможно иметь множество конкурентных запросов, обслуживаемых всего одним потоком:

# python3
import asyncio
import sys

counter = 0

async def run_server(host, port):
  server = await asyncio.start_server(serve_client, host, port)
  await server.serve_forever()

async def serve_client(reader, writer):
  global counter
  cid = counter
  counter += 1  # Потоко-безопасно, так все выполняется в одном потоке
  print(f'Client #{cid} connected')

  request = await read_request(reader)
  if request is None:
    print(f'Client #{cid} unexpectedly disconnected')
  else:
    response = await handle_request(request)
    await write_response(writer, response, cid)

async def read_request(reader, delimiter=b'!'):
  request = bytearray()
  while True:
    chunk = await reader.read(4)
    if not chunk:
      # Клиент преждевременно отключился.
      break

    request += chunk
    if delimiter in request:
      return request

  return None

async def handle_request(request):
  await asyncio.sleep(5)
  return request[::-1]

async def write_response(writer, response, cid):
  writer.write(response)
  await writer.drain()
  writer.close()
  print(f'Client #{cid} has been served')


if __name__ == '__main__':
  asyncio.run(run_server('127.0.0.1', int(sys.argv[1])))

(смотреть на github)

В этот раз структура сервера значительно отличается от всего, что мы успели рассмотреть выше. В примере используются встроенные возможности (как синтаксические, так и функции стандартной библиотеки) Python 3.5+, но подход на основе асинхронного ввода-вывода можно реализовать и без async/await.

Зачастую реализация программы, использующей асинхронный ввод-вывод, строится на основе цикла событий. Это бесконечный цикл, читающий события (представленные в виде функций обратного вызова) из очереди с приоритетами. Приоритетом обычно является время готовности к выполнению. Клиентский код, встретив необходимость чтения или записи в сокет, иницирует операцию I/O и регистрирует в очереди событий колбек, который необходимо вызвать для продолжения обработки запроса в момент, когда сокет будет готов. После чего, уступает управление циклу событий. Цикл событий опрашивает все файловые дескрипторы, ожидающие I/O и на основе этой информации обновляет приоритеты событий в очереди. Затем цикл выбирает самый высокоприоритетный элемент и передает выполнение ему.

Модуль asyncio предоставляет цикл событий "из коробки". Строка asyncio.run(...) запускает наш сервер на выполнение внутри цикла событий. Сервер при этом реализован также этим модулем (asyncio.start_server). Нам же лишь остается определить функцию-обработчик (serve_client), которую asyncio сервер будет вызывать на каждое входящее соединение.

Код, основанный на колбеках обычно достаточно тяжело как писать, так и читать, так как он выглядит нелинейно. При этом колбеки необходимы, чтобы продолжить выполнение кода, прерванного на операции I/O, когда данные станут доступными. С другой стороны, мы знаем, что корутины умеют приостанавливать и возобновлять собственное выполнение в произвольных точках, сохраняя стек. Разумно предположить, что можно воспользоваться этим свойством, приостанавливая выполнения функции и передавая управление циклу событий, встретив необходимость I/O. Для этого Python вводит ключевое слово await, а функция, желающая его использовать, должна быть объявлена через async def, что делает ее корутиной.

Такой код выполняется в одном потоке, при этом возможно обрабатывать большое число одновременных соединений:

Много клиентов - один поток с асинхронным I/O.

Подход отлично справляется с проблемой, описанной выше, когда суммарное ожидание в секунду превышает число потоков в пуле. Практическое ограничение на количество одновременных обработчиков очень высокое (десятки, реже сотни, тысяч), а накладные расходы на запуск новых обработчиков минимальны (сопоставимы с вызовом функции, переключение контекста на уровне операционной системы отсутствует). Данный подход идеально подходит в частности для создания WebSocket-серверов с десятками тысяч одновременных соединений, находящимися в режиме ожидания сообщений бОльшую часть времени.

К сожалению, преимущество подхода является и его недостатком. Так как весь код выполняется в одном потоке, утилизация всех ядер процессора затрудняется. Поэтому к коду, требующему большого числа вычислений необходимо относиться с осторожностью, не запуская его в основном цикле событий.

Другими известными апологетами обработки запросов с использованием асинхронного ввода-вывода являются веб-сервер nginx и программная платформа Node.js.

Гибридный подход

В реальных серверах обычно используется комбинация подходов. Широкоизвестные веб-сервер nginx и Apache HTTP Server в режиме MPM event имеют пул процессов, каждый из которых запускает пул потоков-обработчиков, каждый из которых использует модель обработки на основе асинхронного ввода-вывода.

Из документации Apache HTTP Server:

event (mode) is based on the worker MPM, which implements a hybrid multi-process multi-threaded server. A single control process (the parent) is responsible for launching child processes. Each child process creates a fixed number of server threads as specified in the ThreadsPerChild directive, as well as a listener thread which listens for connections and passes them to a worker thread for processing when they arrive.

Run-time configuration directives are identical to those provided by worker, with the only addition of the AsyncRequestWorkerFactor.

Заключение

Мы рассмотрели способы организации обработки запросов сервером. Примеры кода в статье были намеренно упрощены и не годятся для использования в боевых проектах. На взрослый код, реализующий примеры из статьи, можно посмотреть в стандартной библиотеке Python socketserver.py, а соответствующую документацию изучить здесь socketserver — A framework for network servers. В следующей статье мы ознакомимся с протоколом HTTP и реализуем простейший HTTP-сервер.

Make code not war!

Обучающие Серии

Подпишись на обновления блогa, чтобы не пропустить следующий пост!