首页 文章

Flink(1.3.2)向每个运营商播放一次记录

提问于
浏览
1

我有一个像这样的Executiongraph:

{"nodes":[{"id":1,"type":"Source: AggregatedData","pact":"Data Source","contents":"Source: AggregatedData","parallelism":1},{"id":2,"type":"AddVirtualKeyFunction","pact":"Operator","contents":"AddVirtualKeyFunction","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Source: FilterInformation","pact":"Data Source","contents":"Source: FilterInformation","parallelism":1},{"id":4,"type":"BroadcastFilterInformation","pact":"Operator","contents":"BroadcastFilterInformation","parallelism":1,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},{"id":7,"type":"ConnectAndApplyFilterFunction","pact":"Operator","contents":"ConnectAndApplyFilterFunction","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"},{"id":4,"ship_strategy":"HASH","side":"second"}]},{"id":8,"type":"Sink: OutputFilteredData","pact":"Data Sink","contents":"Sink: OutputFilteredData","parallelism":4,"predecessors":[{"id":7,"ship_strategy":"FORWARD","side":"second"}]}]}

(可以在这里看到:https://flink.apache.org/visualizer/

我有一个聚合数据流(“AggregatedData”,ID = 1),需要通过来自另一个流的一些过滤器(“FilterInformation”,ID = 3)进行过滤 .

我最初做的是在我的“ConnectAndApplyFilterFunction”(ID = 7)中使用运算符状态,这在技术上工作正常,但仅限于1的并行性 .

目前,我正在做一些黑客攻击:在"AddVirtualKeyFunction"中,我将聚合数据映射到 Tuple2<Integer, AggregatedData> ,其中整数(f0)是从0到19的随机生成的数字:

@Override
public Tuple2<Integer, AggregatedData> map(AggregatedData value) throws Exception {
    return new Tuple2<>(ThreadLocalRandom.current().nextInt(this.virtualKeyCount), value);
}

"BroadcastFilterInformation"是一个flatMap,每次收到一个新的FilterInformation时都会发布 Tuple2<Integer, FilterInfo> 20 Times(f0 0-19):

@Override
public void flatMap(FilterInfo filterInfo, Collector<Tuple2<Integer, FilterInfo>> collector) throws Exception {
    if (this.currentLatestTimestamp < filterInfo.getLastUpdateTime()) {
        this.currentLatestTimestamp = filterInfo.getLastUpdateTime();

        for (int i = 0; i < this.broadcastCount; i++) {
            collector.collect(new Tuple2<>(i, filterInfo));
        }
    }
}

我现在连接两个流并通过"virtual key"( Tuple2.f0 )键入它们 . 我在"ConnectAndapplyFilterFunction"(ID = 7)中将我的 FilterInfo 的20份副本保存在键控状态 .

工作正常,我可以在我的主要道路上使用并行性 . 但为什么我使用20“虚拟键”而我的并行度只有4?因为只有4个键,所以不会使用所有运算符(2个运算符在我的测试中没有接收到任何数据) .

有什么办法可以从一个流中广播一些数据,以便另一端的每个操作员都能收到它自己的副本吗?

1 回答

  • 1

    您最有可能使用 broadcast 选项使数据可用于操作中的其他实例 .

    如果是 batch processing ,您可以使用Broadcast variables,根据链接的网站描述如下,在那里也可以找到相应的示例:

    除了常规的操作输入之外,广播变量还允许您为操作的所有并行实例创建数据集 . 这对辅助数据集或数据相关参数化非常有用 . 然后,操作员可以将数据集作为集合访问 .

    如果是 stream processing ,您可以添加 datastream.broadcast() 以将流广播到另一个 .

    根据flink website - 广播功能 - 将元素(从一个流)广播到每个分区 .

    在流处理场景中,您需要提醒自己需要考虑竞争条件,因为来自任一流的数据可以按任何顺序排列 .

    可以检出示例代码here

相关问题