首页 文章
  • 4 votes
     answers
     views

    Kafka Streams:如何更改记录时间戳(0.11.0)?

    我正在使用FluentD(第12版稳定版)向Kafka发送消息 . 但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1 . 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为当消息到达kafka时的时间点 . 我真正感兴趣的时间戳是由流利的信息发送的: “timestamp”:“1507885936”,“host”:“V.X...
  • 4 votes
     answers
     views

    我应该使用什么:Kafka Stream或Kafka消费者api或Kafka连接

    我想知道什么对我最好:Kafka流或Kafka消费者api或Kafka连接? 我想从主题中读取数据,然后进行一些处理并写入数据库 . 所以我写了消费者,但我觉得我可以编写Kafka流应用程序并使用它的有状态处理器来执行任何更改并将其写入数据库,这可以消除我的消费者代码,只需要编写db代码 . 我要插入我的记录的数据库是:HDFS - (插入原始JSON)MSSQL - (处理过的json) 另...
  • 1 votes
     answers
     views

    使用Kafka Stream和节点js进行字数统计

    我是apache kafka的新手 . 我正在尝试使用node.js实现字数计数示例usig kafka streaming . 我正在使用https://www.npmjs.com/package/kafka-streams库 . 我的项目设置如下: 我将使用apache kafka提供的producer-pref-test在一个主题上生成1,00,000条消息 . 我将创建一个消费者,他将使用...
  • 0 votes
     answers
     views

    翻滚窗口概念 Kafka 流

    我正在学习kafka流中的翻滚窗口概念 . 因此,基于https://github.com/timothyrenner/kafka-streams-ex/tree/master/tumbling-window中提到的代码,我创建了我的jar并在我的Linux VM上运行它们(即 生产环境 者和消费者代码都在我的VM上运行) . 我在这里有一个问题(我的理解可能不正确) . 因此,当我在主题 lon...
  • 2 votes
     answers
     views

    Kafka Streams - 来自保留政策主题的KTable

    我正在尝试使用kafka流,我有以下设置: 我有一个现有的kafka主题,其密钥空间无限制(但可预测且众所周知) . 我的主题有一个保留策略(以字节为单位)来老化旧记录 . 我想将此主题实现为Ktable,我可以使用Interactive Queries API按键检索记录 . 有没有办法让我的KTable从我的主题“继承”保留政策?因此,当记录从主要主题中老化时,它们在ktabl...
  • 4 votes
     answers
     views

    消息中心上的Kafka Streams KTable配置错误

    This issue is now solved on Message Hub 我在Kafka创建KTable时遇到了一些麻烦 . 我是Kafka的新手,这可能是我问题的根源,但我想我无论如何都可以问这里 . 我有一个项目,我希望通过计算它们的总发生次数来跟踪不同的ID . 我在IBM Cloud上使用Message Hub来管理我的主题,到目前为止它已经非常出色 . 我在Message Hub上...
  • 0 votes
     answers
     views

    Kafka Stream根据json消息中的时间戳键对消息进行排序

    我正在使用JSON消息发布Kafka,例如: "UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10 "UserID":111,"UpdateTime":06-13-2018 12:13:...
  • 0 votes
     answers
     views

    KafkaStreams - 在代理失败后恢复流

    我已经实现了具有以下属性的KafkaStreams应用程序 application.id = KafkaStreams application.server = bootstrap.servers = [localhost:9092,localhost:9093] buffered.records.per.partition = 1000 cache.max.bytes.buffering = ...
  • 2 votes
     answers
     views

    Kafka Streams API - 使用自定义时间戳的聚合持久性

    我正在使用Kafka Streams API,我不确定我所看到的行为是否是一个错误或我无法正确理解的东西 . 上下文 我首先要说我是流处理的新手 . 我有一些当前批处理的数据,并希望将其提供给流媒体系统 . 需要注意的是,我现在无法直接向流集群添加事件 - 它们必须通过一个存储记录的中间系统并将它们加载到数据库中(这里,我将添加一个Kafka 生产环境 者) . 此外,中间系统是负载 balanc...
  • 0 votes
     answers
     views

    无法在IDE中删除Kafka Stream Application的状态目录

    我正在开发一个简单的Kafka Stream应用程序,它从主题中提取消息并在转换后将其放入另一个主题 . 我正在使用Intelij进行开发 . 当我调试/运行这个应用程序时,如果我的IDE和Kafka服务器位于 SAME machine (即使用BOOTSTRAP_SERVERS_CONFIG = localhost:9092和SCHEMA_REGISTRY_URL_CONFIG = localh...
  • 1 votes
     answers
     views

    事件采购 - Apache Kafka Kafka Streams - 如何确保原子性/交易性

    我使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性 . 与关系数据库一样,我遇到过一些案例,原子性/事务性是必不可少的: 购物应用程序有两个服务: OrderService :有一个带有订单的Kafka Streams商店(OrdersStore) ProductService :有一个Kafka Streams商店(ProductStockStore),其...
  • 0 votes
     answers
     views

    使用Apache Kafka查询MySQL表

    我正在尝试使用Kafka Streams来实现用例 . 我在MySQL中有两个表 - 用户和帐户 . 我正在使用Kafka MySQL连接器从MySQL获取事件到Kafka . 我需要从Kafka本身获取帐户中的所有用户ID . 所以我打算在MySQL输出主题上使用 KStream ,处理它以形成输出并将其发布到一个主题,其中Key作为account-id,值作为userIds用逗号(,)分隔 ....
  • 0 votes
     answers
     views

    kafka stream to ktable join

    我想加入一个 KStream:从主题创建,主题具有JSON值 . 我使用值中的两个属性重新键入流 . 示例值(json的片段) . 我创建了一个自定义pojo类并使用自定义serdes . {"value":"0","time":1.540753118800291E9,,"deviceIp":"111....
  • 0 votes
     answers
     views

    加入Kafka流中的外键

    假设我有三个Kafka主题,其中包含代表不同聚合(事件采购应用程序)中发生的业务事件的事件 . 这些事件允许使用以下属性构建聚合: users:usedId,name应用程序的 个模块:moduleId,name 用户模块的授权:grantId,userId,moduleId,scope 现在我想创建一个包含用户和产品名称(而不是id)的所有授权流 . 我想这样做: 通过use...
  • 0 votes
     answers
     views

    KSQL / Kafka Stream:设置和数据的复杂性?

    上周我问过这个问题:KSQL: append multiple child records to parent record 然而,在我对这个问题的解释中,我确实简化了事情,而且我发现我有点担心现实世界中设置的复杂性 . 为了快速重申,我正在使用的数据类型是付款和参与付款的各方: payments: | id | currency | amount | payment_date | |---...
  • 3 votes
     answers
     views

    Kafka Streams“map-side”加入就像字典查找一样

    这个问题是Kafka Streams with lookup data on HDFS的后续行动 . 我需要加入(如"map-side" join)小字典数据到主Kafka流 AFAIK,Kafka Stream实例始终适用于主题的给定分区 . 如果我想进行查找,我需要为连接键重新分配两个流,以将相关记录放在一起 . 如果需要检查多个查找数据,需要多次来回重新分配的成本是多少?...
  • 2 votes
     answers
     views

    如何重新处理批量Kafka流

    我想根据创建消息的时间戳批处理消息 . 此外,我想在固定时间窗口(1分钟)内批量处理这些消息 . 只有在窗口通过后,才能向下游推送批次 . 为了实现这一点,处理器API似乎或多或少适合(la KStream batch process windows): public void process(String key, SensorData sensorData) { //e...
  • 2 votes
     answers
     views

    Kafka流以特定键作为输入连接

    我在架构注册表中有3个不同的主题和3个Avro文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题 . 问题是我要加入的密钥与我将数据写入每个主题的密钥不同 . 假设我们有这3个Avro文件:Alarm : { "type" : "record", "name" : "Alarm", &quot...
  • 0 votes
     answers
     views

    Kafka Streams使用依赖对象等待功能

    我创建了一个Kafka Streams应用程序,它接收来自不同主题的不同JSON对象,我想实现某种等待功能,但我不确定如何最好地实现它 . 为了简化问题,我将在下一节中使用简化实体,我希望可以用它来描述问题 . 因此,在我的一个溪流中,我收到了汽车物品,每辆车都有一个身份证 . 在第二个流中,我接收人物对象,并且每个人还具有车辆ID并且被分配给具有该id的车辆 . 我想从两个输入流(主题)中读取我...
  • 0 votes
     answers
     views

    几分钟后ktable-ktable加入的主题数据消失了

    我正在运行将ktable与另一个ktable(内部联接)连接起来的程序,并将其写入输出主题 . 当我执行程序时,一切正常 . 我可以在kafka日志中看到连接的数据 . 但几分钟后,日志文件中的数据就消失了 . 00000000xxx.log文件变为零字节 . 数据是否移动到其他地方?我需要做任何配置设置 . 我正在观察此行为仅适用于我用于创建ktables的主题 . 知道数据在几分钟后从主题中...
  • 3 votes
     answers
     views

    Kafka流的最佳实践

    我们有一个用python编写的预测服务来提供机器学习服务,你发送一组数据,它会给出异常检测或预测等等 . 我想使用Kafka流来处理实时数据 . 有两种方法可供选择: Kafka流作业只完成 ETL 功能:加载数据,并进行简单转换并将数据保存到弹性搜索 . 然后启动计时器定期从ES加载数据并调用预测服务来计算并将结果保存回ES . Kafka流工作除了 ETL 之外还做了所有工作,当Kaf...
  • 0 votes
     answers
     views

    无法为Kafka流打开商店,因为状态无效

    我正在尝试使用Kafka Streams,我创建了以下拓扑: KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(), historyEventSerde)); eventStream.s...
  • 1 votes
     answers
     views

    Kafka流:将值连接到数组中

    我有一个kafka流,它完成了KTable的缺失值(leftjoin完美地做到了这一点) . 但有时,我必须将每个值的连接组成一个数组,我不知道如何正确地做到这一点 . 例如(我带一个家庭): {father: idFather, mother : idMother, children:[{child: id1},{child: id2}] 我可以和Ktable一起加入,找到父亲和母亲的名字(加...
  • 0 votes
     answers
     views

    暗淡的kafka stream groupBy和window

    我无法理解groupBy / groupById的概念和kafka流中的窗口化 . 我的目标是在一段时间(例如5秒)内聚合流数据 . 我的流媒体数据类似于: {"value":0,"time":1533875665509} {"value":10,"time":1533875667511} {"value&q...
  • 0 votes
     answers
     views

    Kafka Streams窗口聚合批处理

    我在我的应用程序中处理了Kafka Streams: myStream .mapValues(customTransformer::transform) .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde())) .windowedBy(TimeWindows.of(10000L...
  • 3 votes
     answers
     views

    Kafka Streams动态路由(ProducerInterceptor可能是一个解决方案吗?)

    我正在使用Apache Kafka,我一直在尝试使用Kafka Streams功能 . 我想要实现的是非常简单的,至少在单词中,它可以通过常规的普通消费者/ 生产环境 者方法轻松实现: 从动态主题列表中读取a 对邮件进行一些处理 将消息推送到另一个主题,该主题根据消息内容计算名称 最初我认为我可以创建一个自定义接收器或注入某种 endpoints 解析器,以便以编程方式为每条消息定...
  • 1 votes
     answers
     views

    如何提高从kafka读取的性能并使用kafka Stream Application转发到kafka

    我有1.0.0 Kafka流API的Kafka流应用程序 . 我有单个经纪人0.10.2.0 kafka和单个主题与单个分区 . 除 生产环境 者request.timeout.ms之外,所有可配置参数都相同 . 我用5分钟配置了 生产环境 者request.timeout.ms来修复Kafka Streams program is throwing exceptions when produci...
  • 2 votes
     answers
     views

    使用Kafka Streams在输出中设置时间戳

    我在Kafka主题“原始数据”中获取CSV,目标是通过使用正确的时间戳(每行不同)发送另一个主题“数据”中的每一行来转换它们 . 目前,我有2个飘带: 一个用于拆分"raw-data"中的行,将它们发送到"internal"主题(无时间戳) 一个 TimestampExtractor 消耗"internal"并将它们发送到&quo...
  • 1 votes
     answers
     views

    Kafka Streams创建没有架构的avro主题

    我开发了一个java应用程序,它使用Schema Registry从avro主题读取数据,然后进行简单的转换并在控制台中打印结果 . 默认情况下,我使用GenericAvroSerde类来获取键和值 . 一切都很好,除了我必须为每个serde定义额外的配置 final Map<String, String> serdeConfig = Collections.singletonMap(...
  • 1 votes
     answers
     views

    反序列化Avro序列化Kafka流的问题

    我试图实现商店时收到异常 . 我正在运行Kafka 1.0,Confluent的Schema Registry 4.0和Avro 1.8.2 . 我使用Avro的maven插件生成了Pojo,并使用Confluent maven插件将模式部署到Confluent服务器 . 我能够为STREAM1主题生成一条消息 . 以下是设置流的代码: Properties properties = new Pr...

热门问题