首页 文章

Spark SQL Streaming问题

提问于
浏览
0

我们正在尝试使用 Spark StreamingSpark SQL 来实现一个用例,该用例允许我们针对某些数据运行用户定义的规则(请参阅下文,了解如何捕获和使用数据) . 我们的想法是使用SQL来指定规则并将结果作为警报返回给用户 . 基于每个传入的事件批处理执行查询似乎非常慢 . 如果有人能提出更好的方法来实现这个用例,我将不胜感激 . 另外,想知道Spark是否在驱动程序或工作程序上执行sql?提前致谢 . 以下是我们为实现这一目标而采取的步骤 -

1)从外部数据库加载初始数据集作为JDBCRDD

JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);

2)创建传入的DStream(捕获对初始化数据的更新)

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);

JavaDStream<SomeState> incomingDStream = flumeStream.map(...);

3)使用传入的DStream创建Pair DStream

JavaPairDStream<Object,SomeState> pairDStream =
            incomingDStream.map(...);

4)使用初始化的RDD作为基本状态,从DStream对创建有状态DStream

JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);

JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);

5)根据传入流中的值,针对更新状态运行用户驱动的查询

incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {

            @Override
            public Void call(JavaRDD<SomeState> events) throws Exception { 

                updatedStateRDD.count();
                SQLContext sqx = new SQLContext(events.context());
                schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
                schemaDf.registerTempTable("TEMP_TABLE");
                sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);

                //collect the results and process and send alerts
                ...

            }
);

2 回答

相关问题