Обработка больших файлов с использованием Python
В последний год или около того, и с моим повышенным вниманием к данным ribo-seq я полностью осознал, что означает термин большие данные. Исследования ribo-seq в их необработанном виде могут легко охватить сотни ГБ, что означает, что их обработка как своевременной, так и эффективной требует некоторого обдумывания. В этом посте, и, надеюсь, в следующем, я хочу подробно описать некоторые из методов, которые я придумала (собрал из разных статей в интернете), которые помогают мне получать данные такого масштаба. В частности, я буду подробно описывать методы для Python, хотя некоторые методы можно перенести на другие языки.
Мой первый большой совет по Python о том, как разбить ваши файлы на более мелкие блоки (или куски) таким образом, чтобы вы могли использовать несколько процессоров. Давайте начнем с самого простого способа чтения файла на python.
with open("input.txt") as f: data = f.readlines() for line in data: process(line)
Эта ошибка, сделанная выше в отношении больших данных, заключается в том, что она считывает все данные в ОЗУ, прежде чем пытаться обрабатывать их построчно. Это, вероятно, самый простой способ вызвать переполнение памяти и возникновение ошибки. Давайте исправим это, читая данные построчно, чтобы в любой момент времени в оперативной памяти сохранялась только одна строка.
with open("input.txt") as f: for line in f: process(line)
Это большое улучшение, и именно оно не перегружает ОЗУ при загрузке большого файла. Затем мы должны попытаться немного ускорить это, используя все эти бездействующие ядра.
import multiprocessing as mp pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: for line in f: jobs.append( pool.apply_async(process,(line)) ) # дождаться окончания всех работ for job in jobs: job.get() pool.close()
При условии, что порядок обработки строк не имеет значения, приведенный выше код генерирует набор (пул) обработчиков, в идеале один для каждого ядра, перед созданием группы задач (заданий), по одной для каждой строки. Я склонен использовать объект Pool, предоставляемый модулем multiprocessing, из-за простоты использования, однако, вы можете порождать и контролировать отдельные обработчики, используя mp.Process, если вы хотите более точное управление. Для простого вычисления числа объект Pool очень хорош.
Хотя вышеперечисленное теперь использует все эти ядра, к сожалению, снова возникают проблемы с памятью. Мы специально используем функцию apply_async, чтобы пул не блокировался во время обработки каждой строки. Однако при этом все данные снова считываются в память; это время сохраняется в виде отдельных строк, связанных с каждым заданием, ожидая обработки в строке. Таким образом, память снова будет переполнена. В идеале метод считывает строку в память только тогда, когда подходит ее очередь на обработку.
import multiprocessing as mp def process_wrapper(lineID): with open("input.txt") as f: for i, line in enumerate(f): if i != lineID: continue else: process(line) break pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: for ID, line in enumerate(f): jobs.append( pool.apply_async(process_wrapper,(ID)) ) # дождаться окончания всех работ for job in jobs: job.get() pool.close()
Выше мы изменили функцию, переданную в пул обработчика, чтобы она включала в себя открытие файла, поиск указанной строки, чтение ее в память и последующую обработку. Единственный вход, который теперь сохраняется для каждой порожденной задачи - это номер строки, что предотвращает переполнение памяти. К сожалению, накладные расходы, связанные с необходимостью найти строку путем итеративного чтения файла для каждого задания, являются несостоятельными, поскольку по мере того, как вы углубляетесь в файл, процесс занимает все больше времени. Чтобы избежать этого, мы можем использовать функцию поиска файловых объектов, которая пропускает вас в определенное место в файле. Сочетание с функцией tell, которая возвращает текущее местоположение в файле, дает:
import multiprocessing as mp def process_wrapper(lineByte): with open("input.txt") as f: f.seek(lineByte) line = f.readline() process(line) pool = mp.Pool(cores) jobs = [] with open("input.txt") as f: nextLineByte = f.tell() for line in f: jobs.append( pool.apply_async(process_wrapper,(nextLineByte)) ) nextLineByte = f.tell() for job in jobs: job.get() pool.close()
Используя поиск, мы можем перейти непосредственно к правильной части файла, после чего мы читаем строку в память и обрабатываем ее. Мы должны быть осторожны, чтобы правильно обрабатывать первую и последнюю строки, но в противном случае это будет именно то, что мы излагаем, а именно использование всех ядер для обработки данного файла без переполнения памяти.
Я закончу этот пост с небольшим обновлением вышеупомянутого, поскольку есть разумные накладные расходы, связанные с открытием и закрытием файла для каждой отдельной строки. Если мы обрабатываем несколько строк файла за один раз, мы можем сократить эти операции. Самая большая техническая сложность при этом заключается в том, что при переходе к месту в файле вы, скорее всего, не находитесь в начале строки. Для простого файла, как в этом примере, это просто означает, что вам нужно вызвать readline, который читает следующий символ новой строки. Более сложные типы файлов, вероятно, требуют дополнительного кода, чтобы найти подходящее место для начала / конца чанка.
import multiprocessing as mp, os def process_wrapper(chunkStart, chunkSize): with open("input.txt") as f: f.seek(chunkStart) lines = f.read(chunkSize).splitlines() for line in lines: process(line) def chunkify(fname,size=1024*1024): fileEnd = os.path.getsize(fname) with open(fname,'r') as f: chunkEnd = f.tell() while True: chunkStart = chunkEnd f.seek(size,1) f.readline() chunkEnd = f.tell() yield chunkStart, chunkEnd - chunkStart if chunkEnd > fileEnd: break pool = mp.Pool(cores) jobs = [] for chunkStart,chunkSize in chunkify("input.txt"): jobs.append( pool.apply_async(process_wrapper,(chunkStart,chunkSize)) ) for job in jobs: job.get() pool.close()
Во всяком случае, я надеюсь, что некоторые из вышеперечисленных примеров были новыми и возможно, полезными для вас. Если вы знаете лучший способ сделать что-то (на python), мне было бы очень интересно узнать об этом. В следующем посте, который будет опубликован в ближайшем будущем, я расширю этот код, превратив его в родительский класс, из которого создается несколько дочерних элементов для использования с различными типами файлов.