Long answer - 我有一个类似的要求,我的回答是基于你正在阅读来自不同kafka主题的不同流的假设 .
从不同主题中读取,在单个来源中汇集不同的事件:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
序列化程序读取数据并将它们解析为具有通用格式 - 例如 .
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
1 回答
Short Answer - 是的,您可以根据不同流源中的事件类型读取和处理多个流和触发规则 .
Long answer - 我有一个类似的要求,我的回答是基于你正在阅读来自不同kafka主题的不同流的假设 .
从不同主题中读取,在单个来源中汇集不同的事件:
序列化程序读取数据并将它们解析为具有通用格式 - 例如 .
在此之后,事情非常简单,根据事件名称定义规则并比较事件名称以定义规则(您还可以按如下方式定义复杂规则):
我希望这能让您将一个或多个不同的流集成在一起 .