
Parallel processing with Spark and Cassandra


I have the following task ahead of me.

When executing the spark-submit command, the user provides a configuration file with a set of IP addresses.

Let's say the array looks like this:

val ips = Array(1,2,3,4,5)

There can be up to 100,000 values in the array.

For every element of the array I should read data from Cassandra, perform some computation, and insert the data back into Cassandra.

If I do:

ips.foreach(ip => {
  // read data from Cassandra for the specific "ip"
  // (for each IP there is a different amount of data to read; within the
  // function I determine the start and end date for each IP)
  // process it
  // save it back to Cassandra
})

This works fine.
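
For concreteness, a minimal sketch of what this sequential version looks like with the Spark Cassandra Connector (the table layout, dateRangeFor, and process are hypothetical stand-ins for my actual schema and computation):

import com.datastax.spark.connector._

ips.foreach { ip =>
  val (start, end) = dateRangeFor(ip)  // hypothetical: start and end date per IP
  sc.cassandraTable("ks", "data")      // using sc is fine here: this loop runs on the driver
    .where("ip = ? AND ts >= ? AND ts < ?", ip, start, end)
    .map(process)                      // hypothetical per-row computation
    .saveToCassandra("ks", "results")
}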

I believe this processes the IPs sequentially, though; I am not taking advantage of parallelism.

On the other hand, if I do:

val IPRdd = sc.parallelize(Array(1, 2, 3, 4, 5))
IPRdd.foreach(ip => {
  // read data from Cassandra; I need to use the Spark context to make the query
  // process it
  // save it back to Cassandra
})

I get a serialization exception, because Spark is trying to serialize the Spark context, which is not serializable.
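
To make the failure concrete: the closure passed to IPRdd.foreach is shipped to the executors, and because it references sc, Spark has to serialize the SparkContext (and the enclosing object that holds it) into the task closure, which is exactly what the ClosureCleaner rejects. An illustrative snippet (table and column names made up):

import com.datastax.spark.connector._

IPRdd.foreach { ip =>
  // `sc` lives only on the driver; referencing it here pulls the whole
  // SparkContext into the task closure, and SparkContext is not
  // serializable, hence "Task not serializable"
  val data = sc.cassandraTable("ks", "data").where("ip = ?", ip)
}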

How can I make this work while still taking advantage of parallelism?

Thanks

Edit:

Here is the exception I get:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:59)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:54)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$.main(WibeeeBatchJob.scala:54)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main(WibeeeBatchJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@311ff287)
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, )
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

1 Answer


    The easiest thing to do is to use the Spark Cassandra Connector, which can handle connection pooling and serialization for you.

    With it you can do something like:

    sc.parallelize(inputData, numTasks)
      .mapPartitions { it =>
        val con = CassandraConnector(yourConf)
        con.withSessionDo { session =>
          // Use the session
        }
        // Do any other processing, then return the rows to write out
        it
      }
      .saveToCassandra("ks", "table")
    

    This would be completely manual operation of the Cassandra connection. The sessions are all automatically pooled and cached, and if you prepare a statement, it will be cached on the executor as well.
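
    As a concrete sketch of the prepared-statement point (an illustration, not code from the question: yourConf is the job's SparkConf, rdd stands for an RDD[(Int, Int)] of results, and the keyspace, table, and columns are made up):

    import com.datastax.spark.connector.cql.CassandraConnector

    val connector = CassandraConnector(yourConf) // serializable, so safe to capture in closures

    rdd.foreachPartition { rows =>
      connector.withSessionDo { session =>
        // the connector caches prepared statements per executor JVM, so
        // preparing the same CQL string in every partition is cheap
        val stmt = session.prepare("INSERT INTO ks.results (ip, value) VALUES (?, ?)")
        rows.foreach { case (ip, value) =>
          session.execute(stmt.bind(Int.box(ip), Int.box(value)))
        }
      }
    }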

    If you would like to use more built-in methods, joinWithCassandraTable may also work for your situation.

    sc.parallelize(inputData, numTasks)
      .joinWithCassandraTable("ks", "table") // retrieves all records for which the input data is the primary key
      .map { case (input, row) => row }      // manipulate the returned results here if needed
      .saveToCassandra("ks", "table")
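
    Note that joinWithCassandraTable matches RDD elements to the table's primary-key columns by field name, so plain Int values usually need to be wrapped in a case class first. A sketch, where IpKey and the column name ip are assumptions about your schema:

    import com.datastax.spark.connector._

    case class IpKey(ip: Int)

    sc.parallelize(ips.map(IpKey(_)), numTasks)
      .joinWithCassandraTable("ks", "table") // yields (IpKey, CassandraRow) pairs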
    
