DevGang
Авторизоваться

Обработка больших файлов с использованием Python: часть 2 

В прошлой статье я написал о некоторых методах, которые я использую в python для эффективной обработки очень больших наборов данных. Вы можете найти это здесь. Грубо говоря, в нем подробно рассказывается, как можно разбить большой файл на куски, которые затем можно передать на несколько ядер для сокращения времени обработки. Ниже я подробно остановлюсь на этом, сначала создав родительский класс, который превращает данный (большой) файл в порции. Я создаю его таким образом, чтобы дочерние классы можно было легко создавать и адаптировать для конкретных типов файлов, учитывая некоторые примеры. Наконец, я даю некоторые функции обертывания для использования в сочетании с любым из чанков, чтобы чанки могли обрабатываться с использованием нескольких ядер.

Есть один вопрос на который стоит дать ответ, перед тем как вы начнете использовать данный метод повсеместно. Какого размера фалй стоит обрабатывать данным методом? Грубый ответ будет, когда размер данных становится сопоставимым с доступной оперативной памятью. Лучшим ответом будет, когда издержки на чтение каждой отдельной строки (записи) больше, чем операция над этой записью. Вот пример этого случая, хотя это не совсем справедливое сравнение:

>>> import timeit,os.path
>>> os.path.getsize("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa")
10095955
>>> timeit.timeit('f = open("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa");sum([l.count(">") for l in f])',number=10)
0.8403599262237549
>>> timeit.timeit('f = open("Saccharomyces_cerevisiae.R64-1-1.cds.all.fa");sum([c.count(">") for c in iter(lambda: f.read(1024*1024),"")])',number=10)
0.15671014785766602

Для небольшого 10-мегабайтного файла Fasta мы подсчитываем количество последовательностей, присутствующих в пятой части времени, используя чанки. Я должен быть честным и утверждать, что ускорение в основном связано с отсутствием идентифицирующих символов новой строки в методе чанкинга; но тем не менее, это показывает силу, которую можно иметь, используя куски. Для файла fasta 14 ГБ время для фрагментированных (1 МБ фрагментов) и не фрагментированных методов составляет 55 с и 130 с соответственно.

Возвращаясь на правильный путь, давайте превратим метод chunking в родительский класс, из которого мы можем построить:

import os.path

class Chunker(object):

   @classmethod
   def chunkify(cls,fname,size=1024*1024):
       fileEnd = os.path.getsize(fname)
       with open(fname,'r') as f:
           chunkEnd = f.tell()
           while True:
               chunkStart = chunkEnd
               f.seek(size,1)
               cls._EOC(f)
               chunkEnd = f.tell()
               yield chunkStart, chunkEnd - chunkStart
               if chunkEnd >= fileEnd:
                   break

   @staticmethod
   def _EOC(f):
       f.readline()

   @staticmethod
   def read(fname,chunk):
       with open(fname,'r') as f:
           f.seek(chunk[0])
           return f.read(chunk[1])

   @staticmethod
   def parse(chunk):
       for line in chunk.splitlines():
           yield chunk

Выше мы создаем класс Chunker, у которого есть метод класса chunkify, а также статические методы _EOC, read и parse. Метод chunkify выполняет фактическое разбиение на фрагменты данных файла, возвращая итератор, который выдает кортежи, содержащие начало и размер фрагмента. Это метод класса, так что он может использовать статический метод _EOC (конец фрагмента), чтобы переместить указатель в подходящее место для разделения фрагментов. Для простейшего случая это просто конец / начало новой строки. Методы read и parse считывают данный фрагмент из файла и разбивают его на блоки (отдельные строки в простейшем случае) соответственно. Мы делаем методы non-chunkify статическими, чтобы их можно было вызывать без дополнительных затрат на создание экземпляра класса.

