Як застосовувати Linux epoll у Python

У статті описується:

  • Приклади використання блокуючих сокетів
  • Переваги асинхронних сокетів і Linux epoll
  • Приклади асинхронного використання сокетів через epoll
  • Питання продуктивності
  • Початковий код

Введення

З версії 2.6 Python включає API для роботи з Linux бібліотекою epoll. Ця стаття коротко демонструє дане API прикладами коду на Python 3.

Від перекладача.

Я намагався не зловживати англомовними термінами наскільки це можливо. Так що «register/unregister» стали «підпискою/відпискою», «print to the console» - «висновком на консоль». «Production server» вирішив перевести як «навантажений сервер», оскільки нічого кращого, ніж «сервер на продакшені» в голову не спадає. «Thread» переклав як «потік», а не «нитку».

Назви подій, режимів і прапорів вирішив взагалі не перекладати, даючи лише одноразовий приклад можливого перекладу.

Хоч і написано, що код для Python 3, все прекрасно працює і на Python 2.6.

Приклади використання блокуючих сокетів

Перший приклад це простий Python 3.0 сервер, який слухає порт 8080 на предмет вхідних HTTP запитів, виводить їх на консоль, і відправляє відповідне HTTP повідомлення клієнту.

  • Рядок 9: Створення серверного сокету.
  • Рядок 10: Дозволяємо виконувати bind () в рядку 11 навіть у разі, якщо інша програма недавно слухала той же порт. Без цього, програма не зможе працювати з портом протягом 1-2 хвилин після закінчення роботи з тим же портом в раніше запущеній програмі.
  • Рядок 11: Вішаємо (bind'ім) серверний сокет на порт 8080 для всіх доступних IPv4 адрес цієї машини.
  • Рядок 12: Вказуємо серверному сокету почати прийом вхідних з'єднань від клієнтів.
  • Рядок 14: Програма буде зупинятися в цій точці до отримання вхідного з'єднання. Коли це станеться, серверний сокет створить новий сокет, який буде використовуватися на цій машині для зв'язку з клієнтом. Цей новий сокет представлений об'єктом clientconnection, який повертається викликом accept (). Об'єкт address містить IP адресу і номер порту віддаленої машини.
  • Рядки 15-17: Формуємо дані, які буде надіслано клієнтові для завершення HTTP запиту. Протокол HTTP описано тут.
  • Рядок 18: Виводимо запит до консоля як перевірку правильності дії.
  • Рядок 19: Відсилаємо відповідь клієнту.
  • Рядки 20-22: Закриваємо з'єднання з клієнтом так само як і слухаючий серверний сокет.

Офіційний HOWTO містить більш детальний опис програмування сокетів в Python.

Приклад 1

Copy Source | Copy HTML

  1. import socket
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. connectiontoclient, address = serversocket.accept()
  12. request = b''
  13. while EOL1 not in request and EOL2 not in request:
  14. request += connectiontoclient.recv(1024)
  15. print(request.decode())
  16. send(response)
  17. close()
  18. close()

Приклад 2 додає цикл у 15 рядку для повторної обробки клієнтських з'єднань, яка виконується до власного переривання (наприклад, з клавіатури). Це зрозуміліше показує, що серверний сокет ніколи не використовується для обміну даними з клієнтом. Швидше, він лише приймає з'єднання від клієнта і створює новий сокет, який вже і використовується для зв'язку.

Блок finally в рядках 23-24 потрібен для того, щоб слухаючий серверний сокет закривався в будь-якому випадку, навіть при виникненні помилок.

Приклад 2

Copy Source | Copy HTML

  1. import socket
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. try:
  12. while True:
  13. connectiontoclient, address = serversocket.accept()
  14. request = b''
  15. while EOL1 not in request and EOL2 not in request:
  16. request += connectiontoclient.recv(1024)
  17. print('-'*40 + '\n' + request.decode()[:-2])
  18. send(response)
  19. close()
  20. finally:
  21. close()

Переваги асинхронних сокетів і Linux epoll

Сокети, показані в прикладі 2 називаються блокуючими сокетами, тому що програма на Python призупиняє своє виконання до приходу події. Виклик accept () у рядку 16 блокується до отримання з'єднання від клієнта. Виклик recv () у рядку 19 блокується до отримання даних від клієнта (або поки не буде даних для прийому). Виклик send () в рядку 21 блокується до того, як всі дані, що відправляються клієнту, не будуть додані в чергу відправки Linux.

Коли програма використовує блокуючі сокети, вона часто використовує окремий потік (або навіть процес) для виконання взаємодії з кожним з таких сокетів. Основний потік програми містить слухаючий серверний сокет, який приймає вхідні з'єднання від клієнтів. Він приймає ці з'єднання по одному за раз, передаючи новостворений клієнтський сокет в окремий потік, який буде взаємодіяти з клієнтом. Оскільки кожен з цих потоків пов'язаний тільки з одним клієнтом, то припустимо, що в деяких місцях відбуваються мережеві блокування. Ці блокування не заважають іншим потокам виконувати їх завдання.

Застосування блокуючих сокетів з безліччю потоків призводить до простого коду, але пов'язане з серією недоліків. Важко бути впевненим у коректному спільному доступі з потоків до розділених ресурсів. І цей стиль програмування мало ефективний на комп'ютерах з єдиним CPU.

Проблема C10K обговорює альтернативні варіанти обробки безлічі конкуруючих сокетів. Один з них полягає у використанні асинхронних сокетів. Такі сокети не блокуються до приходу події. Навпаки, програма виконує дію над асинхронним сокетом і відразу ж отримує повідомлення про успішність або помилку. Ця інформація дозволяє програмі вирішувати як вчинити. Так як асинхронні сокети є не блокуючими, то немає необхідності у безлічі потоків виконання. Всю роботу можна виконати в єдиному потоці. Такий однопоточний підхід має свої проблеми і не є хорошим вибором для багатьох програм. Але він може бути скомбінований з багатопоточним підходом: асинхронні сокети, застосовані в єдиному потоці, можуть бути використані для мережевої складової сервера, а потоки можна використовувати для доступу до зовнішніх блокуючих ресурсів, наприклад баз даних.

Linux 2.6 має ряд механізмів для управління асинхронними сокетами, три з яких представлені в Python API через select, poll і epoll. epoll і poll краще ніж select, тому що програмі на Python не потрібно стежити за всіма цікавими подіями в сокеті. Замість цього можна покластися на операційну систему повідомляти про те, які події виникли на яких сокетах. А epoll в свою чергу краще poll, тому що він не вимагає від операційної системи перевірки всіх сокетів на цікаві події кожен раз, коли це запитується Python програмою. Швидше, при запиті від Python, Linux перевіряє, чи відбулися ці події, і повертає список подій. Отже, epoll більш ефективний і масштабований механізм для великого числа (тисяч) одночасних з'єднань, як показано на цих графіках.

Приклади асинхронного використання сокетів через epoll

Програми, які використовують epoll, часто працюють за наступним принципом:

  1. Створюється epoll об'єкт
  2. epoll об'єкту вказується спостерігати за певними подіями на певних сокетах
  3. epoll об'єкта запитується на яких сокетах відбулися зазначені події з моменту попереднього опитування
  4. Виконуються деякі дії на цих сокетах
  5. epoll об'єкту вказується змінити список сокетів та/або спостережених подій
  6. Повторюються кроки з 3 по 5 до завершення
  7. Знищення об'єкта epoll

Приклад 3 повторює функціонал прикладу 2, що використовує асинхронні сокети. Програма складніша, тому що один потік по черзі взаємодіє з безліччю клієнтів.

  • Рядок 1: Додаток select містить функціонал epoll.
  • Рядок 13: Блокуючі за замовчуванням сокети потрібно використовувати в неблокуючому (асинхронному) режимі.
  • Рядок 15: Створення epoll об'єкта.
  • Рядок 16: Підписуємося на події читання на серверному сокеті. Подія читання відбувається в той момент, коли серверний сокет приймає з'єднання.
  • Рядок 19: Словник з'єднань показує файлові дескриптори (цілі числа) у відповідні об'єкти мережевих з'єднань.
  • Рядок 21: Запит до epoll об'єкта для з'ясування, чи відбулися якісь з очікуваних подій. Параметр «1» вказує, що ми готові чекати події до 1 секунди. Якщо будь-які з цікавих подій відбудуться раніше, то запит відразу поверне список цих подій.
  • Рядок 22: Події повертаються послідовністю кортежів (fileno, event code). fileno це синонім файлового дескриптора і завжди є цілим числом.
  • Рядок 23: Якщо на серверному сокеті сталася подія читання, ви можете створити новий клієнтський сокет.
  • Рядок 25: Встановлюємо новий сокет у неблокуючий режим.
  • Рядок 26: Підписуємося на події читання (EPOLLIN) на новому сокеті.
  • Рядок 31: Якщо на клієнтському сокеті відбулася подія читання, то читаємо нові дані, що прийшли від клієнта.
  • Рядок 33: Після отримання запиту відписуємося від подій читання і підписуємося на події запису (EPOLLOUT). Ці події відбуваються, коли можна надіслати дані відповіді клієнту.
  • Рядок 34: Друкуємо запит, показуючи, що незважаючи на перемикання між клієнтами, дані можна зібрати воєдино і обробити як єдине повідомлення.
  • Рядок 35: Якщо на клієнтському сокеті сталася подія запису, то можна спробувати відправити нові дані клієнту.
  • Рядки 36-38: Надсилання даних відповіді порцією за раз, поки всю відповідь не буде передано операційній системі для надсилання.
  • Рядок 39: Після повної надсилання відповіді відписуємося від подальших подій читання або запису.
  • Рядок 40: Виклик shutdown сокету не обов'язковий для явного закриття з'єднання. Цей приклад використовує його, щоб змусити клієнта завершити зв'язок першим. Виклик shutdown повідомляє клієнту, що більше не буде відправлено або отримано даних і що йому варто по хорошому закрити сокет зі свого боку.
  • Рядок 41: Подія HUP (hang-up, зависання) повідомляє, що клієнтський сокет відключився (був закритий), тобто слід його закрити. Немає необхідності підписуватися на події HUP. Вони завжди відбуваються на сокетах, які підписані в epoll об'єкті.
  • Рядок 42: Відписуємося від подій в даному сокеті.
  • Рядок 43: Закриваємо сокет.
  • Рядки 18-45: Блок try-catch використовується в цьому прикладі тому, що програма може бути перервана з клавіатури.
  • Рядки 46-48: Відкриті сокети не потрібно закривати, тому що Python закриває їх при завершенні роботи програми. Однак явне закриття - це хороша практика.

Приклад 3

Copy Source | Copy HTML

  1. import socket, select
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. setblocking( 0)
  12. epoll = select.epoll()
  13. register(serversocket.fileno(), select. EPOLLIN)
  14. try:
  15. connections = {}; requests = {}; responses = {}
  16. while True:
  17. events = epoll.poll(1)
  18. for fileno, event in events:
  19. if fileno == serversocket.fileno():
  20. connection, address = serversocket.accept()
  21. setblocking( 0)
  22. register(connection.fileno(), select. EPOLLIN)
  23. connections[connection.fileno()] = connection
  24. requests[connection.fileno()] = b''
  25. responses[connection.fileno()] = response
  26. elif event & select. EPOLLIN:
  27. requests[fileno] += connections[fileno].recv(1024)
  28. if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
  29. modify(fileno, select. EPOLLOUT)
  30. print('-'*40 + '\n' + requests[fileno].decode()[:-2])
  31. elif event & select. EPOLLOUT:
  32. byteswritten = connections[fileno].send(responses[fileno])
  33. responses[fileno] = responses[fileno][byteswritten:]
  34. if len(responses[fileno]) ==  0:
  35. modify(fileno,  0)
  36. connections[fileno].shutdown(socket.SHUT_RDWR)
  37. elif event & select. EPOLLHUP:
  38. unregister(fileno)
  39. connections[fileno].close()
  40. del connections[fileno]
  41. finally:
  42. unregister(serversocket.fileno())
  43. close()
  44. close()

epoll має два режими роботи, звані ініційований фронтом (edge-triggered) і ініційований рівнем (level-triggered). У режимі edge-triggered виклик epoll.poll () поверне подію тільки після того, як події читання або записи відбудуться на сокеті. Програма повинна обробити всі дані, пов'язані з цією подією, без повторних викликів epoll.poll (). Коли дані від певної події вичерпуються, додаткові спроби роботи з сокетом призводитимуть до винятків. Навпаки, в режимі level-triggered, повторні виклики epoll.poll () будуть давати повторні повідомлення про цікаві події, поки не будуть оброблені всі дані, пов'язані з подіями. Жодних винятків не виникає при нормальній роботі в режимі level-triggered.

Для прикладу припустимо, що серверний сокет був підписаний в epoll об'єкті на події читання. У режимі edge-triggered програмі слід викликати accept () для прийому нових з'єднань поки не відбудеться виключення socket.error. В режимі level-triggered може бути зроблений єдиний виклик accept (), а потім epoll об'єкт може бути запитаний знову для наступних подій в черзі.

Приклад 3 використовує режим level-triggered, який є типовим режимом. Приклад 4 демонструє як використовувати режим edge-triggered. У рядках 25, 36 і 45 вводяться цикли, які працюю поки не виникне виняток (або стане відомо, що всі дані оброблені). Рядки 32, 38 і 48 ловлять винятки. Нарешті, рядки 16, 28, 41 і 51 додають маску EPOLLET, яка задає режим edge-triggered.

Приклад 4

Copy Source | Copy HTML

  1. import socket, select
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. setblocking( 0)
  12. epoll = select.epoll()
  13. register(serversocket.fileno(), select. EPOLLIN | select. EPOLLET)
  14. try:
  15. connections = {}; requests = {}; responses = {}
  16. while True:
  17. events = epoll.poll(1)
  18. for fileno, event in events:
  19. if fileno == serversocket.fileno():
  20. try:
  21. while True:
  22. connection, address = serversocket.accept()
  23. setblocking( 0)
  24. register(connection.fileno(), select. EPOLLIN | select. EPOLLET)
  25. connections[connection.fileno()] = connection
  26. requests[connection.fileno()] = b''
  27. responses[connection.fileno()] = response
  28. except socket.error:
  29. pass
  30. elif event & select. EPOLLIN:
  31. try:
  32. while True:
  33. requests[fileno] += connections[fileno].recv(1024)
  34. except socket.error:
  35. pass
  36. if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
  37. modify(fileno, select. EPOLLOUT | select. EPOLLET)
  38. print('-'*40 + '\n' + requests[fileno].decode()[:-2])
  39. elif event & select. EPOLLOUT:
  40. try:
  41. while len(responses[fileno]) >  0:
  42. byteswritten = connections[fileno].send(responses[fileno])
  43. responses[fileno] = responses[fileno][byteswritten:]
  44. except socket.error:
  45. pass
  46. if len(responses[fileno]) ==  0:
  47. modify(fileno, select. EPOLLET)
  48. connections[fileno].shutdown(socket.SHUT_RDWR)
  49. elif event & select. EPOLLHUP:
  50. unregister(fileno)
  51. connections[fileno].close()
  52. del connections[fileno]
  53. finally:
  54. unregister(serversocket.fileno())
  55. close()
  56. close()

При всій схожості, режим level-triggered часто застосовується при портуванні програм, що використовують механізми select або poll, тоді як режим edge-triggered може застосовуватися програмістом у разі, коли немає потреби в такій підтримці управління станами подій з боку операційної системи.

На додаток до цих двох режимів, сокети також можна підписати в epoll на подію EPOLLONESHOT. Якщо ви використовуєте цей параметр, подія коректна лише для одноразового виклику epoll.poll (), після якого вона автоматично видаляється зі списку подій, що спостерігаються.

Питання продуктивності

Довжина черги з "єднань з сервером

У 12ому рядку всіх прикладів показано виклик методу serversocket.listen (). Параметром для цього методу є довжина черги з'єднань до сервера (listen backlog). Він повідомляє операційну систему про максимальний приймуваний число TCP/IP підключень, які можуть бути розміщені в системній черзі до того, як їх прийме Python програма. Кожен раз, коли Python викликає accept () на серверному сокеті, одне з з'єднань видаляється з черги і визволене місце може бути використано для іншого вхідного з'єднання. При заповненій черзі, нові вхідні підключення мовчки ігноруються, що призводить до непотрібних затримок на клієнтській стороні. Навантажений сервер зазвичай обробляє сотні і тисячі одночасних з'єднань, так що значення 1 буде неадекватним. Як приклад, при використанні ab для навантажувального тестування вищенаведених прикладів з сотнею одночасних HTTP 1.0 клієнтів, довжина черги менше 50 часом може призвести до сильного падіння продуктивності.

Параметри TCP

Параметр TCP_CORK може блокувати (bottle up) відправку даних, поки вони не будуть готові. Цей параметр, проілюстрований у рядках 34 і 40 прикладу 5, може бути корисним для HTTP сервера, який використовує конвеєр HTTP/1.1.

Приклад 5

Copy Source | Copy HTML

  1. import socket, select
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. setblocking( 0)
  12. epoll = select.epoll()
  13. register(serversocket.fileno(), select. EPOLLIN)
  14. try:
  15. connections = {}; requests = {}; responses = {}
  16. while True:
  17. events = epoll.poll(1)
  18. for fileno, event in events:
  19. if fileno == serversocket.fileno():
  20. connection, address = serversocket.accept()
  21. setblocking( 0)
  22. register(connection.fileno(), select. EPOLLIN)
  23. connections[connection.fileno()] = connection
  24. requests[connection.fileno()] = b''
  25. responses[connection.fileno()] = response
  26. elif event & select. EPOLLIN:
  27. requests[fileno] += connections[fileno].recv(1024)
  28. if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
  29. modify(fileno, select. EPOLLOUT)
  30. connections[fileno].setsockopt(socket.IPPROTO_TCP, socket. TCP_CORK, 1)
  31. print('-'*40 + '\n' + requests[fileno].decode()[:-2])
  32. elif event & select. EPOLLOUT:
  33. byteswritten = connections[fileno].send(responses[fileno])
  34. responses[fileno] = responses[fileno][byteswritten:]
  35. if len(responses[fileno]) ==  0:
  36. connections[fileno].setsockopt(socket.IPPROTO_TCP, socket. TCP_CORK,  0)
  37. modify(fileno,  0)
  38. connections[fileno].shutdown(socket.SHUT_RDWR)
  39. elif event & select. EPOLLHUP:
  40. unregister(fileno)
  41. connections[fileno].close()
  42. del connections[fileno]
  43. finally:
  44. unregister(serversocket.fileno())
  45. close()
  46. close()

З іншого боку, опція TCP_NODELAY повідомляє системі, що будь-які дані, передані в socket.send (), слід відразу ж відправити клієнту без буферизації операційною системою. Цей параметр, проілюстрований у рядку 14 прикладу 6, може бути корисним для клієнтів SSH та інших програм «реального часу».

Приклад 6

Copy Source | Copy HTML

  1. import socket, select
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket. SOCK_STREAM)
  8. setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, 1)
  9. bind(('0.0.0.0', 8080))
  10. listen(1)
  11. setblocking( 0)
  12. <

COM_SPPAGEBUILDER_NO_ITEMS_FOUND