Как сделать код Python параллельным с помощью 3х строк
Я решил поделиться одним из моих любимых приемов в стандартной библиотеке Python на примере. Весь код работает на Python 3.2+ без внешних пакетов.
Начальная проблема
Допустим, у вас есть тысяча URL-адресов для обработки / загрузки / проверки, поэтому вам нужно выполнить как можно больше вызовов HTTP GET и получить тело каждого ответа.
Вот способ сделать это:
import http.client import socket def get_it(url): try: connection = http.client.HTTPSConnection(url, timeout=2) connection.request("GET", "/") response = connection.getresponse() return response.read() except socket.timeout: pass urls = [ "www.google.com", "www.youtube.com", "www.wikipedia.org", "www.reddit.com", "www.httpbin.org" ] * 200 for url in urls: get_it(url)
Я бы не использовал стандартную библиотеку в качестве HTTP-клиента, но для этого поста сгодится
Как видите, здесь нет магии. Python перебирает 1000 URL-адресов и вызывает каждый из них.
Эта вещь на моем компьютере занимает 2% процессорного времени и проводит большую часть времени в ожидании ввода / вывода:
$ time python io_bound_serial.py 20.67s user 5.37s system 855.03s real 24292kB mem
Это продолжается примерно 14 минут. Мы можем сделать лучше.
Покажи мне трюк!
from concurrent.futures import ThreadPoolExecutor as PoolExecutor import http.client import socket def get_it(url): try: # Всегда устанавливается тайм - аут при подключении к внешнему серверу connection = http.client.HTTPSConnection(url, timeout=2) connection.request("GET", "/") response = connection.getresponse() return response.read() except socket.timeout: # В реальном сценарии вы, вероятно, делать вещи, если # сокет уходит в тайм-аут pass urls = [ "www.google.com", "www.youtube.com", "www.wikipedia.org", "www.reddit.com", "www.httpbin.org" ] * 200 with PoolExecutor(max_workers=4) as executor: for _ in executor.map(get_it, urls): pass
Посмотрим, что изменилось:
# импортировать новый API для создания пула потоков from concurrent.futures import ThreadPoolExecutor as PoolExecutor # создать пул из 4 потоков with PoolExecutor(max_workers=4) as executor: # распределяем 1000 URL-адресов по четырем потокам в пуле # _ - это тело каждой страницы, которую я сейчас игнорирую for _ in executor.map(get_it, urls): pass
Итак, из 3 строк кода мы превратили медленную последовательную задачу в параллельную, занимающую всего 5 минут:
$ time python io_bound_threads.py 21.40s user 6.10s system 294.07s real 31784kB mem
Мы поднялись с 855,03 до 294,07, увеличившись в 2,9 раза!
Подожди, есть еще
Самое замечательное в этом новом API состоит в том, что вы можете заменить
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
на
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
сказав Python использовать процессы вместо потоков. Из любопытства посмотрим, что происходит со временем выполнения:
$ time python io_bound_processes.py 22.19s user 6.03s system 270.28s real 23324kB mem
На 20 секунд меньше, чем у потоковой версии, не сильно отличается. Имейте в виду, что это ненаучные эксперименты, и я использую компьютер во время работы этих скриптов.
Бонусный контент
Мой компьютер имеет 4 ядра, давайте посмотрим, что происходит с многопоточными версиями, увеличивая количество рабочих потоков:
# 6 threads 20.48s user 5.19s system 155.92s real 35876kB mem # 8 threads 23.48s user 5.55s system 178.29s real 40472kB mem # 16 threads 23.77s user 5.44s system 119.69s real 58928kB mem # 32 threads 21.88s user 4.81s system 119.26s real 96136kB mem
Три вещи, на которые следует обратить внимание: занятость ОЗУ явно увеличивается, мы достигаем 16 потоков, а при 16 потоках мы более чем в 7 раз быстрее первоночальной версии.
Если вы не распознаете вывод времени, это потому, что я назвал его так:
time='gtime -f '\''%Us user %Ss system %es real %MkB mem -- %C'\'
где gtime установлен brew install gnu-time
Выводы
Я думаю, что ThreadPoolExecutor и ProcessPoolExecutor - это супер классные дополнения к стандартной библиотеке Python. Вы могли бы сделать в основном все, что они делают со «старыми» threading и multiprocessing и очередями FIFO, но этот API намного лучше.