DevGang
Авторизоваться

Как сделать код Python параллельным с помощью 3х строк

Я решил поделиться одним из моих любимых приемов в стандартной библиотеке Python на примере. Весь код работает на Python 3.2+ без внешних пакетов.

Начальная проблема

Допустим, у вас есть тысяча URL-адресов для обработки / загрузки / проверки, поэтому вам нужно выполнить как можно больше вызовов HTTP GET и получить тело каждого ответа.

Вот способ сделать это:

import http.client
import socket

def get_it(url):
   try:
       connection = http.client.HTTPSConnection(url, timeout=2)
       connection.request("GET", "/")
       response = connection.getresponse()
       return response.read()
   except socket.timeout:
       pass

urls = [
   "www.google.com",
   "www.youtube.com",
   "www.wikipedia.org",
   "www.reddit.com",
   "www.httpbin.org"
] * 200

for url in urls:
   get_it(url)

Я бы не использовал стандартную библиотеку в качестве HTTP-клиента, но для этого поста сгодится

Как видите, здесь нет магии. Python перебирает 1000 URL-адресов и вызывает каждый из них.

Эта вещь на моем компьютере занимает 2% процессорного времени и проводит большую часть времени в ожидании ввода / вывода:

$ time python io_bound_serial.py
20.67s user 5.37s system 855.03s real 24292kB mem

Это продолжается примерно 14 минут. Мы можем сделать лучше.

Покажи мне трюк!

from concurrent.futures import ThreadPoolExecutor as PoolExecutor
import http.client
import socket

def get_it(url):
   try:
       # Всегда устанавливается тайм - аут при подключении к внешнему серверу
       connection = http.client.HTTPSConnection(url, timeout=2)
       connection.request("GET", "/")
       response = connection.getresponse()
       return response.read()
   except socket.timeout:
       # В реальном сценарии вы, вероятно, делать вещи, если
       # сокет уходит в тайм-аут
       pass

urls = [
   "www.google.com",
   "www.youtube.com",
   "www.wikipedia.org",
   "www.reddit.com",
   "www.httpbin.org"
] * 200

with PoolExecutor(max_workers=4) as executor:
   for _ in executor.map(get_it, urls):
       pass

Посмотрим, что изменилось:

# импортировать новый API для создания пула потоков 
from concurrent.futures import ThreadPoolExecutor as PoolExecutor

# создать пул из 4 потоков 
with PoolExecutor(max_workers=4) as executor:

   # распределяем 1000 URL-адресов по четырем потокам в пуле 
   # _ - это тело каждой страницы, которую я сейчас игнорирую 
   for _ in executor.map(get_it, urls):
       pass

Итак, из 3 строк кода мы превратили медленную последовательную задачу в параллельную, занимающую всего 5 минут:

$ time python io_bound_threads.py
21.40s user 6.10s system 294.07s real 31784kB mem

Мы поднялись с 855,03 до 294,07, увеличившись в 2,9 раза!

Подожди, есть еще

Самое замечательное в этом новом API состоит в том, что вы можете заменить

from concurrent.futures import ThreadPoolExecutor as PoolExecutor

на

from concurrent.futures import ProcessPoolExecutor as PoolExecutor

сказав Python использовать процессы вместо потоков. Из любопытства посмотрим, что происходит со временем выполнения:

$ time python io_bound_processes.py
22.19s user 6.03s system 270.28s real 23324kB mem

На 20 секунд меньше, чем у потоковой версии, не сильно отличается. Имейте в виду, что это ненаучные эксперименты, и я использую компьютер во время работы этих скриптов.

Бонусный контент

Мой компьютер имеет 4 ядра, давайте посмотрим, что происходит с многопоточными версиями, увеличивая количество рабочих потоков:

# 6 threads
20.48s user 5.19s system 155.92s real 35876kB mem
# 8 threads
23.48s user 5.55s system 178.29s real 40472kB mem
# 16 threads
23.77s user 5.44s system 119.69s real 58928kB mem
# 32 threads
21.88s user 4.81s system 119.26s real 96136kB mem

Три вещи, на которые следует обратить внимание: занятость ОЗУ явно увеличивается, мы достигаем 16 потоков, а при 16 потоках мы более чем в 7 раз быстрее первоночальной версии.

Если вы не распознаете вывод времени, это потому, что я назвал его так:

time='gtime -f '\''%Us user %Ss system %es real %MkB mem -- %C'\'

где gtime установлен brew install gnu-time

Выводы

Я думаю, что  ThreadPoolExecutor  и  ProcessPoolExecutor  - это супер классные дополнения к стандартной библиотеке Python. Вы могли бы сделать в основном все, что они делают со «старыми»  threading и multiprocessing и очередями FIFO, но этот API намного лучше.

#Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться