我们正在尝试使用 Spark Streaming 和 Spark SQL 来实现一个用例,该用例允许我们针对某些数据运行用户定义的规则(请参阅下文,了解如何捕获和使用数据) . 我们的想法是使用SQL来指定规则并将结果作为警报返回给用户 . 基于每个传入的事件批处理执行查询似乎非常慢 . 如果有人能提出更好的方法来实现这个用例,我将不胜感激 . 另外,想知道Spark是否在驱动程序或工作程序上执行sql?提前致谢 . 以下是我们为实现这一目标而采取的步骤 -
1)从外部数据库加载初始数据集作为JDBCRDD
JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);
2)创建传入的DStream(捕获对初始化数据的更新)
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);
JavaDStream<SomeState> incomingDStream = flumeStream.map(...);
3)使用传入的DStream创建Pair DStream
JavaPairDStream<Object,SomeState> pairDStream =
incomingDStream.map(...);
4)使用初始化的RDD作为基本状态,从DStream对创建有状态DStream
JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);
JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);
5)根据传入流中的值,针对更新状态运行用户驱动的查询
incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {
@Override
public Void call(JavaRDD<SomeState> events) throws Exception {
updatedStateRDD.count();
SQLContext sqx = new SQLContext(events.context());
schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
schemaDf.registerTempTable("TEMP_TABLE");
sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);
//collect the results and process and send alerts
...
}
);
2 回答
第一步应该是确定大部分时间采取的步骤 . 请参阅Spark Master UI并确定大多数时间所用的步骤/阶段 .
我可以考虑的观察结果很少: -
使用Singleton SQLContext - 请参阅示例 - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
updateStateByKey可以是大量密钥的情况下的内存密集型操作 . 您需要检查updateStateByKey函数处理的数据大小,以及它是否适合给定的内存 .
你的GC表现如何?
你真的在使用"initialRDD"吗?如果没有,那么不加载它 . 如果它是静态数据集,则缓存它 .
检查SQL查询所花费的时间 .
以下是可以帮助您的更多问题/领域
DStreams的StorageLevel是什么?
群集的大小和群集的配置
版的Spark?
最后 - ForEachRDD是一个输出操作,它在驱动程序上执行给定的功能,但RDD可能会执行操作,并且这些操作在工作节点上执行 .
您可能需要阅读本文以更好地解释输出操作 - http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
我也面临同样的问题,如果你有相同的解决方案,请告诉我吗?虽然我在下面的帖子中提到了详细的用例 .
Spark SQL + Window + Streming Issue - Spark SQL query is taking long to execute when running with spark streaming