I have been trying to write a consumer that reads a Kafka topic created by the Confluent Kafka JDBC connector.

  • The JDBC source connector reads from a SQLite database table named DATA_RECORD. The schema of the SQLite table is shown below (a rough sketch of the connector configuration follows the table definition):
CREATE TABLE DATA_RECORD (ID INTEGER PRIMARY KEY NOT NULL, RECORD BLOB);
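For context, a standalone JDBC source connector configuration for a table like this would look roughly like the sketch below; the connector name, connection URL, and topic prefix are placeholders rather than my exact settings.

# Sketch only: placeholder values for a standalone Confluent JDBC source connector.
name=sqlite-data-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlite:data.db
table.whitelist=DATA_RECORD
mode=incrementing
incrementing.column.name=ID
# The resulting topic name is topic.prefix followed by the table name.
topic.prefix=data-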
  • I created the Avro schema file and generated the Java source code with avro-tools. The schema is shown below (a small sketch that exercises the generated class follows it):
{ "type": "record",   "name": "DataRecord",   "fields": [
{"name": "ID", "type": "long"},
{"name": "RECORD", "type": "string"}    ] }
  • I then created the consumer application, but it never polls any data. The consumer Java program is as follows:
package com.fedo.kafka.data.transform;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DataTransformer {

    public static void main(String[] args) {
        String topicName = "data-topic";
        String groupName = "data-group";

        // Consumer configuration: Avro values are deserialized via the
        // Confluent Schema Registry into the generated DataRecord class.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true");

        KafkaConsumer<String, DataRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        System.out.println("Record polling started.....");
        try {
            // Poll indefinitely and print every record that comes back.
            while (true) {
                ConsumerRecords<String, DataRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, DataRecord> record : records) {
                    System.out.println("Key = " + record.key());
                    System.out.println("Val = " + record.value());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
  • The consumer never returns any records; every poll comes back empty.

What exactly am I doing wrong?