Обработка больших файлов с использованием Python: часть 2
В прошлой статье я написал о некоторых методах, которые я использую в python для эффективной обработки очень больших наборов данных. Вы можете найти это здесь. Грубо говоря, в нем подробно рассказывается, как можно разбить большой файл на куски, которые затем можно передать на несколько ядер для сокращения времени обработки. Ниже я подробно остановлюсь на этом, сначала создав родительский класс, который превращает данный (большой) файл в порции. Я создаю его таким образом, чтобы дочерние классы можно было легко создавать и адаптировать для конкретных типов файлов, учитывая некоторые примеры. Наконец, я даю некоторые функции обертывания для использования в сочетании с любым из чанков, чтобы чанки могли обрабатываться с использованием нескольких ядер.
Есть один вопрос на который стоит дать ответ, перед тем как вы начнете использовать данный метод повсеместно. Какого размера фалй стоит обрабатывать данным методом? Грубый ответ будет, когда размер данных становится сопоставимым с доступной оперативной памятью. Лучшим ответом будет, когда издержки на чтение каждой отдельной строки (записи) больше, чем операция над этой записью. Вот пример этого случая, хотя это не совсем справедливое сравнение:
>>> import timeit,os.path >>> os.path.getsize("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa") 10095955 >>> timeit.timeit('f = open("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa");sum([l.count(">") for l in f])',number=10) 0.8403599262237549 >>> timeit.timeit('f = open("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa");sum([c.count(">") for c in iter(lambda: f.read(1024*1024),"")])',number=10) 0.15671014785766602
Для небольшого 10-мегабайтного файла Fasta мы подсчитываем количество последовательностей, присутствующих в пятой части времени, используя чанки. Я должен быть честным и утверждать, что ускорение в основном связано с отсутствием идентифицирующих символов новой строки в методе чанкинга; но тем не менее, это показывает силу, которую можно иметь, используя куски. Для файла fasta 14 ГБ время для фрагментированных (1 МБ фрагментов) и не фрагментированных методов составляет 55 с и 130 с соответственно.
Возвращаясь на правильный путь, давайте превратим метод chunking в родительский класс, из которого мы можем построить:
import os.path class Chunker(object): @classmethod def chunkify(cls,fname,size=1024*1024): fileEnd = os.path.getsize(fname) with open(fname,'r') as f: chunkEnd = f.tell() while True: chunkStart = chunkEnd f.seek(size,1) cls._EOC(f) chunkEnd = f.tell() yield chunkStart, chunkEnd - chunkStart if chunkEnd >= fileEnd: break @staticmethod def _EOC(f): f.readline() @staticmethod def read(fname,chunk): with open(fname,'r') as f: f.seek(chunk[0]) return f.read(chunk[1]) @staticmethod def parse(chunk): for line in chunk.splitlines(): yield chunk
Выше мы создаем класс Chunker, у которого есть метод класса chunkify, а также статические методы _EOC, read и parse. Метод chunkify выполняет фактическое разбиение на фрагменты данных файла, возвращая итератор, который выдает кортежи, содержащие начало и размер фрагмента. Это метод класса, так что он может использовать статический метод _EOC (конец фрагмента), чтобы переместить указатель в подходящее место для разделения фрагментов. Для простейшего случая это просто конец / начало новой строки. Методы read и parse считывают данный фрагмент из файла и разбивают его на блоки (отдельные строки в простейшем случае) соответственно. Мы делаем методы non-chunkify статическими, чтобы их можно было вызывать без дополнительных затрат на создание экземпляра класса.
Давайте теперь создадим несколько дочерних элементов этого класса для определенных типов файлов. Во-первых, один из самых известных типов файлов в биоинформатике, FASTA. Ниже приведен фрагмент файла FASTA. Каждая запись имеет строку заголовка, которая начинается с «>», за которым следует уникальный идентификатор последовательности. После строки заголовка следуют одна или несколько строк, задающих последовательность. Последовательности могут представлять собой последовательности белков или нуклеиновых кислот, и они могут содержать пробелы и / или символы выравнивания.
>SEQUENCE_1 MTEITAAMVKELRESTGAGMMDCKNALSETNGDFDKAVQLLREKGLGKAAKKADRLAAEG LVSVKVSDDFTIAAMRPSYLSYEDLDMTFVENEYKALVAELEKENEERRRLKDPNKPEHK IPQFASRKQLSDAILKEAEEKIKEELKAQGKPEKIWDNIIPGKMNSFIADNSQLDSKLTL MGQFYVMDDKKTVEQVIAEKEKEFGGKIKIVEFICFEVGEGLEKKTEDFAAEVAAQL >SEQUENCE_2 SATVSEINSETDFVAKNDQFIALTKDTTAHIQSNSLQSVEELHSSTINGVKFEEYLKSQI ATIGENLVVRRFATLKAGANGVVNGYIHTNGRVGVVIAAACDSAEVASKSRDLLRQICMH
А вот обработка данного файла с применением нашего класса Chunker:
from Bio import SeqIO from cStringIO import StringIO class Chunker_FASTA(Chunker): @staticmethod def _EOC(f): l = f.readline() #incomplete line p = f.tell() l = f.readline() while l and l[0] != '>': p = f.tell() l = f.readline() f.seek(p) #revert one line @staticmethod def parse(chunk): fh = cStringIO.StringIO(chunk) for record in SeqIO.parse(fh,"fasta"): yield record fh.close()
Мы обновляем метод _EOC, чтобы найти, когда одна запись заканчивается, а следующая начинается с определения местоположения «>», после чего мы перематываем указатель дескриптора файла на начало этой строки. Мы также обновляем метод parse, чтобы использовать анализатор fasta из модуля BioPython, что дает объекты SeqRecord для каждой записи в чанке.
В качестве второго более сложного примера приведу пример работы с выводом, полученным с помощью bowtie, выравнивателя коротких операций чтения из данных NGS. Формат состоит из столбцов, разделенных табуляцией, с идентификатором каждого чтения, расположенным в первом столбце. Обратите внимание, что одно чтение может быть выровнено в нескольких местах (до 8 по умолчанию!), Поэтому один и тот же идентификатор появляется в нескольких строках. Небольшой пример раздела результатов приведен ниже.
SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN25-1 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN37-1 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN25-1 2502 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN37-1 4460 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.8 HWI-EAS355_3_Nick_1_1_187_1549 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 SRR014374.8 HWI-EAS355_3_Nick_1_1_187_1549 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3
с соответствующим Chunker, заданным:
class Chunker_BWT(chunky.Chunker): @staticmethod def _EOC(f): l = f.readline()#incomplete line l = f.readline() if not l: return #EOF readID = l.split()[0] while l and (l.split()[0] != readID): p = f.tell() l = f.readline() f.seek(p) @staticmethod def parse(chunk): lines = chunk.splitlines() N = len(lines) i = 0 while i < N: readID = lines[i].split('\t')[0] j = i while lines[j].split('\t')[0] == readID: j += 1 if j == N: break yield lines[i:j] i = j
На этот раз конец фрагмента находится путем чтения строк до тех пор, пока в идентификаторе чтения не появится переключатель, после чего мы возвращаем одну строку. Для синтаксического анализа мы приводим все различные местоположения, к которым относится данное чтение, как одну запись.
Надеемся, что эти примеры покажут, как можно легко расширить родительский класс для большинства типов файлов. Давайте теперь объединим эти различные чанкеры с кодом из предыдущего поста, чтобы показать, как мы можем включить многоядерную параллельную обработку чанков, которые они дают. Приведенный ниже код содержит несколько обобщенных функций-оболочек, которые работают в тандеме с любым из вышеперечисленных блоков, позволяя распараллелить большинство задач.
import multiprocessing as mp, sys def _printMP(text): print text sys.stdout.flush() def _workerMP(chunk,inFile,jobID,worker,kwargs): _printMP("Processing chunk: "+str(jobID)) output = worker(chunk,inFile,**kwargs) _printMP("Finished chunk: "+str(jobID)) return output def main(inFile,worker,chunker=Chunker,cores=1,kwargs={}): pool = mp.Pool(cores) jobs = [] for jobID,chunk in enumerate(chunker.chunkify(inFile)): job = pool.apply_async(_workerMP,(chunk,inFile,jobID,worker,kwargs)) jobs.append(job) output = [] for job in jobs: output.append( job.get() ) pool.close() return output
Основная функция должна быть узнаваемой как код из предыдущего поста. Он генерирует пул воркеров, в идеале по одному на каждое ядро, прежде чем использовать данный блок для разделения соответствующего файла на серию фрагментов для обработки. В отличие от предыдущих примеров, мы собираем выходные данные каждой задачи и возвращаем их после завершения обработки. Основная функция действует как оболочка, позволяющая нам определять разные функции обработки и разные чанкеры, как указано переменными worker и chunker соответственно. Мы обернули вызов функции обработки в функцию _workerMP, которая печатает на терминал по мере выполнения задач. Для этого он использует функцию _printMP, так как вам нужно очистить терминал после оператора печати при использовании многоядерной обработки, иначе ничего не появится, пока все задачи не будут выполнены.
Давайте закончим, продемонстрировав пример того, как мы будем использовать вышеупомянутое, чтобы выполнить ту же задачу, что и в начале этого поста, подсчитывая последовательности в файле fasta, используя базовый чанкер:
def seq_counter(chunk,inFile): data = Chunker.read(inFile,chunk) return data.count('>')
и используя чанкер FASTA:
def seq_counter_2(chunk,inFile): data = list(Chunker_FASTA.parse(Chunker_FASTA.read(inFile,chunk))) return len(data)
И время, которое они занимают, чтобы посчитать последовательности в файле 14GB из ранее:
>>> os.path.getsize("SRR951828.fa") 14944287128 >>> x=time.time();f = open("SRR951828.fa");sum([l.count(">") for l in f]);time.time()-x 136829250 190.05533599853516 >>> x=time.time();f = open("SRR951828.fa");sum([c.count(">") for c in iter(lambda: f.read(1024*1024),"")]);time.time()-x 136829250 26.343637943267822 >>> x=time.time();sum(main("SRR951828.fa",seq_counter,cores=8));time.time()-x 136829250 4.36846399307251 >>> x=time.time();main("SRR951828.fa",seq_counter_2,Chunker_FASTA,8);time.time()-x 136829250 398.94060492515564
Давайте проигнорируем последний вывод, поскольку замедление связано с превращением записей в BioPython SeqRecords. Предыдущий, который объединяет разбиение на чанки и многоядерную обработку, примерно в 50 раз быстрее. Я уверен, что это может быть дополнительно уменьшено с использованием большего количества ядер и / или оптимизации размера фрагмента, однако одно это различие может изменить что-то от вычислительно неправдоподобного к правдоподобному. Неплохо только для нескольких строк кода.