我试图打开一个火花流并听一个Kafka主题 - 并做一些基本的逻辑 . 我按照教程进行了简单的单词计数 . 执行以下代码时,我不断收到相同的错误
16/03/29 11:21:40 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
但是,在每个操作之后,您可以在下面的代码片段中看到 - 我正在使用 print()
函数,因此应该注册这些函数 . 我最后也有 start()
和 awaitTermination()
函数,因此所有函数都应该按顺序排列 . 我正在使用spark版本1.4.1(流媒体和kafka流媒体依赖) .
// Create streaming context
SparkConf conf = new SparkConf().setMaster(this.getApplicationMode()).setAppName(this.getApplicationName());
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(this.getBatchDuration()));
// Add checkpoint only if we specified...
if(this.getCheckpointPath() != null) {
ssc.checkpoint(this.getCheckpointPath());
}
// Prepare configurations, such as topic to listen to, and also the kafka broker list
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(this.getKafkaTopicName().split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", this.getBrokerList());
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
ssc,
String.class,
String.class,
kafka.serializer.StringDecoder.class,
kafka.serializer.StringDecoder.class,
kafkaParams,
topicsSet
);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.print();
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
TreeSet treeSet = new TreeSet<String>();
treeSet.add(s);
Iterable<String> it = treeSet;
return it;
}
});
words.print();
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
// Trigger!
ssc.start();
// Await stopping of the service...
ssc.awaitTermination();
什么指针,我可能会失踪?