这是关于连接键控流的一个非常基本的问题 .
如果我有两个流相关事件共享相同的逻辑密钥,并且这些流正在连接(使用密钥逻辑连接)并且这都运行并行> 1,那么Flink如何保证来自不同流的两个事件具有相同的逻辑键最终在同一个并行运算符实例中?
以下是使用 ConnectedStream
和 CoFlatMapFunction
按患者姓名查询医院's patient streams - temperature stream and heartbeat stream. We want to join these two stream'的示例 .
DataStream<PatientTemperature> temperatureStream = ..
DataStream<HeartbeatStream> heartbeatStream = ..
temperatureStream
.keyBy(pt -> pt.getPatientId())
.connect (heartBeatStream.keyBy(hbt -> hbt.getPatientId() )
.flatMap (new RichCoFlatMapFunction() {
ValueState<PatientTemperatureAndHeartBeat> state = ...
public void flatMap1(PatientTemperature value, Collector<PatientTemperatureAndHeartBeat> out) {
state.value().setTemperature(value);
}
public void flatMap2(PatentHeartbeat value, Collector<PatientTemperatureAndHeartBeat> out) {
PatientTemperatureAndHeartBeat temperatureAndHeartBeat = state.value()
temperatureAndHeartBeat.setHeartBeat(value)
out.collect(temperatureAndHeartBeat);
}
});
假设这是使用parallelism = 3运行的,运算符任务A,B,C,它们都在不同的物理机器上运行 .
Flink将保证患者"JohnDoe"的所有 Temperature
事件都将在同一个并行运算符实例中结束 . 说它最终在运营商B.
但是当Flink收到"JohnDoe"的 HeartBeat
事件时,它如何知道将它们发送到操作员B,在那里病人的 Temperature
事件被发送 . 除非 Temperature
和 HeartBeat
事件都发送到同一个并行实例运算符,否则连接将不起作用 .
两个流使用相同的逻辑密钥(即患者的id)的事实是特定于应用程序的,而Flink不知道 . 这两个连接的流可以使用彼此无关的自己的密钥 .
1 回答
当然,密钥的选择是特定于应用程序的 . 但是,Flink知道如何访问密钥,因为您提供了密钥选择器功能(
pt -> pt.getPatientId()
和hbt -> hbt.getPatientId()
) . Flink确保两个流的密钥具有相同的类型,并在两个流上应用相同的散列函数以确定将记录发送到何处 .因此,两个流的相同值被传送到同一个运算符实例 .