Spark Cassandra Connector saveToCassandra() is sending data to the driver and causing an OOM exception
I am trying to use the Spark Cassandra Connector.
Here is my code:
```java
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Count the rows per user_id in user_activity_<type> for the given bucket
JavaRDD<UserStatistics> rdd = CassandraJavaUtil.javaFunctions(sparkContext)
        .cassandraTable(ConfigStore.read("cassandra", "keyspace"), "user_activity_" + type)
        .where("bucket = ?", date)
        .select("user_id", "code")
        .mapToPair(row -> new Tuple2<String, Integer>(row.getString("user_id"), 1))
        .reduceByKey((value1, value2) -> value1 + value2)
        .map(s -> {
            // Build one UserStatistics row per user
            UserStatistics userStatistic = new UserStatistics();
            userStatistic.setUser_id(s._1);
            userStatistic.setStatistics_type(type);
            int total = s._2;
            int failureCount = 0;  // s._2._2().iterator().next();
            int selectedCount = 0; // s._2._2().iterator().next();
            userStatistic.setTotal_count(total);
            userStatistic.setFailure_count(failureCount);
            userStatistic.setSelected_count(selectedCount);
            return userStatistic;
        });

// Write the aggregated rows to the user_statistics table
CassandraJavaUtil.javaFunctions(rdd)
        .writerBuilder(ConfigStore.read("cassandra", "keyspace"), "user_statistics", mapToRow(UserStatistics.class))
        .saveToCassandra();
```
After executing this, it outputs the following and then throws an OOM exception on the driver. I'm not sure why it is trying to send data to the driver.
```
Executor: Finished task 1007.0 in stage 0.0 (TID 1007). 84821 bytes result sent to driver
15/09/29 13:57:32 INFO TaskSetManager: Starting task 1016.0 in stage 0.0 (TID 1016, localhost, NODE_LOCAL, 2096 bytes)
15/09/29 13:57:32 INFO TaskSetManager: Finished task 1007.0 in stage 0.0 (TID 1007) in 78 ms on localhost (1009/640442)
15/09/29 13:57:32 INFO Executor: Running task 1016.0 in stage 0.0 (TID 1016)
```
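For reference, the transformation chain in my code is only meant to count the rows per user_id and wrap each count in a UserStatistics object before writing it back. The plain-Java sketch below (no Spark or Cassandra involved) shows that intended result on a small in-memory list. The class name UserStatisticsSketch, the method aggregate, and the sample user IDs are hypothetical, and the nested UserStatistics class is a simplified stand-in for my real bean, not the actual class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class UserStatisticsSketch {

    // Hypothetical, simplified stand-in for the real UserStatistics bean
    static final class UserStatistics {
        String userId;
        String statisticsType;
        int totalCount;
        int failureCount;
        int selectedCount;

        @Override
        public String toString() {
            return userId + "/" + statisticsType + ": total=" + totalCount
                    + ", failure=" + failureCount + ", selected=" + selectedCount;
        }
    }

    // Local equivalent of mapToPair(row -> (user_id, 1)).reduceByKey(+).map(...):
    // counts how many rows each user_id has and wraps each count in a UserStatistics.
    static List<UserStatistics> aggregate(List<String> userIds, String type) {
        Map<String, Integer> counts = new TreeMap<>(); // TreeMap only for deterministic ordering
        for (String userId : userIds) {
            counts.merge(userId, 1, Integer::sum); // the reduceByKey((a, b) -> a + b) step
        }
        List<UserStatistics> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            UserStatistics stat = new UserStatistics();
            stat.userId = e.getKey();
            stat.statisticsType = type;
            stat.totalCount = e.getValue();
            stat.failureCount = 0;  // hard-coded to 0, as in the original code
            stat.selectedCount = 0; // hard-coded to 0, as in the original code
            result.add(stat);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical user_id values, standing in for rows read from user_activity_<type>
        List<String> rows = List.of("alice", "bob", "alice", "carol", "alice", "bob");
        for (UserStatistics s : aggregate(rows, "login")) {
            System.out.println(s);
        }
    }
}
```

The output has one line per distinct user, so the data that should reach Cassandra is only as large as the number of users, not the number of activity rows.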