首页 文章

如何使用可查询状态客户端在flink中获取多个keyBy的状态?

提问于
浏览
3

我正在使用Flink 1.4.2,我有一个场景,我需要使用两个键 . 对于例如

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

值描述会

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
        descriptor.setQueryable(queryableStateName);

任何人都可以建议我使用可查询状态客户端获取状态在flink中的多个键?

在QueryableClient下面,单个键'clusterId'运行良好 .

kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

对于多个键, type_info 应该是什么?任何与此相关的建议/示例或参考都会非常有用吗?

2 回答

  • 3

    我找到了解决方案 .

    我在valueStateDescription中给出了TypeHint .

    In Flink Job:

    TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});
    
    ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
    

    On Client Side:

    ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
    

    我有两个键,所以我使用了 Tuple2 类并设置了我的键的值,如下所示 . Note: 如果你有两个以上的键,那么你必须根据你的键选择Tuple3,Tuple4类 .

    Tuple2<String, String> tuple = new Tuple2<>();
     tuple.f0 = clusterId;
     tuple.f1 = ssid;
    

    然后我提供了TypeHint .

    TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};
    
    CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);
    

    在上面的代码中, getState 方法将返回 ImmutableValueState 所以我需要得到我的pojo,如下所示 .

    ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();
    
    totalUsage = state.value();
    
  • 0

    感谢您将解决方案发布到您自己的问题上,这对其他人有帮助 . Flink文档是否遗漏了这些信息?如果是这样,请打开一个Jira问题来添加它,因为它会让更多人受益,谢谢 .

相关问题