memory - Scala functional way of processing large-scale data with lazy collections


I am trying to figure out memory-efficient, functional ways of processing large amounts of string data in Scala. I have read many things about lazy collections and have seen quite a few code examples. However, I run into "GC overhead limit exceeded" or "Java heap space" issues again and again.

Often the problem is that I try to construct a lazy collection, but then evaluate each new element at the moment I append it to the growing collection (I don't know any other way to do it incrementally). Of course, I could also try to initialize an initial lazy collection first and then yield the collection holding the desired values by applying the resource-critical computations with map or similar, but often I do not know the exact size of the final collection a priori, so I cannot set up that initial lazy collection.
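A small sketch of the two patterns I mean (the names are made up for illustration; process stands in for the real, resource-critical computation):

def process(line: String): String = line.reverse   // stand-in for the expensive computation

val lines: Iterator[String] = io.Source.fromFile("input.txt").getLines

// Pattern 1: append to a growing collection -- each element is evaluated as it
// is added, and the fully evaluated collection sits in memory.
val eager: Vector[String] = lines.foldLeft(Vector.empty[String])((acc, l) => acc :+ process(l))

// Pattern 2: start from an already existing lazy collection and map over it --
// but for this I would need the initial collection (and its size) up front.
// val results: Stream[String] = initialLazyCollection.map(process)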

Maybe you can help me with hints or explanations on how to improve the following code example, which splits a FASTA-formatted file (definition below) into two separate files, according to the rule that odd sequence pairs belong to one file and even ones to another ("separation of strands"). The "most" straightforward way to do this would be imperative: loop through the lines and print to the corresponding files via two open file streams (and of course that works excellently). However, I don't enjoy the style of reassigning variables that hold the current header and sequences, so the following example code uses (tail-)recursion instead, and I would appreciate finding a way to keep a similar design without running into resource problems!

The example works for small files, but at around ~500 MB the code fails with standard JVM setups. I want to process files of "arbitrary" size, say 10-20 GB or so.

val filename = args(0)
val in = io.Source.fromFile(filename).getLines

type ItType = Iterator[String]
type SType = Stream[(String, String)]

def getFullSeqs(ite: ItType) = {
    //val metaChar = ">"
    val headPatt = "(^>)(.+)".r
    val seqPatt  = "([\\w\\W]+)".r
    @annotation.tailrec
    def rec(it: ItType, out: SType = Stream[(String, String)]()): SType =
        if (it.hasNext) it.next match {
            case headPatt(_, header) =>
                // introduce a new header-sequence pair
                rec(it, (header, "") #:: out)
            case seqPatt(seq) =>
                val oldVal = out.head
                // concat subsequences
                val newStream = (oldVal._1, oldVal._2 + seq) #:: out.tail
                rec(it, newStream)
            case _ =>
                println("Something went wrong my friend, oh oh oh!"); Stream[(String, String)]()
        } else out
    rec(ite)
}

def printStrands(seqs: SType) {
   import java.io.PrintWriter
   import java.io.File
   def printStrand(seqse: SType, strand: Int) {
        // use only the sequences of one strand
        val indices = List.tabulate(seqs.size / 2)(_ * 2 + strand - 1).view
        val p = new PrintWriter(new File(filename + "." + strand))
        indices foreach { i =>
              p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n")
        }; p.close
       println("Done bro!")
   }
   List(1, 2).par foreach (s => printStrand(seqs, s))
}

printStrands(getFullSeqs(in))

Three questions arise for me:

a) Let's assume one needs to maintain a large data structure obtained by processing the initial iterator from getLines, as in my getFullSeqs method (note the different sizes of in and the output of getFullSeqs), because transformations on the whole(!) data set are required repeatedly, or because one does not know which part of the data will be required at a given step. My example might not be the best, but how would one do this? Is it possible at all?

b) What about when the desired data structure is not inherently lazy, e.g. when one would like to store the (header -> sequence) pairs in a Map()? Would you wrap it in a lazy collection?

c) My implementation of constructing the Stream might reverse the order of the input lines. When calling reverse, all elements will be evaluated (in my code they already are, which is the actual problem). Is there any way to post-process "from behind" in a lazy fashion? I know of reverseIterator, but is that a solution, or would it not also evaluate all elements first (since I would need to call it on a List)? One could construct the Stream with newVal #:: rec(...), but I would lose tail recursion then, wouldn't I?

So basically I need to add elements to a collection without them being evaluated by the process of adding. lazy val elem = "test"; elem :: lazyCollection is not what I am looking for.
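To illustrate what I mean in c) with a toy example of my own (not the real code): consing with #:: leaves the tail unevaluated because the tail is taken by name, but then the recursive call sits in the tail and the function is no longer tail-recursive:

// builds an "infinite" lazy stream; nothing beyond the forced prefix is evaluated
def naturalsFrom(n: Int): Stream[Int] = n #:: naturalsFrom(n + 1)   // lazy, but not tail-recursive

val firstTen = naturalsFrom(0).take(10).toList   // forces only ten elements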

EDIT: I have also tried using a by-name parameter for the Stream argument in rec.

Thank you for your attention and time, I really appreciate any help (again :) ).

////////////////////////////////////////////////////////////////////////////////

FASTA is defined as a sequential set of sequences delimited by a single header line. A header is defined as a line starting with ">". Every line below a header is called part of the sequence associated with that header. A sequence ends when a new header is present. Every header is unique. Example:

>header1
abcdefg
>header2
hijklmn
opqrstu
>header3
vwxyz
>header4
zyxwv

Thus, sequence 2 is twice as big as sequence 1. The program should split the file into a file A containing

>header1
abcdefg
>header3
vwxyz

and a second file B containing

>header2
hijklmn
opqrstu
>header4
zyxwv

The input file is assumed to consist of a number of header-sequence pairs.

The key to working with large data structures is to hold in memory only that which is critical to perform whatever operation you need. So, in your case, that's

  • your input file
  • your 2 output files
  • the current line of text

and that's it. In some cases you may need to store information such as how long a sequence is; in such events, you build data structures in a first pass and use them on a second pass. Let's suppose, for example, that you decide you want to write three files: one for even-numbered records, one for odd-numbered ones, and one for entries whose total length is less than 300 nucleotides. You would do something like this (warning: it compiles but I never ran it, so it may not actually work):

final def findSizes(
  data: Iterator[String], sz: Map[String,Long] = Map(),
  currentName: String = "", currentSize: Long = 0
): Map[String,Long] = {
  def currentMap = if (currentName != "") sz + (currentName -> currentSize) else sz
  if (!data.hasNext) currentMap
  else {
    val s = data.next
    if (s(0) == '>') findSizes(data, currentMap, s, 0)
    else findSizes(data, sz, currentName, currentSize + s.length)
  }
}
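For instance, run against the example file from the question (assuming it is saved as "example.fasta"; a sketch, not tested), the resulting map keys are the full header lines:

val sizes = findSizes(scala.io.Source.fromFile("example.fasta").getLines())
// sizes should then contain ">header1" -> 7, ">header2" -> 14, ">header3" -> 5, ">header4" -> 5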

Then, for processing, you use that map and pass through the file again:

import java.io._
final def writeFiles(
  source: Iterator[String], targets: Array[PrintWriter],
  sizes: Map[String,Long], count: Int = -1, which: Int = 0
) {
  if (!source.hasNext) targets.foreach(_.close)
  else {
    val s = source.next
    if (s(0) == '>') {
      val w = if (sizes.get(s).exists(_ < 300)) 2 else (count + 1) % 2
      targets(w).println(s)
      writeFiles(source, targets, sizes, count + 1, w)
    }
    else {
      targets(which).println(s)
      writeFiles(source, targets, sizes, count, which)
    }
  }
}

You use Source.fromFile(f).getLines() twice to create your iterators, and you're all set. EDIT: in some sense this is the key step, because the iterator is your "lazy" collection. However, it's not important merely because it doesn't read everything into memory at once ("lazy"), but because it doesn't store the previously read strings either!
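Put together, the two passes might be wired up roughly like this (a sketch only, assuming findSizes and writeFiles are in scope; the output suffixes .a, .b and .short are my own invention):

import java.io.{File, PrintWriter}
import scala.io.Source

val fileName = args(0)
// first pass: collect header -> total sequence length
val sizes = findSizes(Source.fromFile(fileName).getLines())
// second pass: route records to the three target files; writeFiles closes the writers
val targets = Array(".a", ".b", ".short").map(sfx => new PrintWriter(new File(fileName + sfx)))
writeFiles(Source.fromFile(fileName).getLines(), targets, sizes)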

More generally, Scala can't keep you from having to think about what information you need to have in memory and what you can fetch off disk as needed. Lazy evaluation can sometimes help, but there's no magic formula, because you can perfectly well express the requirement to have all your data in memory in a lazy way. Scala cannot interpret your commands to access memory as, secretly, instructions to fetch stuff off disk instead. (Well, not unless you write a library that caches results to disk for you.)

