首页 文章

Kafka Streams在分组和聚合时使用KTable转换为字符串问题

提问于
浏览
3

我有一个Kafka流,其传入消息看起来像 sensor_code: x, time: 1526978768, address: Y 我想创建一个KTable,它存储每个传感器代码的每个唯一地址 .

KTable

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

kvm1kvm2 是我自己的 KeyValueMappers . 我的想法是用 sensor_code=x, address=y 替换现有密钥,执行 groupByKey()count() . 然后另一个 groupBy(kvm2, Serialized.with(stringSerde, longSerde)) ,其中 kvm2 修改现有的 key 以包含 sensor_code ,然后该值将是其计数 . 但由于它不起作用,也许我做错了...它试图将它转换为Long并抛出异常,因为它正在寻找一个String . 我希望伯爵为 Long ,对吗?

这是我使用的第一个 KeyValueMapper 各自的帮助功能:

private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\""; 
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch(Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}
        //KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
         public KeyValue<String, String> apply(String key, String value) {
             return new KeyValue<>(getKeySensorIdAddress(value), value);
         }
    };

这是第二个 KeyValueMapper 及其帮助功能 .

private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0,a);
}

        //KeyValueMapper2 
    KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
         public KeyValue<String, Long> apply(String key, Long value) {
             return new KeyValue<>(getKeySensorId(key), value);
         }
    };

这是我尝试运行代码时返回的异常和错误 .

[2018-05-29 15:28:40,119] ERROR stream-thread [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1]由于以下错误,无法处理流任务2_0:(org.apache .kafka.streams.processor.internals.AssignedStreamsTasks:105)java.lang.ClassCastException:java.lang.Long无法在org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer . )中强制转换为java.lang.String . java:28)atg.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue(MeteredKeyValueBytesStore.java:66)at at在org.apache的org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)中的org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue(MeteredKeyValueBytesStore.java:57) org.apache.kafka.streams.kstream.i上的.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) nternals.KTableAggregate $ KTableAggregateProcessor.process(KTableAggregate.java:95)at org.apache.kafka.streams.kstream.internals.KTableAggregate $ KTableAggregateProcessor.process(KTableAggregate.java:56)

请注意 java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String 错误 .

任何想法,为什么我得到这个错误,我如何解决它或建议我如何编辑代码,以达到我所提到的所需输出?

提前谢谢了!

EDIT: 自从我放弃了其中一种方法以来,对我的问题进行了重大改革 .

1 回答

相关问题