
Spark Streaming - Java - Inserting JSON from Kafka into Cassandra

I'm writing a simple data pipeline in Spark Streaming using Java that pulls JSON data from Kafka, parses the JSON into a custom class ( Transaction ), and then inserts that data into a Cassandra table, but I can't get the mapToRow() function to work.

I've seen plenty of examples saying that all you need to do is something like this:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();

However, when I do that, I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions

I think what I'm missing is some kind of annotation on my class, but I haven't been able to figure out which one. I've tried the bare-bones approach, essentially making the class a property bag:

public class Transaction implements java.io.Serializable{

    public int TransactionId;
    ...

    public Transaction(){}
}

I've also tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{

    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...

    public Transaction(){}
}

And I've tried making the properties private and establishing public get/set methods for each one:

public class Transaction implements java.io.Serializable{

    private int transactionId;
    ...

    public Transaction(){}

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
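
A side note on naming: as far as I can tell, the connector's default JavaBeanColumnMapper matches bean properties to columns by translating camelCase getter names into lowercase_with_underscores column names (getTransactionId -> transaction_id). A tiny standalone sketch of that convention (my own illustration, not connector code):

```java
public class ColumnNameSketch {
    // Translate a JavaBean getter name such as "getTransactionId" into the
    // snake_case column name ("transaction_id") the mapper would look up.
    static String columnNameFor(String getterName) {
        String property = getterName.startsWith("get")
                ? getterName.substring(3)
                : getterName;
        StringBuilder sb = new StringBuilder();
        for (char c : property.toCharArray()) {
            if (Character.isUpperCase(c)) {
                if (sb.length() > 0) sb.append('_');
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(columnNameFor("getTransactionId")); // prints transaction_id
    }
}
```

So with the getter/setter version of Transaction, it's worth double-checking that the table's column names actually line up with what the getters imply.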

I have been able to parse the DStream into an RDD of Transactions using the class below:

public class Transaction implements java.io.Serializable{

    ...

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        @Override
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped:" + e);
                }
            }
            return transactions;
        }
    }
}

combined with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());

However, once I have that, I still can't use it with the writerBuilder().saveToCassandra() chain.

Any help here is greatly appreciated.

1 Answer


    It turns out the problem was just an import issue. I was importing com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.* thinking it would give me everything I needed, but I also needed to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
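
    The two imports side by side, as a fragment sketch (CassandraStreamingJavaUtil supplies javaFunctions() for DStreams, CassandraJavaUtil supplies mapToRow()):

    ```java
    import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

    // With both in scope, the streaming write chain compiles:
    javaFunctions(events)
            .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class))
            .saveToCassandra();
    ```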

    Once I resolved that, I started getting the following error:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
        at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
        at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
        at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
        at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
        at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
        at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
        at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
        at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
        at globalTransactions.Process.main(Process.java:77)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 9 more
    

    which was resolved by bringing in the spark-sql project:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
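
    One thing worth noting: the Scala suffix (_2.10) and version need to line up across all of the Spark artifacts in the pom, or you can hit similar NoClassDefFoundError issues. A sketch of what a matching connector entry might look like (the connector version here is an assumption based on the 1.6.2 Spark build above):

    ```xml
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    ```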
    

    Hope this helps the next person.
