
Kafka Cassandra connector doesn't actually write to the database

I am using the open-source Kafka Cassandra connector from here: https://github.com/tuplejump/kafka-connect-cassandra

I followed the tutorial and set everything up according to the instructions. However, the connector does not insert any data into my database. Here are the contents of my sink.properties file:

name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=hello-mqtt-kafka
cassandra.sink.route.hello-mqtt-kafka=devices_data.messages
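
The connector runs under Kafka's standalone Connect worker (you can see ConnectStandalone in the log below); the launch command is essentially the following, with illustrative file paths:

# standalone worker config plus the sink.properties shown above (paths are illustrative)
bin/connect-standalone.sh config/connect-standalone.properties sink.properties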

I am running Kafka, Cassandra and Zookeeper, and they are all working. I send some messages to the topic "hello-mqtt-kafka". For testing purposes I have a console consumer running, and it sees all the messages:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":75679795,"text":"example5"}}
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":86874233,"text":"example6"}}

Here is the schema of my Cassandra table:

CREATE TABLE IF NOT EXISTS devices_data.messages (
        id int,
        created text,
        message text,
        PRIMARY KEY (id, created))
        WITH ID = 2de24390-03d5-11e7-a32a-ed242ef1cc00
        AND CLUSTERING ORDER BY (created ASC)
        AND bloom_filter_fp_chance = 0.01
        AND dclocal_read_repair_chance = 0.1
        AND crc_check_chance = 1.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND min_index_interval = 128
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE'
        AND comment = ''
        AND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }
        AND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' }
        AND compression = { 'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' };

Now, the connector is running and does not throw any errors, but when I do a SELECT query from cqlsh I can see that my data is not being inserted into Cassandra. I followed the setup instructions, and the worker's log does not show any problems either. For debugging purposes I sent some malformed data to Kafka, and the connector reported an error about the message format. So it definitely sees the messages, but for some reason it does not insert them into the database.
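
The cqlsh check is simply a query like this against the table defined above, and it keeps coming back empty:

SELECT * FROM devices_data.messages LIMIT 10;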

I have been stuck on this bug for hours and cannot figure out what could be wrong... I would really appreciate any help or ideas about what I might be missing.

Below is the connector's log. The last few lines about committing offsets keep repeating indefinitely, but nothing shows up in the database.

[2017-03-13 15:46:40,540] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-13 15:46:40,540] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-13 15:46:40,547] INFO Created connector cassandra-sink-connector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-03-13 15:46:40,554] INFO Configured 1 Kafka - Cassandra mappings. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:86)
[2017-03-13 15:46:40,955] INFO Did not find Netty's native epoll transport in the classpath, defaulting to NIO. (com.datastax.driver.core.NettyUtil:83)
[2017-03-13 15:46:42,039] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:95)
[2017-03-13 15:46:42,040] INFO New Cassandra host localhost/127.0.0.1:9042 added (com.datastax.driver.core.Cluster:1475)
[2017-03-13 15:46:42,041] INFO Connected to Cassandra cluster: Test Cluster (com.tuplejump.kafka.connect.cassandra.CassandraCluster:81)
[2017-03-13 15:46:42,114] INFO com.datastax.driver.core.SessionManager@63cd5271 created. (com.tuplejump.kafka.connect.cassandra.CassandraCluster:84)
[2017-03-13 15:46:42,146] INFO CassandraSinkTask starting with 1 routes. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:189)
[2017-03-13 15:46:42,149] INFO Sink task WorkerSinkTask{id=cassandra-sink-connector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2017-03-13 15:46:42,294] INFO Discovered coordinator ismop-virtual-machine:9092 (id: 2147483647 rack: null) for group connect-cassandra-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2017-03-13 15:46:42,302] INFO Revoking previously assigned partitions [] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2017-03-13 15:46:42,309] INFO (Re-)joining group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2017-03-13 15:46:42,319] INFO Successfully joined group connect-cassandra-sink-connector with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2017-03-13 15:46:42,320] INFO Setting newly assigned partitions [hello-mqtt-kafka-0] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2017-03-13 15:46:43,337] INFO Reflections took 4909 ms to scan 64 urls, producing 3915 keys and 28184 values  (org.reflections.Reflections:229)
[2017-03-13 15:46:50,462] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:00,460] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:10,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:20,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)

1 Answer


    Apache Kafka ships with a consumer group management tool: bin/kafka-consumer-groups.sh

    The consumer group the connector should be using to read events from Kafka is called "connect-cassandra-sink-connector" (you can see it in the log snippet you posted).
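
    For example, describing that group against a local broker looks roughly like this (adjust the bootstrap address for your setup; on Kafka 0.10.0 you may also need the --new-consumer flag):

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
        --describe --group connect-cassandra-sink-connector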

    I suggest using this tool to check:

    • Has the consumer caught up (i.e., is the last offset it fetched at the end of the log)? If so, try writing new events to the topic and see whether those get written. Maybe it just started late and missed the earlier events?

    • Does it appear to be making progress? If so, it apparently believes it is writing to Cassandra successfully. If not, it cannot read from Kafka. Try checking the broker logs for errors, and possibly increase Connect's log level (see the log4j snippet after this list) to see whether anything interesting shows up.
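
    To turn up Connect's log level, the standalone worker reads config/connect-log4j.properties from the Kafka distribution; something along these lines is a reasonable starting point (the logger names are the standard Connect packages, and TRACE on WorkerSinkTask is just a suggestion):

    # config/connect-log4j.properties - example of more verbose Connect logging
    log4j.rootLogger=INFO, stdout
    log4j.logger.org.apache.kafka.connect=DEBUG
    log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=TRACE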
