
Union a list of Flume receivers in Spark Streaming


To increase parallelism, as the Spark Streaming Programming Guide suggests, I am setting up multiple receivers and trying to union the resulting list of streams. This code works as expected:

private JavaDStream<SparkFlumeEvent> getEventsWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

    List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

    // One Flume receiver per host/port combination
    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }

    // Union all six receivers by explicit index
    JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0)
            .union(receivers.get(1))
            .union(receivers.get(2))
            .union(receivers.get(3))
            .union(receivers.get(4))
            .union(receivers.get(5));

    return unionStreams;
}

But I don't actually know how many receivers my cluster will have before it runs. When I try to do the same thing in a loop, I get an NPE.

private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

    List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }

    JavaDStream<SparkFlumeEvent> unionStreams = null;
    for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers) {
        if (unionStreams == null) {
            unionStreams = receiver;
        } else {
            unionStreams.union(receiver);
        }
    }

    return unionStreams;
}

Error:

    16/09/15 17:05:25 ERROR JobScheduler: Error in job generator
    java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    16/09/15 17:05:25 INFO MemoryStore: ensureFreeSpace(15128) called with curMem=520144, maxMem=555755765
    16/09/15 17:05:25 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 14.8 KB, free 529.5 MB)
    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

What is the correct way to do this?

1 Answer


You can try the code below; it will solve your problem:

    private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaDStream<SparkFlumeEvent>> receivers = new ArrayList<JavaDStream<SparkFlumeEvent>>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        // JavaStreamingContext.union(first, rest) unions any number of streams at once
        return jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));
    }
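
For context on why the loop version fails: DStream.union returns a new stream rather than modifying the one it is called on, and the loop discards that return value. So only receivers.get(0) ever reaches an output operation, while the other input streams sit uninitialized in the DStreamGraph, which appears to be what the NullPointerException in getMaxInputStreamRememberDuration is complaining about. A minimal corrected loop, reusing the question's variable names, just reassigns the result:

    JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0);
    for (int i = 1; i < receivers.size(); i++) {
        // union() returns a new DStream; reassign instead of discarding the result
        unionStreams = unionStreams.union(receivers.get(i));
    }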
    

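For completeness, here is a minimal driver sketch showing how the method above would be wired up. The host names, ports, and batch interval are made up for illustration, and the usual spark-streaming and spark-streaming-flume imports are assumed. Registering at least one output operation matters here: input streams that never feed an output operation are likely what trips the NPE above.

    SparkConf conf = new SparkConf().setAppName("FlumeUnionExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    List<String> hosts = Arrays.asList("host-a", "host-b");   // hypothetical hosts
    List<String> ports = Arrays.asList("41414", "41415");     // hypothetical ports

    JavaDStream<SparkFlumeEvent> events = getEventsNotWorking(jssc, hosts, ports);
    events.count().print(); // an output operation, so the whole graph gets initialized

    jssc.start();
    jssc.awaitTermination();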