从Flink 1.5发布公告中,我们知道Flink现在支持“广播状态”,并且描述了“广播状态解锁Flink的CEP库的”动态模式“功能的实现 . ”
这是否意味着目前我们可以使用“广播状态”来实现没有Flink CEP的“动态模式”?另外我不知道在有或没有广播状态的Flink CEP实现“动态模式”时有什么区别?我将不胜感激如果有人可以用代码举例解释差异 .
=============
Updating for testing broadcast data-stream by operator broadcast() with keyed-datastream
在Flink 1.4.2中测试后,我发现广播数据流(由旧操作员broadcast())可以与键控数据流连接,下面是测试代码,我们发现所有控制流事件都广播到所有操作员实例 . 所以似乎旧的broadcast()可以实现与新的“广播状态”相同的功能 .
public static void ConnectBroadToKeyedStream() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
List<Tuple1<String>>
controlData = new ArrayList<Tuple1<String>>();
controlData.add(new Tuple1<String>("DROP"));
controlData.add(new Tuple1<String>("IGNORE"));
DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);
List<Tuple1<String>>
dataStreamData = new ArrayList<Tuple1<String>>();
dataStreamData.add(new Tuple1<String>("data"));
dataStreamData.add(new Tuple1<String>("DROP"));
dataStreamData.add(new Tuple1<String>("artisans"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
// DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);
DataStream<String> result = control
.broadcast()
.connect(keyedDataStream)
.flatMap(new MyCoFlatMap());
result.print();
env.execute();
}
private static final class MyCoFlatMap
implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
HashSet blacklist = new HashSet();
@Override
public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
blacklist.add(control_value);
out.collect("listed " + control_value);
}
@Override
public void flatMap2(Tuple1<String> data_value, Collector<String> out) {
if (blacklist.contains(data_value)) {
out.collect("skipped " + data_value);
} else {
out.collect("passed " + data_value);
}
}
}
以下是测试结果 .
1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)
https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement
2 回答
在没有广播状态的情况下,两个Flink数据流不能以有状态的方式一起处理,除非它们以完全相同的方式键入 . 广播流可以连接到键控流,但是如果您尝试在RichCoFlatMap中使用键控状态,那么将失败 .
经常需要的是能够将一个具有动态"rules"的流应用于另一个流上的每个事件,而不管密钥 . 需要有一种新的托管Flink状态,可以存储这些规则 . 使用broadcast state,现在可以通过简单的方式完成 .
现在有了这个功能,可以开始在CEP中支持动态模式 .
这是一个代码示例,它在flink 1.5.0 上实现了没有参数的flink原始广播方法和新引入的广播状态 . https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed
据我所知,广播状态可以在没有flink cep的情况下实现,就像上面显示的代码一样 .
原始
DataStream
的broadcast
方法将创建DataStream
而不是BroadcastConnectedStream
. 这将是原始的 coGroup 设计方案 . 在将度量流与广播的规则流连接之后,我们可以使用ConnectedStreams
中定义的更多流转换函数 . 例如keyBy
函数,这将使广播的流和连接流 have same key 被process
编辑并粘贴 on the same parallelled CoProcessFunction . 所以CoProcessFunction
可能有自己的本地存储 . 除了从ReadOnlyContext
访问的 Map 状态之外,进程函数可以在其字段上具有自定义数据结构 .广播状态可以通过带有
MapStateDescriptor
的broadcast
方法来实现,这意味着广播的流可以多次与其他流连接 . 不同的已连接BroadcastConnectedStream
可以与process
函数中的唯一MapStateDescriptor
共享其自己的广播状态 .我认为这些将是广播与论证和广播状态之间的主要区别 .