首页 文章

如何从Apache Flink中的数据库中查找和更新记录的状态?

提问于
浏览
12

我正在研究数据流应用程序,我正在调查使用Apache Flink进行此项目的可能性 . 主要原因是它支持漂亮的高级流构造,非常类似于Java 8的Stream API .

我将接收与数据库中的特定记录相对应的事件,并且我希望能够处理这些事件(来自诸如RabbitMQ或Kafka之类的消息代理)并最终更新数据库中的记录并推送已处理的/将事件转换为另一个接收器(可能是另一个消息代理)

理想情况下,与特定记录相关的事件需要以FIFO顺序进行处理(尽管会有一个时间戳有助于检测无序事件),但可以并行处理与不同记录相关的事件 . 我打算使用 keyBy() 构造来按记录分区流 .

需要完成的处理取决于数据库中有关记录的当前信息 . 但是,我无法找到一个示例或建议的方法来查询数据库以获取此类记录,以便使用我需要处理它的其他信息来丰富正在处理的事件 .

我想到的管道如下:

  • keyBy()对接收到的id - >从数据库中检索对应id的记录 - >对记录执行处理步骤 - >将处理后的事件推送到外部队列并更新数据库记录

需要更新数据库记录,因为另一个应用程序将查询数据 .

在实现此管道之后,可能会有额外的优化措施 . 例如,可以将(更新的)记录缓存在托管状态,以便同一记录上的下一个事件不需要另一个数据库查询 . 但是,如果应用程序不知道特定记录,则需要从数据库中检索它 .

Apache Flink中用于此类场景的最佳方法是什么?

1 回答

  • 7

    您可以通过扩展丰富的功能来执行数据库查找 . RichFlatMap 函数,在 open() 方法中初始化数据库连接一次,然后处理 flatMap() 方法中的每个事件:

    public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {
    
        // Declare DB coonection and query statements
    
        @Override
        public void open(Configuration parameters) throws Exception {
          // Initialize Database connection
          // Prepare Query statements
        }
    
        @Override
        public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
          // look up the Database, update record, enrich event
          out.collect(enrichedEvent);        
        }
    })
    

    然后你可以使用 DatabaseMapper 如下:

    stream.keyby(id)
          .flatmap(new DatabaseMapper())
          .addSink(..);
    

    您可以使用Redis的缓存数据找到here示例 .

相关问题