Обработка больших файлов с использованием Python
В последний год или около того, и с моим повышенным вниманием к данным ribo-seq я полностью осознал, что означает термин большие данные. Исследования ribo-seq в их необработанном виде могут легко охватить сотни ГБ, что означает, что их обработка как своевременной, так и эффективной требует некоторого обдумывания. В этом посте, и, надеюсь, в следующем, я хочу подробно описать некоторые из методов, которые я придумала (собрал из разных статей в интернете), которые помогают мне получать данные такого масштаба. В частности, я буду подробно описывать методы для Python, хотя некоторые методы можно перенести на другие языки.
Мой первый большой совет по Python о том, как разбить ваши файлы на более мелкие блоки (или куски) таким образом, чтобы вы могли использовать несколько процессоров. Давайте начнем с самого простого способа чтения файла на python.
with open("input.txt") as f:
data = f.readlines()
for line in data:
process(line)Эта ошибка, сделанная выше в отношении больших данных, заключается в том, что она считывает все данные в ОЗУ, прежде чем пытаться обрабатывать их построчно. Это, вероятно, самый простой способ вызвать переполнение памяти и возникновение ошибки. Давайте исправим это, читая данные построчно, чтобы в любой момент времени в оперативной памяти сохранялась только одна строка.
with open("input.txt") as f:
for line in f:
process(line)Это большое улучшение, и именно оно не перегружает ОЗУ при загрузке большого файла. Затем мы должны попытаться немного ускорить это, используя все эти бездействующие ядра.
import multiprocessing as mp
pool = mp.Pool(cores)
jobs = []
with open("input.txt") as f:
for line in f:
jobs.append( pool.apply_async(process,(line)) )
# дождаться окончания всех работ
for job in jobs:
job.get()
pool.close()При условии, что порядок обработки строк не имеет значения, приведенный выше код генерирует набор (пул) обработчиков, в идеале один для каждого ядра, перед созданием группы задач (заданий), по одной для каждой строки. Я склонен использовать объект Pool, предоставляемый модулем multiprocessing, из-за простоты использования, однако, вы можете порождать и контролировать отдельные обработчики, используя mp.Process, если вы хотите более точное управление. Для простого вычисления числа объект Pool очень хорош.
Хотя вышеперечисленное теперь использует все эти ядра, к сожалению, снова возникают проблемы с памятью. Мы специально используем функцию apply_async, чтобы пул не блокировался во время обработки каждой строки. Однако при этом все данные снова считываются в память; это время сохраняется в виде отдельных строк, связанных с каждым заданием, ожидая обработки в строке. Таким образом, память снова будет переполнена. В идеале метод считывает строку в память только тогда, когда подходит ее очередь на обработку.
import multiprocessing as mp
def process_wrapper(lineID):
with open("input.txt") as f:
for i, line in enumerate(f):
if i != lineID:
continue
else:
process(line)
break
pool = mp.Pool(cores)
jobs = []
with open("input.txt") as f:
for ID, line in enumerate(f):
jobs.append( pool.apply_async(process_wrapper,(ID)) )
# дождаться окончания всех работ
for job in jobs:
job.get()
pool.close()Выше мы изменили функцию, переданную в пул обработчика, чтобы она включала в себя открытие файла, поиск указанной строки, чтение ее в память и последующую обработку. Единственный вход, который теперь сохраняется для каждой порожденной задачи - это номер строки, что предотвращает переполнение памяти. К сожалению, накладные расходы, связанные с необходимостью найти строку путем итеративного чтения файла для каждого задания, являются несостоятельными, поскольку по мере того, как вы углубляетесь в файл, процесс занимает все больше времени. Чтобы избежать этого, мы можем использовать функцию поиска файловых объектов, которая пропускает вас в определенное место в файле. Сочетание с функцией tell, которая возвращает текущее местоположение в файле, дает:
import multiprocessing as mp
def process_wrapper(lineByte):
with open("input.txt") as f:
f.seek(lineByte)
line = f.readline()
process(line)
pool = mp.Pool(cores)
jobs = []
with open("input.txt") as f:
nextLineByte = f.tell()
for line in f:
jobs.append( pool.apply_async(process_wrapper,(nextLineByte)) )
nextLineByte = f.tell()
for job in jobs:
job.get()
pool.close()Используя поиск, мы можем перейти непосредственно к правильной части файла, после чего мы читаем строку в память и обрабатываем ее. Мы должны быть осторожны, чтобы правильно обрабатывать первую и последнюю строки, но в противном случае это будет именно то, что мы излагаем, а именно использование всех ядер для обработки данного файла без переполнения памяти.
Я закончу этот пост с небольшим обновлением вышеупомянутого, поскольку есть разумные накладные расходы, связанные с открытием и закрытием файла для каждой отдельной строки. Если мы обрабатываем несколько строк файла за один раз, мы можем сократить эти операции. Самая большая техническая сложность при этом заключается в том, что при переходе к месту в файле вы, скорее всего, не находитесь в начале строки. Для простого файла, как в этом примере, это просто означает, что вам нужно вызвать readline, который читает следующий символ новой строки. Более сложные типы файлов, вероятно, требуют дополнительного кода, чтобы найти подходящее место для начала / конца чанка.
import multiprocessing as mp, os
def process_wrapper(chunkStart, chunkSize):
with open("input.txt") as f:
f.seek(chunkStart)
lines = f.read(chunkSize).splitlines()
for line in lines:
process(line)
def chunkify(fname,size=1024*1024):
fileEnd = os.path.getsize(fname)
with open(fname,'r') as f:
chunkEnd = f.tell()
while True:
chunkStart = chunkEnd
f.seek(size,1)
f.readline()
chunkEnd = f.tell()
yield chunkStart, chunkEnd - chunkStart
if chunkEnd > fileEnd:
break
pool = mp.Pool(cores)
jobs = []
for chunkStart,chunkSize in chunkify("input.txt"):
jobs.append( pool.apply_async(process_wrapper,(chunkStart,chunkSize)) )
for job in jobs:
job.get()
pool.close()Во всяком случае, я надеюсь, что некоторые из вышеперечисленных примеров были новыми и возможно, полезными для вас. Если вы знаете лучший способ сделать что-то (на python), мне было бы очень интересно узнать об этом. В следующем посте, который будет опубликован в ближайшем будущем, я расширю этот код, превратив его в родительский класс, из которого создается несколько дочерних элементов для использования с различными типами файлов.