
Apache Spark behavior with Cassandra

I am writing a standalone Spark program that fetches its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but I get a NotSerializableException when I call the RDD's .groupByKey() method:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd = ctx.newAPIHadoopRDD(
        jobConf,
        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class,
        SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

The exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)

What I want to do is merge all of the columns for each row key into a single entry. I get the same exception when I try to use the reduceByKey() method instead:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
            SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

I am using:

  • spark-1.0.0-bin-hadoop1

  • Cassandra 1.2.12

  • Java 1.6

Does anyone know what the problem is, and what exactly is failing to serialize?

Thanks,
夏嘉曦

1 Answer


Your problem is most likely caused by trying to serialize the ByteBuffers. They are not serializable, so you need to convert them into byte arrays before you generate the RDD that gets shuffled.
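One way to apply that conversion, as a rough sketch only, reusing the rdd variable from your code: here the row key is turned into a hex String (so that key equality works for grouping) and the column names/values into plain byte arrays, all of which are Java-serializable.

// Needed imports (sketch): scala.Tuple2, java.util.Map, java.util.LinkedHashMap,
// org.apache.spark.api.java.function.PairFunction, org.apache.cassandra.utils.ByteBufferUtil.
JavaPairRDD<String, Map<String, byte[]>> serializableRdd = rdd.mapToPair(
    new PairFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, String, Map<String, byte[]>>() {
        public Tuple2<String, Map<String, byte[]>> call(
                Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> row) throws Exception {
            // Row key as a hex string: serializable, and equal keys compare equal.
            String rowKey = ByteBufferUtil.bytesToHex(row._1());
            // Copy each column name and value out of its ByteBuffer into plain arrays.
            Map<String, byte[]> columns = new LinkedHashMap<String, byte[]>();
            for (Map.Entry<ByteBuffer, IColumn> e : row._2().entrySet()) {
                columns.put(ByteBufferUtil.bytesToHex(e.getKey()),
                            ByteBufferUtil.getArray(e.getValue().value()));
            }
            return new Tuple2<String, Map<String, byte[]>>(rowKey, columns);
        }
    });

JavaPairRDD<String, Iterable<Map<String, byte[]>>> grouped = serializableRdd.groupByKey();
System.out.println(grouped.count());

Since nothing in the mapped pairs references ByteBuffer or IColumn any more, the shuffle triggered by groupByKey() (or reduceByKey()) no longer has to serialize a HeapByteBuffer.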

You should also try the official DataStax Cassandra driver for Spark, available here.
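With the connector the Hadoop/Thrift setup above goes away entirely. Very roughly (the exact package and class names depend on the connector version you pick, and host/keySpace/columnFamily are your own placeholders):

// Sketch only -- needs spark-cassandra-connector on the classpath; the Java API
// lives in com.datastax.spark.connector.japi in recent connector versions, e.g.
// import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
SparkConf conf = new SparkConf()
    .setMaster("local")
    .setAppName("Test")
    .set("spark.cassandra.connection.host", host);   // replaces the ConfigHelper calls
JavaSparkContext sc = new JavaSparkContext(conf);

// Rows come back as CassandraRow objects whose values are already serializable,
// so groupByKey()/reduceByKey() work without any manual ByteBuffer handling.
JavaRDD<CassandraRow> rows = javaFunctions(sc).cassandraTable(keySpace, columnFamily);
System.out.println(rows.count());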
