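I am trying to open a Spark stream that listens to a Kafka topic and applies some basic logic. I followed a tutorial for a simple word count. When I run the code below, I keep getting the same error: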

16/03/29 11:21:40 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)

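However, as you can see in the code snippet below, I call print() after every operation, so output operations should be registered. I also call start() and awaitTermination() at the end, so everything should be in the right order. I am using Spark version 1.4.1 (with the streaming and Kafka streaming dependencies).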

    // Create streaming context
    SparkConf conf = new SparkConf().setMaster(this.getApplicationMode()).setAppName(this.getApplicationName());
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(this.getBatchDuration()));

    // Add checkpoint only if we specified...
    if(this.getCheckpointPath() != null) {
        ssc.checkpoint(this.getCheckpointPath());
    }

    // Prepare configurations, such as topic to listen to, and also the kafka broker list
    HashSet<String> topicsSet = new HashSet<>(Arrays.asList(this.getKafkaTopicName().split(",")));
    HashMap<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", this.getBrokerList());

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            ssc,
            String.class,
            String.class,
            kafka.serializer.StringDecoder.class,
            kafka.serializer.StringDecoder.class,
            kafkaParams,
            topicsSet
    );

    messages.print();

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    lines.print();

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
            // Wrap the whole line in a set; the line is emitted as a single element (no splitting)
            TreeSet<String> treeSet = new TreeSet<>();
            treeSet.add(s);
            return treeSet;
        }
    });

    words.print();

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            }).reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

    wordCounts.print();

    // Trigger!
    ssc.start();

    // Await stopping of the service...
    ssc.awaitTermination();

Any pointers on what I might be missing?