我正在使用spark streaming将一些数据从kafka中提取到cassandra中 . 来自kafka的数据是json格式,看起来像这样

{“message”:“从kafka appender测试异常”,“loggerName”:“com ... KafkaAppenderTest”,“params”:null,“complete”:“假例外”}

这是我从kafka消息创建流的代码

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

映射流以提取json消息并解析以形成 LogEvent 对象

messages.map(new Function<Tuple2<String, String>, LogEvent>() {
        @Override
        public LogEvent call(Tuple2<String, String> v1) throws Exception {
            Map<String, Object> map = mapper.readValue(v1._2, new TypeReference<Map<String, Object>>() {
            });
            return new LogEvent(map);
        }
    })

最后,将每个rdd写入cassandra

.foreach(new Function2<JavaRDD<LogEvent>, Time, Void>() {
        @Override
        public Void call(JavaRDD<LogEvent> rdd, Time v2) throws Exception {
            javaFunctions(rdd).writerBuilder("myks", "logs", mapToRow(LogEvent.class)).saveToCassandra();
            return null;
        }
    })

这工作正常但我不想将json字符串转换为 LogEvent 对象,相反,我想将json字符串传递给cassandra并利用其json解析功能直接从json将数据插入表中 . 这样,我不必知道json中会发生什么,只要列名匹配,数据将/应该映射到表 . 有没有办法做到这一点?