-
1 votesanswersviews
从KStream开始阅读Kafka主题
我的spring boot项目有一个演示Kafka Streams API的应用程序 . 我可以使用该命令使用主题 customer 中的所有消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning Kafka Streams API中使用KStream或... -
1 votesanswersviews
Kafka Streams - 与旧州的聚合
我有一个 KStream 来自主题 to1 的数据,如下所示: T1-KEY -> {T1} T2-KEY -> {T2} 和 KTable ,构造如下: 我正在使用org.apache.kafka.streams.StreamsBuilder从某个主题 to2 创建 KTable ,如下所示: A1-KEY -> { "A1", "Set&quo... -
3 votesanswersviews
Kafka Streams在分组和聚合时使用KTable转换为字符串问题
我有一个Kafka流,其传入消息看起来像 sensor_code: x, time: 1526978768, address: Y 我想创建一个KTable,它存储每个传感器代码的每个唯一地址 . KTable KTable<String, Long> numCount = streams .map(kvm1) .groupByKey(S... -
0 votesanswersviews
Kafka Streams:不重新划分共分区数据的 Map
我有一个来自底层主题的KStream,类型为[K3,V] . K3是由三个字段组成的密钥,即K3(a,b,c) . 然而,该主题仅由密钥的字段的子集分区,即K2(a,b) . 现在,我想创建一个KTable来连接和使用我的PAPI处理器 . 我希望这个KTable按K2(a,b)聚合 . 聚合只是将值收集到一个集合中 . 为此,我必须使用“map”功能将我的键从K3转换为K2 . 这将(尝试)重... -
0 votesanswersviews
KStream到KTable左连接返回Null
我目前正在尝试使用KStream到KTable连接来执行Kafka主题的丰富 . 对于我的概念证明,我目前有一个Kafka Stream,其中包含大约600,000条记录,这些记录都具有相同的密钥,并且KTable是从一个主题创建的,其中包含一个密钥,值对的1个记录,其中KTable主题中的密钥与600,000的密钥相匹配创建KStream主题中的记录 . 当我使用左连接(通过下面的代码)时,所有... -
21 votesanswersviews
使用Kafka的Streams API处理错误消息
我有一个基本的流处理流程,看起来像 master topic -> my processing in a mapper/filter -> output topics 我想知道处理“坏消息”的最佳方法 . 这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误) . 我正在考虑将所有处理/过滤代码包装在try ca... -
2 votesanswersviews
使用kafka-streams有条件地对json输入流进行排序
我是开发kafka-streams应用程序的新手 . 我的流处理器用于根据输入json消息中的用户密钥值对json消息进行排序 . Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"} Message 2: {&quo... -
3 votesanswersviews
如果HTTP / 1.0客户端请求Connection:keep-alive,它会理解分块编码吗?
如果我的HTTP服务器获得带有“Connection:keep-alive”标头的HTTP / 1.0请求,那么客户是否理解“Transfer-Encoding:chunked”是否公平? 本质上,我正在尝试决定是否尊重HTTP / 1.0客户端的“Connection:keep-alive”标头 . 如果我确实尊重它,那么我必须使用chunked编码进行回复,因为我无法缓冲整个回复以便计算Con... -
5 votesanswersviews
Apache Kafka订单根据其值来窗口化消息
我正在尝试找到一种方法来重新排序主题分区中的消息并将有序消息发送到新主题 . 我有Kafka发布者发送以下格式的String消息: {system_timestamp}-{event_name}?{parameters} 例如: 1494002667893-client.message?chatName=1c&messageBody=hello 1494002656558-chat.sta... -
0 votesanswersviews
消息密钥在Kafka Streams中为Long
我试图使用Long作为消息密钥的类型,但我明白了 Exception in thread "kafka_stream_app-f236aaca-3f90-469d-9d32-20ff694806ff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize ke... -
1 votesanswersviews
如何提取Kafka Streams中消息中嵌入的时间戳
我想提取嵌入每条消息的时间戳,并将它们作为json有效负载发送到我的数据库中 . 我想获得以下三个时间戳 . Event-time: The point in time when an event or data record occurred, i.e. was originally created “by the source”. Processing-time: The point ... -
1 votesanswersviews
Kafka流来关联消息
是否可以根据在一段时间内以随机顺序出现的标准对邮件进行分组? 消息具有不同的标识符,并且标识符可能需要存储在存储器中以进行相关 . -
1 votesanswersviews
Kafka Streams - 按时间戳/序列保留消息?
我在Kafka流上收到消息 . 它们由用户ID键入 . 当他们进来时,他们会得到一个序列号和时间戳 . 这些消息在15分钟后“过期” . 用户可以根据给定时间(最多15分钟)或序列请求新消息 . 我最初的东西是这样的: `StreamsBuilder streamsBuilder = new StreamsBuilder(); KStream<String, Message> inbo... -
0 votesanswersviews
scala sbt版本的插件
我需要运行一个为scala 2.11编译的sbt插件 . 但是,sbt尝试下载scala 2.12的插件 . 我如何强迫sbt使用scala 2.11? ps:将 scalaVersion := "2.11.11" 添加到 build.sbt 并不能解决问题,因为sbt插件不受此影响 . -
0 votesanswersviews
KafkaStreams没有关闭
如果我创建并启动KafkaStream实例,然后在关闭钩子中调用.close(),则不会发生任何事情 - 日志记录表明存在的一个StreamThread已进入PENDING_SHUTDOWN状态,但随后就像这样永远 . 我已经尝试了一切没有运气 - 已经阅读了Kafka流源代码,看看它在关机期间做了什么,但在我看来,代码暗示StreamThread,如果处于运行状态,将永远不会停止(这不能正确 -... -
4 votesanswersviews
Kafka Spark流媒体:无法阅读消息
我正在整合Kafka和Spark,使用spark-streaming . 我创建了一个作为kafka制作人的主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 我正在kafka发布消息并尝试使用spark-streaming j... -
1 votesanswersviews
针对同一主题的Spark-kafka流式传输多个消费者群组不起作用
我是kafka和spark的新手 . 我有一个用例,需要从多个火花流窗口中消耗kafka主题 . Topic ... kafka-topics.sh --create --topic feed --partitions 10 --zookeeper xxx.xxx.xxx.xxx:xxxx --replication-factor 2 Code ... package tech.websta... -
4 votesanswersviews
Spark Streaming Kafka Consumer
我正在尝试设置一个Spark Streaming简单应用程序,它将从Kafka主题中读取消息 . 经过大量工作后,我处于这个阶段,但得到下面显示的例外情况 . 码: public static void main(String[] args) throws Exception { String brokers = "my.kafka.broker" + ":... -
1 votesanswersviews
无法通过火花流消耗kafka消息
我试图通过火花流程序消费来自kafka制作人的消息 . 这是我的计划 val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local") ... -
0 votesanswersviews
Apache Spark Kafka集成问题
尝试使用kafka主题使用以下命令在Spark中创建输入流但是会收到错误 . 这是我第一次尝试使用kafka进行Spark Streaming 版本细节, Spark版本:spark-2.2.0-bin-hadoop2.7 Kafka 版本:kafka_2.11-0.11.0.0 Zookeepr版本:zookeeper-3.4.10 Spark Streaming jar文件: ... -
0 votesanswersviews
Spark消费者没有收到Kafka消息
我有一个Spark scala使用者连接到另一个集群上的Kafka代理(Kafka集群与CDH集群分开) . params 是我正确拾取的 Kafka 参数 . val incomingstream = KafkaUtils.createDirectStream[String, String]( streamingContext, .....](topicSet, params)) pri... -
10 votesanswersviews
不消耗Spark流式传输Kafka消息
我想使用Spark(1.6.2)Streaming从Kafka(broker v 0.10.2.1 )中的主题接收消息 . 我正在使用 Receiver 方法 . 代码如下代码: public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppNam... -
3 votesanswersviews
我可以在没有SCM的情况下部署Capistrano吗?
Capistrano configfile有配置 set:scm,:git和:repo_url和:branch 我可以在没有scm的情况下部署capistrano吗? set :scm, :none set :repository, "." Capistrano部署中scm配置的好处是什么? -
3 votesanswersviews
Kafka Streams - 在同一主题上获得KTable和KStream的最佳方式?
我对Kafka Streams有一个问题(0.10.1.1) . 我正在尝试在同一主题上创建 KStream 和 KTable . 我尝试的第一种方法是简单地在同一主题上调用流和表的 KStreamBuilder 方法 . 这导致了 org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology buildin... -
0 votesanswersviews
使用Kafka Streams DSL进行2步窗口聚合
假设我有一个流"stream-1"每秒由1个数据点组成,并且我希望能够在不同的进程中运行每个步骤 . 如果stream-5和stream-10包含相同键/时间戳的更新(因此我不一定需要How to send final kafka-streams aggregation result of a time windowed KTable?),只要最后的值正确,这本身就不是问题 .... -
29 votesanswersviews
Akka Stream Kafka vs Kafka Streams
我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同 . 我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能 . 使用kafka流比akka溪流kafka有什么好处? -
0 votesanswersviews
Spark流式传输计算平均值
我从格式中接收来自kafka的数据,其中null是关键 . null,val1,val2,val3,val4,val5,val6,val7,...val23 null,val1,val2,val3,val4,val5,val6,val7,...val23 null,val1,val2,val3,val4,val5,val6,val7,...val23 我现在映射了值以删除空键并使用以下代码形成新... -
0 votesanswersviews
Kafka Streams:在收到新消息时重置窗口计时器
我的Kafka主题包含带有id字段的消息 . 我希望使用时间窗口按该id聚合消息,当副本进入时,应该移动该时间窗口 . 例: 我用 id = 94 检索了一条消息 . 我想等待 id = 94 的下一条消息30秒,如果它赢了't appear, I' m将开始处理 . 但是如果在这30秒内收到带有 id = 94 的新消息,我想重置定时器并等待下一个 id = 94 30秒 . 是否可以使用Ka... -
1 votesanswersviews
我如何使用kafka流窗口为烛台图表生成生成一条记录
我必须使用Kafka Stream来获取交易信息,以便在交易结果主题的每个特定持续时间内绘制烛台图表,它具有交易ID,金额,价格,交易时间,关键是交易ID,这对于每条记录完全不同,我想要做的是根据交易结果进行计算,以获得每个持续时间的最高价格,最低价格,开盘价,收盘价,tx close_time,并用它来创建烛台图表 . 我使用kafka流窗口来执行此操作: final KStreamBuilde... -
-1 votesanswersviews
Kafka Streams:提交不会发生
我是Kafka Streams的新手,我试图在超时的情况下试验kafka流的行为 . 这是我使用Processor API测试的场景: 我的kafka流应用程序使用kafka主题(String key,String message)并写入kafka主题(String key,String message) 我已将Consumer Config参数max.poll.interval.ms设置...