-
6 votesanswersviews
具有纯文本输入和avro输出的mapreduce作业
我对将Avro与map reduce一起使用感到困惑,并且无法找到好的教程 . 当输入和输出都是Avro数据文件时,像AvroJob和AvroMapper这样的类似乎可以解决问题 . 当你的输入只是纯文本时呢? 特别: 我的映射器将LongWritable键和Text值作为输入 . 它会发出文本键和MyAvroRecord值 . 我的reducer使用Text键和MyAvroRecords的Ite... -
0 votesanswersviews
如何在加入Kstreams后选择String格式数据中的空值
我已经在两个kstream上执行了连接操作,这些kstream由avro格式数据组成,然后我的键是Integer类型,值是string类型 . 输出是这样的: [KSTREAM-MERGE-0000000016]: 1, {"id": 1, "name": "john", "age": 26}/{"id&qu... -
1 votesanswersviews
无法阅读Kafka主题avro消息
Debezium连接器的Kafka连接事件是Avro编码的 . 在传递给Kafka connect独立服务的connect-standalone.properties中提到了以下内容 . key.converter=io.confluent.connect.avro.AvroConverter value.confluent=io.confluent.connect.avro.AvroConver... -
1 votesanswersviews
Avro Map-Reduce on oozie
我一直试图在oozie上运行Avro map-reduce . 我在workflow.xml中指定了mapper和reducer类,并提供了其他配置 . 但它给出了一个 java.lang.RunTime Exception - class mr.sales.avro.etl.SalesMapper not org.apache.hadoop.mapred.Mapper 直接在hadoop集群上... -
0 votesanswersviews
如何通过Spark Streaming从二进制数据中找出Avro架构?
我 Build 了一个Spark-Streaming管道,通过Kafka获取测量数据 . 该数据使用Avro序列化 . 数据可以有两种类型 - EquidistantData 和 DiscreteData . 我使用 avdl 文件和 sbt-avrohugger 插件创建了这些文件 . 我使用生成从 SpecificRecord 继承的Scala案例类的变体 . 在我的接收应用程序中,我可... -
2 votesanswersviews
Spring Cloud Stream w / Kafka Confluent Schema Registry Client坏了?
好奇,如果有人有这个工作,因为我目前正在努力 . 我创建了简单的Source和Sink应用程序来发送和接收基于Avro架构的消息 . 消息的架构保存在Confluent架构注册表中 . 这两个应用程序都配置为使用ConfluentSchemaRegistryClient类,但我认为这里可能存在一个错误 . 这就是我看到的让我惊讶的地方 . 如果我与Confluent注册表的REST API交互,我... -
8 votesanswersviews
带解码器问题的 Kafka Avro Consumer
当我尝试使用我各自的模式对数据运行Avaf 的卡夫卡消费者时,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。我看到其他人有类似的问题将字节数组转换为 json,Avro 写和读和Kafka Avro Binary *编码员。我也引用了这个消费者群体示例,这些都有帮助,但到目前为止这个错误没有任何帮助..它可以工作直到这部分代码(第 73 行) 解码器解码... -
2 votesanswersviews
Apache Kafka Avro 反序列化:无法反序列化或解码特定类型的消息。
我正在尝试使用 Avro Serialize 和 Apache kafka 进行 serialize/deserialize 消息。我创建了一个生成器,用于序列化特定类型的消息并将其发送到队列。当消息成功发送到队列时,我们的消费者选择消息并尝试处理,但在尝试时我们面临异常,对于特定对象的大小写字节。例外情况如下: [error] (run-main-0) java.lang.ClassCastEx... -
0 votesanswersviews
如何使用 Python decode/deserialize Kafka Avro 字符串
我从 Python 中接收远程服务器 Kafka Avro 消息(使用 Confluent Kafka Python 库的消费者),它使用 json 字典表示点击流数据,其中包含用户代理,位置,URL 等字段。这是消息的样子: b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https:... -
0 votesanswersviews
kafka connect avro enums 解析为字符串
我正在使用来自汇编的 kafka connect 框架来生成从我的应用程序服务器到 kafka 集群的消息(用于 avro 支持的 zookeeper 代理模式注册表)。 我通过 connect 发送的数据由 avro 架构定义。我的架构表示包含 ENUMS 的结构化对象。事实上,Apache avro 支持支持枚举类型。我不必将我的架构提交到注册表,因为 kafka connect API 会自... -
0 votesanswersviews
在火花流中消耗 Avro 事件并创建 data-frame
我是一个新兴的引发流和 scala 并需要一些帮助来消费来自 kafka 的 Avro 消息并将其转换为 spark 数据帧。 请参考以下来自 Confluent kafka connect 的 Avro事件,其中包含Schema和Data-payload。 我需要使用它,然后从中创建一个包含“Data Rows”和“Schema”的数据帧。这听起来有点复杂,但是请您提供一些我可以使用的示例代码吗... -
0 votesanswersviews
反序列化来自 Kafka Connect 的 Avro 事件
我有来自 Kafka-connect producer 的 kafka 消息,格式如下,并希望反序列化它以获取核心数据。 Kafka-connect producer 将它作为“SourceRecord”发送,它嵌入“schema”和“Struct” 如何在 JAVA 或 SCALA 中将数据反序列化并从中提取为域对象? {"schema": { "type&qu... -
0 votesanswersviews
如何从apache nifi在kafka主题中生成Avro消息,然后使用kafka流读取它?
我想使用apache nifi在kafka主题中生成一些通用数据,我希望这些数据采用avro格式 . 我为它做了什么: 在架构注册表中创建新架构: {“type”:“record”,“name”:“my_schema”,“namespace”:“my_namespace”,“doc”:“”,“fields”:[{“name”:“key”,“type” :“int”},{“name”:“val... -
3 votesanswersviews
Avro日期和时间与BigQuery的兼容性?
BigQuery通常可以很好地加载Avro数据,但是“bq load”在时间戳和使用Avro logicalType属性的其他日期/时间字段方面遇到了很多麻烦 . 当BigQuery TIMESTAMP将它们解释为微秒时间戳(关闭1000)时,我的Avro类型timestamp-millis数据被破坏 . 可加载到TIMESTAMP中的时间戳 - 微整数在BigQuery DATETIME... -
1 votesanswersviews
Google BigQuery支持Avro logicalTypes
正如Google声称的那样,不支持从Avro logicalType 转换为BigQuery特定类型(如here on the bottom所述) . 但是,我可以使用以下架构加载Avro文件: schema = { 'name': 'test', 'namespace': 'testing', 'type': 'record', 'fields': [ ... -
1 votesanswersviews
Flume Kafka HDFS:拆分消息
我有以下flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器 tier1.sources = source1 tier 1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.s... -
9 votesanswersviews
使用AWS Glue和Apache Avro进行架构更改
我是AWS Glue的新手,并且很难完全理解AWS文档,但我正在努力解决以下用例: 我们有一个带有许多Avro文件的s3存储桶 . 我们已经决定使用Avro,因为它可以在超时的情况下对数据模式更改提供广泛支持,从而允许将新字段应用于旧数据而不会出现任何问题 . 使用AWS Glue,我了解只要存在架构更改,爬虫就会创建一个新表 . 当我们的模式发生变化时,这会导致爬虫程序按照预期创建许多新表,但并... -
3 votesanswersviews
Oozie:从Oozie <java>行动中启动Map-Reduce?
我正在尝试使用 <java> 操作在Oozie工作流中执行Map-Reduce任务 . O 'Reilley' s Apache Oozie(Islam and Srinivasan 2015)指出: 虽然不推荐,但可以使用Java操作来运行Hadoop MapReduce作业,因为MapReduce作业毕竟只是Java程序 . 调用的主类可以是Hadoop MapReduce驱动程... -
1 votesanswersviews
最佳实践:如何通过更改“schema”/“columns”来处理数据记录
这是一个最佳实践问题 . 我们的设置是一个hadoop集群,在hdfs中存储(日志)数据 . 我们以csv格式获取数据,每天一个文件 . 只要文件的“模式”(尤其是列数)不会更改,就可以在hadoop中对这些文件运行MR作业 . 但是,我们面临的问题是,我们要分析的日志记录最终会发生变化,因为可能会添加或删除列 . 我想知道你们中的一些人是否愿意分享你们在这些情况下的最佳实践 . 我们现在想到的最... -
5 votesanswersviews
流式传输JSON数据,在S3中保存为Parquet
我有一个生成JSON的Kinesis流,并希望使用Storm以Parquet格式写入S3 . 这种方法需要在流处理期间从JSON - > Avro - > Parquet进行转换 . 此外,我需要处理这种方法中的模式演变,并不断更新avro架构和avsc生成的java类 . 另一个选择是直接在S3中编写JSON并使用Spark将存储的文件转换为镶木地板 . 在这种情况下,Spark... -
0 votesanswersviews
Pyspark无法使用协议s3a,s3n和s3从s3读取avro文件
尝试使用路径协议从s3读取avro文件时://bucket/prefix/filename.avro 使用协议s3,s3a或s3n时出现这种错误: 得到错误:在get_return_value py4j中输入文件“/usr/local/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py”,第319行 .... -
0 votesanswersviews
avro.io.AvroTypeException:数据<avro data>不是模式的示例{...}
我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起 . 我们正在使用一个名为“Pyleus”的python风暴包装器 我们设置了一个监控数据库表的Confluent-Kafka JDBC连接器,每当DB发生变化时,新记录将以Avro格式发送为Kafka消息 . 在Pyleus bolt中,我们能够获得Kafka消息,但是,我们无法将其反序列化为JSON . 我们使... -
1 votesanswersviews
Kafka Streams创建没有架构的avro主题
我开发了一个java应用程序,它使用Schema Registry从avro主题读取数据,然后进行简单的转换并在控制台中打印结果 . 默认情况下,我使用GenericAvroSerde类来获取键和值 . 一切都很好,除了我必须为每个serde定义额外的配置 final Map<String, String> serdeConfig = Collections.singletonMap(... -
0 votesanswersviews
Kafka控制台消费者没有消费主题
我们有一个处理消息 生产环境 和消费的服务器 . 我们有4台笔记本电脑,所有macs都融合了所有运行相同的命令行...... ./kafka-avro-console-consumer --from-beginning --bootstrap-server 0.0.0.0:9092,0.0.0.0:9092-topic topicName --property schema.registry.u... -
1 votesanswersviews
反序列化Avro序列化Kafka流的问题
我试图实现商店时收到异常 . 我正在运行Kafka 1.0,Confluent的Schema Registry 4.0和Avro 1.8.2 . 我使用Avro的maven插件生成了Pojo,并使用Confluent maven插件将模式部署到Confluent服务器 . 我能够为STREAM1主题生成一条消息 . 以下是设置流的代码: Properties properties = new Pr... -
1 votesanswersviews
使用Avro的单个Kafka主题中的多个消息类型
我有一个基于Kafka的事件源应用程序 . 目前我有一个主题,其中包含多种消息类型 . 全部使用JSON进行序列化/反序列化 . 融合的模式注册表看起来像是一种很好的消息类型维护方法,并且在Avro完全兼容模式下,它还提供了一种在我的事件源应用程序中发送消息版本控制的机制 . 最近patch - blog post到4.1.1汇合 . 使用Avro序列化程序/反序列化程序,您可以在一个主题中拥... -
0 votesanswersviews
如何将模式附加到kakfa主题以过滤json消息?
我是kafka和schema注册表的新手 . 我正在尝试为kafka主题强制执行json架构 . 因此,每当生成器生成json消息并将其推送到kafka主题时,它应该仅在符合模式的情况下才会通过 . 我使用的是avro架构格式 . 我已经使用汇合平台安装了模式注册表 . 我跟着这个 https://github.com/confluentinc/schema-registry 我有kaka主题“m... -
1 votesanswersviews
将Avro文件加载到BigQuery会因内部错误而失败
Google BigQuery位于March 23, 2016 announced "Added support for Avro source format for load operations and as a federated data source in the BigQuery API or command-line tool" . 它说here "Th... -
2 votesanswersviews
使用AVRO格式的BiqQuery流插入
有没有办法使用流式插入将AVRO格式的数据插入BigQuery?如果是,我们正在使用google-cloud-python,似乎不支持 . 只有支持的类型是json . 我错过了什么吗? 有关BQ流式插入的文档似乎不包括数据类型 . 我只能找到here提到的AVRO数据格式,但不能在流式插入的上下文中找到 . 有没有办法使用流媒体插入以AVRO格式插入数据,你能否请我链接到任何一个例子?如果无法使... -
1 votesanswersviews
如何将AVRO文件中的整数值加载到bigquery中的日期列?
我指的是一个类似的帖子,我发现它非常有用 . 它显示了我们如何将avro文件中的整数列加载到包含时间戳字段的BigQuery表 . Compatibility of Avro dates and times with BigQuery? 我有一个类似的问题 . 有没有办法将avro文件中的整数值加载到bigquery中的日期列? 由于avro不支持date数据类型,我尝试将日期保留为avro中的...