Использование ThreadPoolExecutor в Python 3
Development, Python | Комментировать запись
Потоки Python – это форма параллелизма, которая позволяет программам запускать несколько процедур одновременно. Параллелизм в Python также может быть достигнут с помощью нескольких процессов, но потоки позволяют также ускорить работу приложений, требующих значительного количества операций ввода-вывода.
В качестве примеров операций, связанных с вводом-выводом, можно привести выполнение веб-запросов и чтение данных из файлов. В отличие от операций ввода-вывода, операции с привязкой к CPU (например, выполнение математических расчетов с помощью стандартной библиотеки Python) не получат особой выгоды от потоков Python.
Для выполнения кода в потоке Python 3 предоставляет утилиту ThreadPoolExecutor.
В этом мануале мы покажем, как использовать ThreadPoolExecutor, чтобы оперативно выполнять сетевые запросы. Мы определим функцию, подходящую для вызова внутри потоков, а затем будем использовать ThreadPoolExecutor для выполнения этой функции и обработки полученных результатов.
В этом мануале мы выполним несколько сетевых запросов, чтобы проверить наличие страниц Википедии.
Примечание: Тот факт, что операции ввода-вывода больше выигрывают от потоков, чем операции CPU, связан с особенностью Python – с так называемой глобальной блокировкой интерпретатора.
Требования
Чтобы получить максимальную отдачу от этого мануала, нужно иметь некоторое представление о программировании на Python и о локальной среде Python с установленным пакетом requests.
Рекомендуем просмотреть эти руководства для получения необходимой дополнительной информации:
Чтобы установить пакет requests в вашу локальную среду Python, нужно выполнить эту команду:
pip install --user requests==2.23.0
1: Определение функции для потока
Начнем с определения функции, которую мы хотели бы выполнять с помощью потоков.
Используя nano или другой текстовый редактор, откройте этот файл:
nano wiki_page_function.py
В этом мануале мы напишем функцию, которая определяет, существует ли та или иная страница Википедии:
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
Функция get_wiki_page_existence принимает два аргумента: URL-адрес страницы Википедии (wiki_page_url) и время в секундах для ожидания ответа от этого URL-адреса.
get_wiki_page_existence использует пакет requests, чтобы сделать веб-запрос на этот URL. В зависимости от кода состояния ответа функция возвращает строку, которая сообщает, существует ли страница. Различные коды состояния представляют разные результаты HTTP-запроса. Наша процедура предполагает, что код состояния 200 означает, что страница существует в Википедии, а код состояния 404 – что страница не существует.
Читайте также: Коды ошибок HTTP: расшифровка и устранение
Как описано в требованиях к этому мануалу, для запуска этой функции вам потребуется установленный пакет requests.
Теперь давайте попробуем запустить функцию, добавив URL-адрес и вызов функции после get_wiki_page_existence:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
Добавив код, сохраните и закройте файл.
Если мы запустим этот код с помощью:
python wiki_page_function.py
Мы увидим следующий результат:
https://en.wikipedia.org/wiki/Ocean - exists
Вызов функции get_wiki_page_existence с валидным адресом страницы из Википедии возвращает строку, подтверждающую, что страница существует.
Важно! В целом делить объекты или состояния Python между потоками небезопасно, если при этом не уделять особого внимания устранению ошибок параллелизма. При работе с потоками лучше всего определить функцию, которая выполняет одно задание и не передает или не публикует состояние для других потоков. Наша get_wiki_page_existence – пример такой функции.
2: Выполнение функции в потоке с помощью ThreadPoolExecutor
Теперь, когда у нас есть подходящая функция, мы можем использовать ThreadPoolExecutor для выполнения нескольких вызовов этой функции.
Давайте добавим в нашу программу wiki_page_function.py следующий код:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
Давайте посмотрим, как работает и из чего состоит этот код:
- concurrent.futures импортируется, чтобы предоставить доступ к ThreadPoolExecutor.
- Оператор with используется для создания экземпляра ThreadPoolExecutor по имени executor, который будет оперативно очищать потоки после завершения работы.
- Экземпляру executor передаются четыре задания: по одному для каждого URL-адреса в списке wiki_page_urls.
- Каждый вызов submit возвращает экземпляр Future, который хранится в списке futures.
- Функция as_completed ожидает завершения каждого вызова Future get_wiki_page_existence, чтобы ее результат можно было отобразить на экране.
Если мы снова запустим эту программу с помощью команды:
python wiki_page_function.py
Мы увидим следующий результат:
https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Этот вывод правильный: три URL-адреса являются действительными страницами Википедии, а один из них (this_page_does_not_exist) – нет. Обратите внимание, ваш вывод может быть упорядочен иначе, чем этот пример. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу, как только они становятся доступными, независимо от того, в каком порядке были отправлены задачи.
3: Обработка исключений
В предыдущем разделе функция get_wiki_page_existence успешно вернула значение для всех наших вызовов. На этом этапе вы узнаете, как ThreadPoolExecutor может вызывать исключения, генерируемые при вызовах многопоточных функций.
Рассмотрим следующий пример кода:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
Этот блок кода почти идентичен тому, который мы использовали в разделе 2, но имеет два ключевых отличия:
- Теперь мы передаем в get_wiki_page_existence параметр timeout=0.00001. Поскольку пакет requests не сможет обработать веб-запрос в Википедию за 0,00001 секунды, он вызовет исключение ConnectTimeout.
- Затем мы перехватываем исключения ConnectTimeout, вызванные функцией future.result(), и выводим на экран строку каждый раз, когда делаем это.
Если мы снова запустим программу, мы получим следующий результат:
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
На экране появятся четыре сообщения ConnectTimeout – по одному для каждого из четырех wiki_page_urls (поскольку ни один запрос не был выполнен за 0,00001 секунды, следовательно, каждый из четырех вызовов get_wiki_page_existence выдал исключение ConnectTimeout).
Итак, если вызов функции, отправленный в ThreadPoolExecutor, выдает исключение, то, как правило, это исключение можно выдать, вызвав Future.result.
Вызывая Future.result для всех отправленных вызовов, ваша программа не пропустит никаких исключений вашей многопоточной функции.
4: Сравнение времени выполнения с потоками и без них
Теперь давайте убедимся, что использование ThreadPoolExecutor действительно ускоряет работу программы.
Для начала давайте попробуем запустить get_wiki_page_existence без потоков:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
В данном примере кода мы вызываем функцию get_wiki_page_existence с 50 различными URL-адресами страниц Википедии. Мы используем функцию time.time(), чтобы вывести на экран количество секунд, необходимое для запуска программы.
Если мы запустим этот код, мы увидим такой результат:
Running without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
Записи 2–47 в этом выводе были опущены для краткости.
Количество секунд, указанное после Without threads time, будет отличаться в вашем случае – это нормально, сейчас нам нужно получить точку для сравнения с ThreadPoolExecutor. В данном случае это ~5.803 секунды.
Давайте обработаем те же пятьдесят URL-адресов Википедии с помощью get_wiki_page_existence, но на этот раз с помощью ThreadPoolExecutor:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
Это тот же код, который мы использовали в разделе 2, только с дополнительными операторами print, которые выводят количество секунд, необходимое для выполнения кода.
Если мы снова запустим программу, то увидим следующее:
Running threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Опять же, количество секунд в Threaded time будет отличаться (как и порядок вывода).
Теперь вы можете сравнить время обработки пятидесяти URL-адресов с потоками и без них.
На машине, использованной в этом руководстве, без потоков потребовалось ~5,803 секунды, а с потоками ~ 1,220 секунды. Как видите, с потоками программа работала значительно быстрее.
Заключение
В этом мануале вы узнали, как использовать утилиту ThreadPoolExecutor в Python 3 для оптимизации выполнения кода, привязанного к операциям ввода-вывода. Мы создали функцию для работы в потоках, узнали, как получать выходные данные и исключения, и отследили повышение производительности при использовании потоков.
Узнать больше о других функциях параллелизма, которые предлагает модуль concurrent.futures, можно здесь.
Tags: Python, Python 3