-
0 votesanswersviews
如何使用Python中的kafkaProducer发送数据?
我无法将数据发送到应具有适当权限的远程Kafka群集 . 该主题已在群集上创建 . 我也试图以字节发送数据,但仍然有相同的错误 . 如果您有任何信息,可能会有很大的帮助!我所遇到的错误就在提供的图片上 . -
1 votesanswersviews
scala-如何确认Kafka服务器(代理)中存在特定主题?
我正在使用scala,spark和Kafka . 我有两个问题 . 1.如何确认Kafka经纪人(服务器)中存在的主题? 2.如何确认Kafka服务器(引导服务器)是否正在运行? object kafkaProducer extends App { def sendMessages(): Unit = { //define topic val topic = "spark-to... -
0 votesanswersviews
是否可以从kafka中的代理中排除主题
我是 Kafka 的新手!! 是否可以在apache kafka中从代理中排除主题?怎么样? To run a producer for a topic we give the command: bin / kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic 我们在此命令中没有代理选项 . 我们可以... -
5 votesanswersviews
Kafka:如何连接kafka-console-consumer来获取远程代理主题内容?
我在ec2上的一台机器上设置了一个kafka zookeeper和3个代理,端口为9092..9094,我正在尝试使用另一台机器上的主题内容 . 端口2181(zk),9092,9093和9094(服务器)对消费者机器开放 . 我甚至可以做一个给我的 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remoteto... -
1 votesanswersviews
Kafka控制台 生产环境 者的相关Id错误
我已经启动了一个独立的kafka服务器(版本2.11-0.11.0.1),其中包含1个节点和1个zookeeper,我正在尝试使用acls实现ssl但无法生成 . 执行以下步骤: Started kafka node using following configurations ie (server.properties): broker.id = 0听众= PLAINTEXT://127.0... -
7 votesanswersviews
在Kafka中读取字段'topic_metadata'时出错
我正在尝试使用auto.create.topics.enable = true在我的server.properties文件中连接到我的代理 . 但是当我尝试使用Java客户端 生产环境 者连接到代理时,我得到以下 error . 1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.pr... -
0 votesanswersviews
Kafka:在制作Kafka主题的记录时出现异常
我在制作Kafka主题的记录时遇到异常: java.lang.RuntimeException: This server is not the leader for that topic-partition. 以下是将记录发送到Kafka主题的代码 . AtomicReference<Exception> exRef = new AtomicReference<>(); ... -
4 votesanswersviews
Kafka Streams:如何更改记录时间戳(0.11.0)?
我正在使用FluentD(第12版稳定版)向Kafka发送消息 . 但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1 . 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为当消息到达kafka时的时间点 . 我真正感兴趣的时间戳是由流利的信息发送的: “timestamp”:“1507885936”,“host”:“V.X... -
-3 votesanswersviews
用于检查kafka代理(服务器)是否可用的Scala代码
如何检查Kafka服务器是否可用 . 我试过下面的scala代码-Producer API . val props = new Properties() props.put("bootstrap.servers", "hworker.dev.arch.mum.private:6667") props.put("acks", &qu... -
1 votesanswersviews
无法在独立的Kafka经纪人上创建主题
我想在Kafka服务器上创建一个新主题 . 但低于错误 . 请注意,它不是一个只设置一个经纪人的独立系统 . 它之前工作得很好,我改变的只是要创建的新主题名称 . 什么突然错了?以及它如何运作? /usr/kafka_2.11-0.9.0.0# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor ... -
0 votesanswersviews
kafka 生产环境 商和经纪人在不同的服务器上
我的问题是:如何将数据从kafka 生产环境 者发送到经纪人?下面的架构解释了我的网络配置: 我在VM中有一个 生产环境 者,它位于服务器A中,我的代理也位于服务器B中的VM中 . 我使用从 生产环境 者VM到服务器B的 SSH 连接以及重定向端口:ssh -L 9092:192.168.56.101:9092 xx @ IP1 我使用kafka控制台来测试: bin/kafka-console... -
2 votesanswersviews
客户和用户的Kafka配额
我对kafka很新,想了解配额制度如何适用于kafka . 直到现在我一直在关注文件here 我已经能够使用以下命令为新客户端设置配额( 生产环境 和消费) bin / kafka-configs.sh --zookeeper 10.11.10.2:2181 --alter --add-config'producer_byte_rate = 1024,consumer_byte_rate = 1... -
0 votesanswersviews
Kafka 制片人表现不佳
我和kafka制作人有问题 . 其实我正在使用spring kafka,并通过KafkaTemplate发送消息: DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerParams); KafkaTemplate k... -
5 votesanswersviews
KafkaProducer未成功将消息发送到队列中
我在我的Windows PC上构建了一个小测试环境并写下以下代码来测试kafka(使用org.apache.kafka中的kafka_2.10:0.9.0.1) . package iii.functiontesting; import java.text.ParseException; import java.util.Properties; import org.apache.kafka.... -
0 votesanswersviews
KafkaStreams - 在代理失败后恢复流
我已经实现了具有以下属性的KafkaStreams应用程序 application.id = KafkaStreams application.server = bootstrap.servers = [localhost:9092,localhost:9093] buffered.records.per.partition = 1000 cache.max.bytes.buffering = ... -
3 votesanswersviews
spark kafka 生产环境 商可序列化
我想出了一个例外: 错误yarn.ApplicationMaster:用户类抛出异常:org.apache.spark.SparkException:任务不可序列org.apache.spark.SparkException:任务不能序列在org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala... -
4 votesanswersviews
使用Kafka在应用程序上打开太多文件错误
我正在使用Kafka和Spark Streaming构建应用程序 . 输入数据来自第三部分流媒体,并在kafka主题上发布 . 此代码显示了Stream Proxy模块:这是我从流式传输中获取结果以及如何将它们发送到KafkaPublisher的方式(它只显示了草图): def on_result_response(self,*args): self.kafkaPublisher.push... -
0 votesanswersviews
Kafka 主题描述没有给出一致的结果
我在一个有3个节点的集群中使用Kafka 0.10.0.0 我在经纪人和所有经纪人上启用了自动主题创建,我已添加了指定分区和副本数量的属性 num.partitions=2 default.replication.factor=2 现在,当我通过命令行查询主题描述时,我得到了意想不到的结果 . 有时它为此主题显示1个分区,有时为此主题显示2个分区 . bin/kafka-topics.sh ... -
0 votesanswersviews
Kafka Remote Producer - advertised.listeners
我在CDH 5.9上运行Kafka 0.10.0,集群是kerborized . 我想要做的是将消息从远程机器写入我的Kafka经纪人 . 群集(安装Kafka的位置)具有内部和外部IP地址 . 群集中的计算机主机名解析为专用IP,远程计算机将相同的主机名解析为公共IP地址 . 我从远程机器打开了必要的端口9092(我正在使用SASL_PLAINTEXT协议)到Kafka Broker,验证了使用... -
0 votesanswersviews
当一个经纪人失败时, Kafka 制片人失败了
我们有一个3节点kafka(0.10.2.0)集群3节点ZK(zookeeper-3.4.10)集群 . 有大约80个主题,每个主题有10个分区和2个复制因子 . 每个Producer都会获得所有3个代理的列表,每个消费者都会获得所有3个zookeeper节点的列表 . Zookeeper properties: initLimit=10 syncLimit=5 # disable the per... -
0 votesanswersviews
连接从我的本地机器在EC2机器上运行的Kafka
我是Kafka的新手并在论坛中搜索了不同的帖子,但找不到解决方案 . 我已经在EC2实例上安装了kafka,并尝试从我的ubuntu本地机器上连接它 . 我的目标是让python kafka客户端(包括Producer和Consumer)在我的本地机器上运行,并通过EC2 kafka实例发送/接收数据 . 那可能吗? Properties set in server.properties conf... -
1 votesanswersviews
使用Windows子系统运行的Kafka的连接超时
我在Windows 10笔记本电脑上的Windows SubSystem for Linux下安装了Kafka 1.1.0和Zookeeper 3.4.12 . 我能够在我停留在ubuntu时生成和消费消息,但是当我想从windows(使用java程序或使用工具kafka-console-producer.bat)生成消息时,我有以下错误: [2018-05-11 15:31:01,449] ER... -
2 votesanswersviews
Kafka控制台 生产环境 者可以在本地连接但不能远程连接
我有一个Kafka节点,比如IP地址1.2.3.4 . 如果我从2个不同的终端窗口SSH到该节点,并从1个终端运行控制台消费者,从另一个终端运行控制台 生产环境 者,一切都很好: # Run the consumer from terminal 1 kafka-console-consumer.sh --zookeeper zkA:2181 --topic simpletest # Run th... -
2 votesanswersviews
当kafka服务器关闭时,Kafka 生产环境 者无限期地发送块
我正在使用Kafka 0.11.0.0 . 我有一个发布到Kafka主题的测试程序;如果zookeeper和Kafka服务器关闭(这在我的开发环境中是正常的;我根据需要启动它们),那么对KafkaProducer <> . send()的调用将无限期挂起 . 我要么需要send()返回,最好指出错误;或者我需要一种方法来检查服务器是启动还是关闭 . 基本上,我希望我的测试工具能够告诉我... -
1 votesanswersviews
Kafka:该服务器不是该主题分区的领导者
可能重复Kafka - This server is not the leader for that topic-partition但没有接受的答案也没有明确的解决方案 . 我有一个简单的java程序来生成给Kafka的消息: Properties props = new Properties(); props.put("bootstrap.servers", "lo... -
60 votesanswersviews
了解Kafka主题和分区
我开始学习Kafka用于企业解决方案 . 在我的阅读中,我想到了一些问题: 当一个制作人正在制作一条消息时 - 它会指定要发送消息的 topic ,是吗?它关心分区吗? 订阅者正在运行时 - 它是否指定了其组ID,以便它可以是同一主题的消费者群集的一部分,或者是该群体消费者感兴趣的几个主题? 每个消费者组在代理上是否有相应的分区,或者每个消费者都有一个分区? 作为代理创建的分区,因... -
0 votesanswersviews
Kafka Consumer分区映射
我有100个消费者在同一组听同一主题和100分区 . 因此,根据文档,每个消费者应该只听一个分区,因为有100个消费者和100个分区 . 我使用密钥向kafka发送消息 . 因此,具有相同密钥的某些消息应该位于同一分区中,并且应始终由该组中的同一消费者使用 . 但在我的情况下,具有相同密钥的多个消息被随机消耗多个消费者 . 无论如何,来自分区的所有消息都只由该组中的一个特定使用者使用 . 我不想明... -
0 votesanswersviews
如果在生成Kafka主题记录时出现任何故障,则会使进程失败
我希望每当KafkaProducer因为任何原因在kafka主题上生成记录失败时都会使进程失败 . 我发送如下记录: for(String message : messages) { producer.send(new ProducerRecord<>(topic, message), (metadata, ex) -> { if (ex != null)... -
9 votesanswersviews
如何在apache kafka中创建主题?
在 Kafka 创建主题的最佳途径是什么? 创建主题时要定义多少副本/分区? 在新的 生产环境 者API中,当我尝试将消息发布到不存在的主题时,它首次失败然后成功发布 . 我想知道,副本,分区和集群节点数之间的关系 . 我们是否需要在发布消息之前创建主题? -
1 votesanswersviews
如何使用Spring集成kafka配置具有多个分区的kafka生成器主题
我阅读了很多文章,但没有找到如何使用Spring Integration Kafka配置具有多个分区(在运行时创建的主题)主题的Producer . 我正在使用github link来理解和配置我的应用程序的kafka . 请为此提供解决方案 还有一件事,kafkaHeader.messageKey的用途是什么 . Update : Below is test code which I devel...