I am trying to stream a text file into a topic with Kafka Connect, following this tutorial: http://bigdatums.net/2017/06/20/writing-file-content-to-kafka-topic/

However, the topic does not appear to receive any messages. Both the sink file and the console consumer show nothing.

These are the warning messages shown at the start of the worker log:

WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

And these are the last messages in the worker log:

[2018-03-06 19:55:15,439] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-03-06 19:55:15,439] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-03-06 19:55:15,441] INFO Created connector local-file-sink (org.apache.kafka.connect.cli.ConnectStandalone:99)
[2018-03-06 19:55:15,442] INFO WorkerSinkTask{id=local-file-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:268)
[2018-03-06 19:55:15,510] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Discovered coordinator King:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-03-06 19:55:15,511] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[2018-03-06 19:55:15,511] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
[2018-03-06 19:55:15,523] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Successfully joined group with generation 15 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-03-06 19:55:15,525] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Setting newly assigned partitions [connect-test-mta-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[2018-03-06 19:55:25,413] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-03-06 19:55:25,414] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-03-06 19:55:35,415] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-03-06 19:55:35,415] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-03-06 19:55:45,416] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-03-06 19:55:45,417] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-03-06 19:55:55,417] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-03-06 19:55:55,418] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
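The repeated `flushing 0 outstanding messages` lines suggest the source task is not producing anything new. These are the sanity checks I ran (the paths are taken from my configs below; the checks themselves are my own debugging steps, not part of the tutorial):

```shell
# Paths copied from the source connector config and connect-standalone.properties
SRC=/home/bdata/MTA-Bus-Time_.2014-10-31.txt
OFFSETS=/home/bdata/kafka_bdata_temp/offsets/connect.offsets

# FileStreamSource remembers its position in the offsets file, so a file that
# was fully read once yields 0 new messages on every later run of the worker.
[ -s "$SRC" ] && echo "source file is non-empty" || echo "source file missing or empty"
[ -f "$OFFSETS" ] && echo "offsets file exists" || echo "no offsets file"

# To check whether the topic itself has any data (needs the running broker):
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#     --topic connect-test-mta --from-beginning
```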

This is the server.properties configuration:

broker.id=0
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600
log.dirs=/home/bdata/kafka_bdata_temp/logs

num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=172.16.100.150:2181
zookeeper.connection.timeout.ms=6000

And this is the connect-standalone.properties worker configuration:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=org.apache.kafka.connect.storage.StringConverter

# Default was to enable both
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/home/bdata/kafka_bdata_temp/offsets/connect.offsets

offset.flush.interval.ms=10000
offset.flush.timeout.ms=50000

This is the file source connector configuration:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/bdata/MTA-Bus-Time_.2014-10-31.txt
topic=connect-test-mta
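For completeness, the `local-file-sink` connector that appears in the log was created from the Kafka quickstart defaults. I did not keep the exact file, so the sketch below is a reconstruction: the topic must be `connect-test-mta` (the log shows the sink consumer being assigned partition `connect-test-mta-0`), while the output file name is assumed to be the quickstart default.

```properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# Assumed quickstart default; my actual output path may have differed
file=test.sink.txt
topic=connect-test-mta
```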