scala - Joining process with broadcast variable ends up endless spilling -


i joining 2 rdds text files in standalone mode. 1 has 400 million (9 gb) rows, , other has 4 million (110 kb).

3-grams  doc1           3-grams   doc2 ion -    100772c111      ion -    200772c222   on  -    100772c111      gon -    200772c222    n  -    100772c111        n -    200772c222 ... -    ....            ... -    ....  ion -    3332145654      on  -    58898874 mju -    3332145654      mju -    58898874 ... -    ....            ... -    .... 

in each file, doc numbers (doc1 or doc2) appear 1 under other. , result of join number of common 3-grams between docs.e.g.

  (100772c111-200772c222,2) --> there 2 common 3-grams 'ion' ,  '  n' 

the server on run code has 128 gb ram , 24 cores. set intellij configurations - vm options part -xmx64g

here code this:

val conf = new sparkconf().setappname("abdulhay").setmaster("local[4]").set("spark.shuffle.spill", "true")       .set("spark.shuffle.memoryfraction", "0.6").set("spark.storage.memoryfraction", "0.4")       .set("spark.executor.memory","40g")       .set("spark.driver.memory","40g")  val sc = new sparkcontext(conf)  val emp = sc.textfile("\\doc1.txt").map(line => (line.split("\t")(3),line.split("\t")(1))).distinct()     val emp_new = sc.textfile("\\doc2.txt").map(line => (line.split("\t")(3),line.split("\t")(1))).distinct()  val emp_newbc = sc.broadcast(emp_new.groupbykey.collectasmap)  val joined = emp.mappartitions(iter => {       (k, v1) <- iter       v2 <- emp_newbc.value.getorelse(k, iterable())     } yield (s"$v1-$v2", 1))  val olsun = joined.reducebykey((a,b) => a+b)  olsun.map(x => x._1 + "\t" + x._2).saveastextfile("...\\out.txt") 

so seen, during join process using broadcast variable key values change. seems need repartition joined values? , highly expensive. result, ended spilling issue, , never ended. think 128 gb memory must sufficient. far understood, when broadcast variable used shuffling being decreased significantly? wrong application?

thanks in advance.

edit:

i have tried spark's join function below:

var joinrdd = emp.join(emp_new);  val kkk = joinrdd.map(line => (line._2,1)).reducebykey((a, b) => + b) 

again ending spilling.

edit2:

val conf = new sparkconf().setappname("abdulhay").setmaster("local[12]").set("spark.shuffle.spill", "true")       .set("spark.shuffle.memoryfraction", "0.4").set("spark.storage.memoryfraction", "0.6")       .set("spark.executor.memory","50g")       .set("spark.driver.memory","50g")     val sc = new sparkcontext(conf)  val emp = sc.textfile("s:\\staff_files\\mehmet\\projects\\spark - scala\\wos14.txt").map{line => val s = line.split("\t"); (s(5),s(0))}//.distinct()     val emp_new = sc.textfile("s:\\staff_files\\mehmet\\projects\\spark - scala\\fwo_word.txt").map{line => val s = line.split("\t"); (s(3),s(1))}//.distinct()      val cog = emp_new.cogroup(emp)  val skk =  cog.flatmap {       case (key: string, (l1: iterable[string], l2: iterable[string])) =>         (l1.toseq ++ l2.toseq).combinations(2).map { case seq(x, y) => if (x < y) ((x, y),1) else ((y, x),1) }.tolist     }      val com = skk.countbykey() 

i not use broadcast variables. when say:

val emp_newbc = sc.broadcast(emp_new.groupbykey.collectasmap) 

spark first moving entire dataset master node, huge bottleneck , prone produce memory errors on master node. piece of memory shuffled nodes (lots of network overhead), bound produce memory issues there too.

instead, join rdds using join (see description here)

figure out if have few keys. joining spark needs load entire key memory, , if keys few might still big partition given executor.

a separate note: reducebykey repartition anyway.

edit: ---------------------

ok, given clarifications, , assuming number of 3-grams per doc# not big, do:

  1. key both files 3-gram (3-gram, doc#) tuples.
  2. cogroup both rdds, gets 3gram key , 2 lists of doc#
  3. process in single scala function, output set of unique permutations of (doc-pairs).
  4. then coutbykey or countbykeyaprox count of number of distinct 3-grams each doc pair.

note: can skip .distinct() calls one. also, should not split every line twice. change line => (line.split("\t")(3),line.split("\t")(1))) line => { val s = line.split("\t"); (s(3),s(1)))

edit 2:

you seem tuning memory badly. instance, using .set("spark.shuffle.memoryfraction", "0.4").set("spark.storage.memoryfraction", "0.6") leaves no memory task execution (since add 1.0). should have seen sooner focused on problem itself.

check tunning guides here , here.

also, if running on single machine, might try single, huge executor (or ditch spark completely), don't need overhead of distributed processing platform (and distributed hardware failure tolerance, etc).


Comments

Popular posts from this blog

1111. appearing after print sequence - php -

java - WARN : org.springframework.web.servlet.PageNotFound - No mapping found for HTTP request with URI [/board/] in DispatcherServlet with name 'appServlet' -

Ruby on Rails, ActiveRecord, Postgres, UTF-8 and ASCII-8BIT encodings -