首页 文章

是否可以在apache flink CEP中处理多个流?

提问于
浏览
1

我的问题是,如果我们有两个原始事件流,即 SmokeTemperature ,我们想通过将运算符应用于原始流来查明复杂事件,即 Fire 是否已经发生,我们可以在Flink中执行此操作吗?

我问这个问题,因为我到目前为Flink CEP看到的所有例子都只包含一个输入流 . 如果我错了,请纠正我 .

1 回答

  • 2

    Short Answer - 是的,您可以根据不同流源中的事件类型读取和处理多个流和触发规则 .

    Long answer - 我有一个类似的要求,我的回答是基于你正在阅读来自不同kafka主题的不同流的假设 .

    从不同主题中读取,在单个来源中汇集不同的事件:

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
            new StringSerializerToEvent(),
            props);
    
    kafkaSource.assignTimestampsAndWatermarks(new 
    TimestampAndWatermarkGenerator());
    DataStream<BAMEvent> events = env.addSource(kafkaSource)
            .filter(Objects::nonNull);
    

    序列化程序读取数据并将它们解析为具有通用格式 - 例如 .

    @Data
    public class BAMEvent {
     private String keyid;  //If key based partitioning is needed
     private String eventName; // For different types of events
     private String eventId;  // Any other field you need
     private long timestamp; // For event time based processing 
    
     public String toString(){
       return eventName + " " + timestamp + " " + eventId + " " + correlationID;
     }
    
    }
    

    在此之后,事情非常简单,根据事件名称定义规则并比较事件名称以定义规则(您还可以按如下方式定义复杂规则):

    Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID = 1390448281048961616L;
    
              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return event.getEventName().equals("event1");
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID = -9216505110246259082L;
    
              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
    
                if (!secondEvent.getEventName().equals("event2")) {
                  return false;
                }
    
                for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                  if (secondEvent.getEventId = firstEvent.getEventId()) {
                    return true;
                  }
                }
                return false;
              }
            })
            .within(withinTimeRule);
    

    我希望这能让您将一个或多个不同的流集成在一起 .

相关问题