Давайте теперь создадим несколько дочерних элементов этого класса для определенных типов файлов. Во-первых, один из самых известных типов файлов в биоинформатике, FASTA. Ниже приведен фрагмент файла FASTA. Каждая запись имеет строку заголовка, которая начинается с «>», за которым следует уникальный идентификатор последовательности. После строки заголовка следуют одна или несколько строк, задающих последовательность. Последовательности могут представлять собой последовательности белков или нуклеиновых кислот, и они могут содержать пробелы и / или символы выравнивания.

>SEQUENCE_1
MTEITAAMVKELRESTGAGMMDCKNALSETNGDFDKAVQLLREKGLGKAAKKADRLAAEG
LVSVKVSDDFTIAAMRPSYLSYEDLDMTFVENEYKALVAELEKENEERRRLKDPNKPEHK
IPQFASRKQLSDAILKEAEEKIKEELKAQGKPEKIWDNIIPGKMNSFIADNSQLDSKLTL
MGQFYVMDDKKTVEQVIAEKEKEFGGKIKIVEFICFEVGEGLEKKTEDFAAEVAAQL
>SEQUENCE_2
SATVSEINSETDFVAKNDQFIALTKDTTAHIQSNSLQSVEELHSSTINGVKFEEYLKSQI
ATIGENLVVRRFATLKAGANGVVNGYIHTNGRVGVVIAAACDSAEVASKSRDLLRQICMH

А вот обработка данного файла с применением нашего класса Chunker:

from Bio import SeqIO
from cStringIO import StringIO

class Chunker_FASTA(Chunker):

   @staticmethod
   def _EOC(f):
       l = f.readline() #incomplete line
       p = f.tell()
       l = f.readline()
       while l and l[0] != '>':
           p = f.tell()
           l = f.readline()
       f.seek(p) #revert one line

   @staticmethod
   def parse(chunk):
       fh = cStringIO.StringIO(chunk)
       for record in SeqIO.parse(fh,"fasta"):
           yield record
       fh.close()

Мы обновляем метод _EOC, чтобы найти, когда одна запись заканчивается, а следующая начинается с определения местоположения «>», после чего мы перематываем указатель дескриптора файла на начало этой строки. Мы также обновляем метод parse, чтобы использовать анализатор fasta из модуля BioPython, что дает объекты SeqRecord для каждой записи в чанке.

В качестве второго более сложного примера приведу пример работы с выводом, полученным с помощью bowtie, выравнивателя коротких операций чтения из данных NGS. Формат состоит из столбцов, разделенных табуляцией, с идентификатором каждого чтения, расположенным в первом столбце. Обратите внимание, что одно чтение может быть выровнено в нескольких местах (до 8 по умолчанию!), Поэтому один и тот же идентификатор появляется в нескольких строках. Небольшой пример раздела результатов приведен ниже.

SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN25-1 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.1 HWI-EAS355_3_Nick_1_1_464_1416 length=36 + RDN37-1 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN25-1 2502 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN37-1 4460 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.2 HWI-EAS355_3_Nick_1_1_341_1118 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCG IIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.8 HWI-EAS355_3_Nick_1_1_187_1549 length=36 + RDN25-2 2502 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3 
SRR014374.8 HWI-EAS355_3_Nick_1_1_187_1549 length=36 + RDN37-2 4460 GTTTCTTTACTTATTCAATGAAGCGG IIIIIIIIIIIIIIIIIIIIIIIIII 3

с соответствующим Chunker, заданным:

class Chunker_BWT(chunky.Chunker):

   @staticmethod
   def _EOC(f):
       l = f.readline()#incomplete line
       l = f.readline()
       if not l: return #EOF
       readID = l.split()[0]
       while l and (l.split()[0] != readID):
           p = f.tell() 
           l = f.readline()
       f.seek(p)

   @staticmethod
   def parse(chunk):
       lines = chunk.splitlines()
       N = len(lines)
       i = 0 
       while i < N:
           readID = lines[i].split('\t')[0]
           j = i
           while lines[j].split('\t')[0] == readID:
               j += 1
               if j == N:
                   break
           yield lines[i:j]
           i = j

