DevGang
Авторизоваться

Обработка больших файлов с использованием Python 

В последний год или около того, и с моим повышенным вниманием к данным ribo-seq я полностью осознал, что означает термин большие данные. Исследования ribo-seq в их необработанном виде могут легко охватить сотни ГБ, что означает, что их обработка как своевременной, так и эффективной требует некоторого обдумывания. В этом посте, и, надеюсь, в следующем, я хочу подробно описать некоторые из методов, которые я придумала (собрал из разных статей в интернете), которые помогают мне получать данные такого масштаба. В частности, я буду подробно описывать методы для Python, хотя некоторые методы можно перенести на другие языки.

Мой первый большой совет по Python о том, как разбить ваши файлы на более мелкие блоки (или куски) таким образом, чтобы вы могли использовать несколько процессоров. Давайте начнем с самого простого способа чтения файла на python.

with open("input.txt") as f:
   data = f.readlines()
   for line in data:
       process(line)

Эта ошибка, сделанная выше в отношении больших данных, заключается в том, что она считывает все данные в ОЗУ, прежде чем пытаться обрабатывать их построчно. Это, вероятно, самый простой способ вызвать переполнение памяти и возникновение ошибки. Давайте исправим это, читая данные построчно, чтобы в любой момент времени в оперативной памяти сохранялась только одна строка.

with open("input.txt") as f:
   for line in f:
       process(line)

Это большое улучшение, и именно оно не перегружает ОЗУ при загрузке большого файла. Затем мы должны попытаться немного ускорить это, используя все эти бездействующие ядра.

import multiprocessing as mp

pool = mp.Pool(cores)
jobs = []

with open("input.txt") as f:
   for line in f:
       jobs.append( pool.apply_async(process,(line)) )

# дождаться окончания всех работ
for job in jobs:
   job.get()

pool.close()

При условии, что порядок обработки строк не имеет значения, приведенный выше код генерирует набор (пул) обработчиков, в идеале один для каждого ядра, перед созданием группы задач (заданий), по одной для каждой строки. Я склонен использовать объект Pool, предоставляемый модулем multiprocessing, из-за простоты использования, однако, вы можете порождать и контролировать отдельные обработчики, используя mp.Process, если вы хотите более точное управление. Для простого вычисления числа объект Pool очень хорош.

Хотя вышеперечисленное теперь использует все эти ядра, к сожалению, снова возникают проблемы с памятью. Мы специально используем функцию apply_async, чтобы пул не блокировался во время обработки каждой строки. Однако при этом все данные снова считываются в память; это время сохраняется в виде отдельных строк, связанных с каждым заданием, ожидая обработки в строке. Таким образом, память снова будет переполнена. В идеале метод считывает строку в память только тогда, когда подходит ее очередь на обработку.

import multiprocessing as mp

def process_wrapper(lineID):
   with open("input.txt") as f:
       for i, line in enumerate(f):
           if i != lineID:
               continue
           else:
               process(line)
               break

pool = mp.Pool(cores)
jobs = []

with open("input.txt") as f:
   for ID, line in enumerate(f):
       jobs.append( pool.apply_async(process_wrapper,(ID)) )

# дождаться окончания всех работ
for job in jobs:
   job.get()

pool.close()

Выше мы изменили функцию, переданную в пул обработчика, чтобы она включала в себя открытие файла, поиск указанной строки, чтение ее в память и последующую обработку. Единственный вход, который теперь сохраняется для каждой порожденной задачи - это номер строки, что предотвращает переполнение памяти. К сожалению, накладные расходы, связанные с необходимостью найти строку путем итеративного чтения файла для каждого задания, являются несостоятельными, поскольку по мере того, как вы углубляетесь в файл, процесс занимает все больше времени. Чтобы избежать этого, мы можем использовать функцию поиска файловых объектов, которая пропускает вас в определенное место в файле. Сочетание с функцией tell, которая возвращает текущее местоположение в файле, дает:

import multiprocessing as mp

def process_wrapper(lineByte):
   with open("input.txt") as f:
       f.seek(lineByte)
       line = f.readline()
       process(line)

pool = mp.Pool(cores)
jobs = []

with open("input.txt") as f:
   nextLineByte = f.tell()
   for line in f:
       jobs.append( pool.apply_async(process_wrapper,(nextLineByte)) )
       nextLineByte = f.tell()

for job in jobs:
   job.get()

pool.close()

Используя поиск, мы можем перейти непосредственно к правильной части файла, после чего мы читаем строку в память и обрабатываем ее. Мы должны быть осторожны, чтобы правильно обрабатывать первую и последнюю строки, но в противном случае это будет именно то, что мы излагаем, а именно использование всех ядер для обработки данного файла без переполнения памяти.

Я закончу этот пост с небольшим обновлением вышеупомянутого, поскольку есть разумные накладные расходы, связанные с открытием и закрытием файла для каждой отдельной строки. Если мы обрабатываем несколько строк файла за один раз, мы можем сократить эти операции. Самая большая техническая сложность при этом заключается в том, что при переходе к месту в файле вы, скорее всего, не находитесь в начале строки. Для простого файла, как в этом примере, это просто означает, что вам нужно вызвать readline, который читает следующий символ новой строки. Более сложные типы файлов, вероятно, требуют дополнительного кода, чтобы найти подходящее место для начала / конца чанка.

import multiprocessing as mp, os

def process_wrapper(chunkStart, chunkSize):
   with open("input.txt") as f:
       f.seek(chunkStart)
       lines = f.read(chunkSize).splitlines()
       for line in lines:
           process(line)

def chunkify(fname,size=1024*1024):
   fileEnd = os.path.getsize(fname)
   with open(fname,'r') as f:
       chunkEnd = f.tell()
   while True:
       chunkStart = chunkEnd
       f.seek(size,1)
       f.readline()
       chunkEnd = f.tell()
       yield chunkStart, chunkEnd - chunkStart
       if chunkEnd > fileEnd:
           break

pool = mp.Pool(cores)
jobs = []

for chunkStart,chunkSize in chunkify("input.txt"):
   jobs.append( pool.apply_async(process_wrapper,(chunkStart,chunkSize)) )

for job in jobs:
   job.get()

pool.close()

Во всяком случае, я надеюсь, что некоторые из вышеперечисленных примеров были новыми и возможно, полезными для вас. Если вы знаете лучший способ сделать что-то (на python), мне было бы очень интересно узнать об этом. В следующем посте, который будет опубликован в ближайшем будущем, я расширю этот код, превратив его в родительский класс, из которого создается несколько дочерних элементов для использования с различными типами файлов.

#Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу