Использование ThreadPoolExecutor в Python 3

Потоки Python – это форма параллелизма, которая позволяет программам запускать несколько процедур одновременно. Параллелизм в Python также может быть достигнут с помощью нескольких процессов, но потоки позволяют также ускорить работу приложений, требующих значительного количества операций ввода-вывода.

В качестве примеров операций, связанных с вводом-выводом, можно привести выполнение веб-запросов и чтение данных из файлов. В отличие от операций ввода-вывода, операции с привязкой к CPU (например, выполнение математических расчетов с помощью стандартной библиотеки Python) не получат особой выгоды от потоков Python.

Для выполнения кода в потоке Python 3 предоставляет утилиту ThreadPoolExecutor.

В этом мануале мы покажем, как использовать ThreadPoolExecutor, чтобы оперативно выполнять сетевые запросы. Мы определим функцию, подходящую для вызова внутри потоков, а затем будем использовать ThreadPoolExecutor для выполнения этой функции и обработки полученных результатов.

В этом мануале мы выполним несколько сетевых запросов, чтобы проверить наличие страниц Википедии.

Примечание: Тот факт, что операции ввода-вывода больше выигрывают от потоков, чем операции CPU, связан с особенностью Python – с так называемой глобальной блокировкой интерпретатора.

Требования

Чтобы получить максимальную отдачу от этого мануала, нужно иметь некоторое представление о программировании на Python и о локальной среде Python с установленным пакетом requests.

Рекомендуем просмотреть эти руководства для получения необходимой дополнительной информации:

Чтобы установить пакет requests в вашу локальную среду Python, нужно выполнить эту команду:

pip install --user requests==2.23.0

1: Определение функции для потока

Начнем с определения функции, которую мы хотели бы выполнять с помощью потоков.

Используя nano или другой текстовый редактор, откройте этот файл:

nano wiki_page_function.py

В этом мануале мы напишем функцию, которая определяет, существует ли та или иная страница Википедии:

import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status

Функция get_wiki_page_existence принимает два аргумента: URL-адрес страницы Википедии (wiki_page_url) и время в секундах для ожидания ответа от этого URL-адреса.

get_wiki_page_existence использует пакет requests, чтобы сделать веб-запрос на этот URL. В зависимости от кода состояния ответа функция возвращает строку, которая сообщает, существует ли страница. Различные коды состояния представляют разные результаты HTTP-запроса. Наша процедура предполагает, что код состояния 200 означает, что страница существует в Википедии, а код состояния 404 – что страница не существует.

Читайте также: Коды ошибок HTTP: расшифровка и устранение

Как описано в требованиях к этому мануалу, для запуска этой функции вам потребуется установленный пакет requests.

Теперь давайте попробуем запустить функцию, добавив URL-адрес и вызов функции после get_wiki_page_existence:

. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

Добавив код, сохраните и закройте файл.

Если мы запустим этот код с помощью:

python wiki_page_function.py

Мы увидим следующий результат:

https://en.wikipedia.org/wiki/Ocean - exists

Вызов функции get_wiki_page_existence с валидным адресом страницы из Википедии возвращает строку, подтверждающую, что страница существует.

Важно! В целом делить объекты или состояния Python между потоками небезопасно, если при этом не уделять особого внимания устранению ошибок параллелизма. При работе с потоками лучше всего определить функцию, которая выполняет одно задание и не передает или не публикует состояние для других потоков. Наша get_wiki_page_existence – пример такой функции.

2: Выполнение функции в потоке с помощью ThreadPoolExecutor

Теперь, когда у нас есть подходящая функция, мы можем использовать ThreadPoolExecutor для выполнения нескольких вызовов этой функции.

Давайте добавим в нашу программу wiki_page_function.py следующий код:

import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [

"https://en.wikipedia.org/wiki/Ocean",


"https://en.wikipedia.org/wiki/Island",


"https://en.wikipedia.org/wiki/this_page_does_not_exist",


"https://en.wikipedia.org/wiki/Shark",


]


with concurrent.futures.ThreadPoolExecutor() as executor:


futures = []


for url in wiki_page_urls:


futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))


for future in concurrent.futures.as_completed(futures):


print(future.result())

Давайте посмотрим, как работает и из чего состоит этот код:

  • concurrent.futures импортируется, чтобы предоставить доступ к ThreadPoolExecutor.
  • Оператор with используется для создания экземпляра ThreadPoolExecutor по имени executor, который будет оперативно очищать потоки после завершения работы.
  • Экземпляру executor передаются четыре задания: по одному для каждого URL-адреса в списке wiki_page_urls.
  • Каждый вызов submit возвращает экземпляр Future, который хранится в списке futures.
  • Функция as_completed ожидает завершения каждого вызова Future get_wiki_page_existence, чтобы ее результат можно было отобразить на экране.