На этот раз конец фрагмента находится путем чтения строк до тех пор, пока в идентификаторе чтения не появится переключатель, после чего мы возвращаем одну строку. Для синтаксического анализа мы приводим все различные местоположения, к которым относится данное чтение, как одну запись.

Надеемся, что эти примеры покажут, как можно легко расширить родительский класс для большинства типов файлов. Давайте теперь объединим эти различные чанкеры с кодом из предыдущего поста, чтобы показать, как мы можем включить многоядерную параллельную обработку чанков, которые они дают. Приведенный ниже код содержит несколько обобщенных функций-оболочек, которые работают в тандеме с любым из вышеперечисленных блоков, позволяя распараллелить большинство задач.

import multiprocessing as mp, sys

def _printMP(text):
   print text
   sys.stdout.flush()

def _workerMP(chunk,inFile,jobID,worker,kwargs):
   _printMP("Processing chunk: "+str(jobID))
   output = worker(chunk,inFile,**kwargs)
   _printMP("Finished chunk: "+str(jobID))
   return output

def main(inFile,worker,chunker=Chunker,cores=1,kwargs={}):
   pool = mp.Pool(cores)

   jobs = []
   for jobID,chunk in enumerate(chunker.chunkify(inFile)):
       job = pool.apply_async(_workerMP,(chunk,inFile,jobID,worker,kwargs))
       jobs.append(job)

   output = []
   for job in jobs:
       output.append( job.get() )

   pool.close()
   
   return output

Основная функция должна быть узнаваемой как код из предыдущего поста. Он генерирует пул воркеров, в идеале по одному на каждое ядро, прежде чем использовать данный блок для разделения соответствующего файла на серию фрагментов для обработки. В отличие от предыдущих примеров, мы собираем выходные данные каждой задачи и возвращаем их после завершения обработки. Основная функция действует как оболочка, позволяющая нам определять разные функции обработки и разные чанкеры, как указано переменными worker и chunker соответственно. Мы обернули вызов функции обработки в функцию _workerMP, которая печатает на терминал по мере выполнения задач. Для этого он использует функцию _printMP, так как вам нужно очистить терминал после оператора печати при использовании многоядерной обработки, иначе ничего не появится, пока все задачи не будут выполнены.

Давайте закончим, продемонстрировав пример того, как мы будем использовать вышеупомянутое, чтобы выполнить ту же задачу, что и в начале этого поста, подсчитывая последовательности в файле fasta, используя базовый чанкер:

def seq_counter(chunk,inFile):
   data = Chunker.read(inFile,chunk)
   return data.count('>')

и используя чанкер FASTA:

def seq_counter_2(chunk,inFile):
   data = list(Chunker_FASTA.parse(Chunker_FASTA.read(inFile,chunk)))
   return len(data)

И время, которое они занимают, чтобы посчитать последовательности в файле 14GB из ранее:

>>> os.path.getsize("SRR951828.fa")
14944287128
>>> x=time.time();f = open("SRR951828.fa");sum([l.count(">") for l in f]);time.time()-x
136829250
190.05533599853516
>>> x=time.time();f = open("SRR951828.fa");sum([c.count(">") for c in iter(lambda: f.read(1024*1024),"")]);time.time()-x
136829250
26.343637943267822
>>> x=time.time();sum(main("SRR951828.fa",seq_counter,cores=8));time.time()-x
136829250
4.36846399307251
>>> x=time.time();main("SRR951828.fa",seq_counter_2,Chunker_FASTA,8);time.time()-x
136829250
398.94060492515564

Давайте проигнорируем последний вывод, поскольку замедление связано с превращением записей в BioPython SeqRecords. Предыдущий, который объединяет разбиение на чанки и многоядерную обработку, примерно в 50 раз быстрее. Я уверен, что это может быть дополнительно уменьшено с использованием большего количества ядер и / или оптимизации размера фрагмента, однако одно это различие может изменить что-то от вычислительно неправдоподобного к правдоподобному. Неплохо только для нескольких строк кода.

#Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться