首页 文章

使用kafka connect的hdfs中没有avro数据

提问于
浏览
0

我正在使用kafka connect distribution . 该命令是:bin / connect-distributed etc / schema-registry / connect-avro-distributed.properties

工作人员配置是:

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

kafka连接重新开始没有错误!

已创建主题connect-configs,connect-offsets,connect-status . 主题mysiteview已创建 .

然后我使用RESTful API创建kafka连接器,如下所示:

curl -X POST -H "Content-Type: application/json" --data '{"name":"hdfs-sink-mysiteview","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"3","topics":"mysiteview","hdfs.url":"hdfs://master1:8020","topics.dir":"/kafka/topics","logs.dir":"/kafka/logs","format.class":"io.confluent.connect.hdfs.avro.AvroFormat","flush.size":"1000","rotate.interval.ms":"1000","partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner","path.format":"YYYY-MM-dd","schema.compatibility":"BACKWARD","locale":"zh_CN","timezone":"Asia/Shanghai"}}'  http://kafka1:8083/connectors

当我生成数据到主题“mysiteview”时,这样的事情:

{"f1":"192.168.1.1","f2":"aa.example.com"}

java代码如下: ``
`Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String site = "www.example.com";
String ipString = "192.168.2." + rnd.nextInt(255);
String key = "" + rnd.nextInt(255);
User u = new User();
u.setF1(ipString);
u.setF2(site+" "+rnd.nextInt(255));
System.out.println(JSON.toJSONString(u));
producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
Thread.sleep(50);
}

producer.flush();
producer.close();`
``

发生了奇怪的事情 . 我从kafka-logs获取数据但在hdfs中没有数据(没有主题目录) . 我尝试连接器命令:

curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/status

输出是:

{"name":"hdfs-sink-mysiteview","connector":{"state":"RUNNING","worker_id":"10.255.223.178:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":1,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":2,"worker_id":"10.255.223.178:8083"}]}

但是当我使用以下命令检查任务状态时:

curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/hdfs-sink-siteview-1

我得到结果:“错误404” . 三个任务是同样的错误!

出了什么问题?

2 回答

  • 0

    在没有看到工作人员的日志的情况下,当您使用上面描述的设置时,我不确定您的HDFS连接器实例确实失败了 . 但是我可以发现配置的一些问题:

    • 您提到启动Connect工作者: bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties . 这些属性默认将键和值转换器设置为 AvroConverter ,并要求您运行 schema-registry 服务 . 如果您确实编辑了 connect-avro-distributed.properties 中的配置以改为使用 JsonConverter ,则在将Kafka记录转换为Connect的 SinkRecord 数据类型之前,您的HDFS连接器可能会失败,就在它尝试将数据导出到HDFS之前 .

    • 直到最近,HDFS连接器才能将Avro记录导出为Avro或Parquet格式的文件 . 这需要使用上面提到的 AvroConverter . 最近添加了将记录作为JSON导出到文本文件的功能,并且将出现在连接器的版本 4.0.0 中(您可以通过签出并从源代码构建连接器来尝试此功能) .

    此时,我的第一个建议是尝试使用 bin/kafka-avro-console-producer 导入您的数据 . 定义其架构,确认已使用 bin/kafka-avro-console-consumer 成功导入数据,然后将HDFS连接器设置为使用 AvroFormat ,如上所述 . 连接器页面上的quickstart描述了一个非常类似的过程,也许这将是您的用例的一个很好的起点 .

  • 0

    也许你只是使用REST-Api错了 . 根据文件,电话应该是 /connectors/:connector_name/tasks/:task_id

    https://docs.confluent.io/3.3.1/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status

相关问题