I am using the open-source Kafka Cassandra connector from here: https://github.com/tuplejump/kafka-connect-cassandra
I followed the tutorial and set everything up as instructed. However, the connector does not insert any data into my database. Here are the contents of my sink.properties file:
name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=hello-mqtt-kafka
cassandra.sink.route.hello-mqtt-kafka=devices_data.messages
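For reference, with a standalone Connect worker these properties are passed on the command line roughly like this (the worker config file name and paths are assumptions and depend on your installation; the worker config holds bootstrap servers and key/value converters, while sink.properties is the file shown above):

```shell
# Run from the Kafka installation directory (assumed layout).
# First argument: worker config; second: the sink connector config above.
bin/connect-standalone.sh config/connect-standalone.properties sink.properties
```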
I am running Kafka, Cassandra, and ZooKeeper, and all of them work. I send some messages to the topic "hello-mqtt-kafka". For testing purposes I have a console consumer running, and it sees all the messages:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":75679795,"text":"example5"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":86874233,"text":"example6"}}
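A message like the ones above can be produced for testing with the console producer; this is a sketch assuming a broker on localhost:9092 (which matches the coordinator address in the logs):

```shell
# Each line piped into the console producer becomes one Kafka record.
# The envelope is the Connect JSON-with-schema format seen above.
echo '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":1,"text":"example"}}' \
  | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-mqtt-kafka
```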
Here is the schema of my Cassandra table:
CREATE TABLE IF NOT EXISTS devices_data.messages (
id int,
created text,
message text,
PRIMARY KEY (id, created))
WITH ID = 2de24390-03d5-11e7-a32a-ed242ef1cc00
AND CLUSTERING ORDER BY (created ASC)
AND bloom_filter_fp_chance = 0.01
AND dclocal_read_repair_chance = 0.1
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND min_index_interval = 128
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE'
AND comment = ''
AND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }
AND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' }
AND compression = { 'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }
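To check whether rows actually land in the table, a quick query can be run non-interactively; this assumes cqlsh can reach the local node (localhost:9042, as in the connector logs):

```shell
# -e runs a single CQL statement and exits.
cqlsh -e "SELECT id, created, message FROM devices_data.messages LIMIT 10;"
```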
Now I have the connector running. It does not throw any errors, but when I run a SELECT query from cqlsh I can see that my data is not being inserted into Cassandra. I followed the setup instructions, and the worker's logs do not show any problems either. For debugging purposes I passed some malformed data to Kafka, and the connector reported an error about the message format. So it definitely sees the messages, but for some reason it does not insert them into the database.
I have been sitting on this bug for hours and have no idea what could be wrong... I would really appreciate any help or ideas about what I might be missing.
Below are the connector's logs. The last few lines about "Committing offsets" keep repeating, but nothing ever appears in the database.
[2017-03-13 15:46:40,540] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-13 15:46:40,540] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-13 15:46:40,547] INFO Created connector cassandra-sink-connector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-03-13 15:46:40,554] INFO Configured 1 Kafka - Cassandra mappings. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:86)
[2017-03-13 15:46:40,955] INFO Did not find Netty's native epoll transport in the classpath, defaulting to NIO. (com.datastax.driver.core.NettyUtil:83)
[2017-03-13 15:46:42,039] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:95)
[2017-03-13 15:46:42,040] INFO New Cassandra host localhost/127.0.0.1:9042 added (com.datastax.driver.core.Cluster:1475)
[2017-03-13 15:46:42,041] INFO Connected to Cassandra cluster: Test Cluster (com.tuplejump.kafka.connect.cassandra.CassandraCluster:81)
[2017-03-13 15:46:42,114] INFO com.datastax.driver.core.SessionManager@63cd5271 created. (com.tuplejump.kafka.connect.cassandra.CassandraCluster:84)
[2017-03-13 15:46:42,146] INFO CassandraSinkTask starting with 1 routes. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:189)
[2017-03-13 15:46:42,149] INFO Sink task WorkerSinkTask{id=cassandra-sink-connector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2017-03-13 15:46:42,294] INFO Discovered coordinator ismop-virtual-machine:9092 (id: 2147483647 rack: null) for group connect-cassandra-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2017-03-13 15:46:42,302] INFO Revoking previously assigned partitions [] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2017-03-13 15:46:42,309] INFO (Re-)joining group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2017-03-13 15:46:42,319] INFO Successfully joined group connect-cassandra-sink-connector with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2017-03-13 15:46:42,320] INFO Setting newly assigned partitions [hello-mqtt-kafka-0] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2017-03-13 15:46:43,337] INFO Reflections took 4909 ms to scan 64 urls, producing 3915 keys and 28184 values (org.reflections.Reflections:229)
[2017-03-13 15:46:50,462] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:00,460] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:10,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:20,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
1 Answer
Apache Kafka includes a consumer group management tool: bin/kafka-consumer-groups.sh
The consumer group that the connector uses to read events from Kafka is called "connect-cassandra-sink-connector" (you can see it in the log snippet you posted).
I suggest using this tool to check:
Has the consumer caught up (i.e., is the last offset it fetched the end of the log)? If so, try writing new events to the topic and see whether those get written. Maybe it simply started late and missed the earlier events?
Does it appear to be making progress? If it does, it apparently believes it is writing to Cassandra successfully. If it does not, it cannot read from Kafka at all. Check the broker logs for errors, and consider increasing Connect's log level to see whether anything interesting shows up.
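The check with the consumer-groups tool might look like this; the host is an assumption, and the group name comes from the posted logs. On Kafka 0.10.x the tool needs --new-consumer for groups that use the new consumer (as Connect does); on later Kafka versions that flag was removed and --bootstrap-server alone is used:

```shell
# Describe the sink connector's consumer group: shows current offset,
# log end offset, and lag per partition.
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
  --describe --group connect-cassandra-sink-connector
```

If the lag column stays at 0 while new messages arrive, the connector is consuming them and the problem is on the Cassandra write path; if the lag grows, the connector is not reading from Kafka at all.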