Если мы снова запустим эту программу с помощью команды:

python wiki_page_function.py

Мы увидим следующий результат:

https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists

Этот вывод правильный: три URL-адреса являются действительными страницами Википедии, а один из них (this_page_does_not_exist) – нет. Обратите внимание, ваш вывод может быть упорядочен иначе, чем этот пример. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу, как только они становятся доступными, независимо от того, в каком порядке были отправлены задачи.

3: Обработка исключений

В предыдущем разделе функция get_wiki_page_existence успешно вернула значение для всех наших вызовов. На этом этапе вы узнаете, как ThreadPoolExecutor может вызывать исключения, генерируемые при вызовах многопоточных функций.

Рассмотрим следующий пример кода:

import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
] with concurrent.futures.ThreadPoolExecutor() as executor:

futures = []


for url in wiki_page_urls:


futures.append(


executor.submit(


get_wiki_page_existence, wiki_page_url=url, timeout=0.00001


)


)


for future in concurrent.futures.as_completed(futures):


try:


print(future.result())


except requests.ConnectTimeout:


print("ConnectTimeout.")

Этот блок кода почти идентичен тому, который мы использовали в разделе 2, но имеет два ключевых отличия:

  • Теперь мы передаем в get_wiki_page_existence параметр timeout=0.00001. Поскольку пакет requests не сможет обработать веб-запрос в Википедию за 0,00001 секунды, он вызовет исключение ConnectTimeout.
  • Затем мы перехватываем исключения ConnectTimeout, вызванные функцией future.result(), и выводим на экран строку каждый раз, когда делаем это.

Если мы снова запустим программу, мы получим следующий результат:

ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.

На экране появятся четыре сообщения ConnectTimeout – по одному для каждого из четырех wiki_page_urls (поскольку ни один запрос не был выполнен за 0,00001 секунды, следовательно, каждый из четырех вызовов get_wiki_page_existence выдал исключение ConnectTimeout).

Итак, если вызов функции, отправленный в ThreadPoolExecutor, выдает исключение, то, как правило, это исключение можно выдать, вызвав Future.result.

Вызывая Future.result для всех отправленных вызовов, ваша программа не пропустит никаких исключений вашей многопоточной функции.

4: Сравнение времени выполнения с потоками и без них

Теперь давайте убедимся, что использование ThreadPoolExecutor действительно ускоряет работу программы.

Для начала давайте попробуем запустить get_wiki_page_existence без потоков:

import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")


without_threads_start = time.time()


for url in wiki_page_urls:


print(get_wiki_page_existence(wiki_page_url=url))


print("Without threads time:", time.time() - without_threads_start)

В данном примере кода мы вызываем функцию get_wiki_page_existence с 50 различными URL-адресами страниц Википедии. Мы используем функцию time.time(), чтобы вывести на экран количество секунд, необходимое для запуска программы.

Если мы запустим этот код, мы увидим такой результат:

Running without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182

Записи 2–47 в этом выводе были опущены для краткости.

Количество секунд, указанное после Without threads time, будет отличаться в вашем случае – это нормально, сейчас нам нужно получить точку для сравнения с ThreadPoolExecutor. В данном случае это ~5.803 секунды.

Давайте обработаем те же пятьдесят URL-адресов Википедии с помощью get_wiki_page_existence, но на этот раз с помощью ThreadPoolExecutor:

import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running threaded:")

threaded_start = time.time()


with concurrent.futures.ThreadPoolExecutor() as executor:


futures = []


for url in wiki_page_urls:


futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))


for future in concurrent.futures.as_completed(futures):


print(future.result())


print("Threaded time:", time.time() - threaded_start)

Это тот же код, который мы использовали в разделе 2, только с дополнительными операторами print, которые выводят количество секунд, необходимое для выполнения кода.

Если мы снова запустим программу, то увидим следующее:

Running threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543

Опять же, количество секунд в Threaded time будет отличаться (как и порядок вывода).

Теперь вы можете сравнить время обработки пятидесяти URL-адресов с потоками и без них.

На машине, использованной в этом руководстве, без потоков потребовалось ~5,803 секунды, а с потоками ~ 1,220 секунды. Как видите, с потоками программа работала значительно быстрее.

Заключение

В этом мануале вы узнали, как использовать утилиту ThreadPoolExecutor в Python 3 для оптимизации выполнения кода, привязанного к операциям ввода-вывода. Мы создали функцию для работы в потоках, узнали, как получать выходные данные и исключения, и отследили повышение производительности при использовании потоков.

Узнать больше о других функциях параллелизма, которые предлагает модуль concurrent.futures, можно здесь.

Tags: ,

Добавить комментарий