我目前有以下设置:
应用程序将数据写入Kafka - > SparkStreaming读取存储的数据(始终从最早的条目读取)并转换到流 - >应用程序需要此结果的RDD来训练mllib模型 .
我想基本上实现类似于https://github.com/keiraqz/anomaly-detection的东西 - 但是我的数据不是来自文件而是来自kafka并且需要在Spark中进行一些重新处理以从输入中提取训练数据 .
读取数据并在流中处理它是没有问题的 . 但是将它提供给主线程以进行进一步处理根本不起作用 .
是否有一种简单的方法让流在一定时间内使用数据,将此时读取的所有内容写入某种数据结构,然后使用此数据结构进行进一步处理?
我到目前为止尝试的是在流外部设置RDD然后使用:
spanDurationVectorStream.foreachRDD { rdd =>
if(rdd.count()==0){
flag = 1
}
bufferRdd.union(rdd)
}
Logger.getRootLogger.setLevel(rootLoggerLevel)
ssc.start()
while (flag == 0) {
Thread.sleep(1)
}
Thread.sleep(1)
但是,从来没有添加到bufferRdd中的东西 - 它仍然是我需要初始化它的单个条目 .
我正在使用scala 2.11在2.1.1版上运行所有需要的Spark库
如果您需要任何进一步的信息,我会尽我所能为您提供所需的一切 .
任何帮助将不胜感激 .
编辑:快速总结@maasg的惊人提示 - 当他给我接受它们作为答案的可能性时,我会高兴地这样做:
第一:要解决RDD问题,可以将代码更改为以下内容:
spanDurationVectorStream.foreachRDD { rdd =>
if(rdd.count()==0){
flag = 1
}
bufferRdd = bufferRdd.union(rdd)
}
Logger.getRootLogger.setLevel(rootLoggerLevel)
ssc.start()
while (flag == 0) {
}
由于RDD是不可变的,每个rdd.union将返回一个必须保存的新RDD(How history RDDs are preserved for further use in the given code) . Thread.sleep(1)
电话根本就没必要 . 通过此设置,我可以使用RDD来训练模型 .
但是@maasg补充说他会建议训练场景不要使用Spark Streaming但是基本的Spark,如Read Kafka topic in a Spark batch job所述
对于我来说,目前唯一尚不清楚的部分是如何有效地获取最早和最新的偏移量来接收执行时存储在Kafka中的完整内容 .