我们正在尝试将MQTT源连接器链接到InfluxDB接收器连接器 . 现在前者工作正常,但后者给出了以下例外:

org.apache.kafka.connect.errors.ConnectException:由于不可恢复的异常而退出WorkerSinkTask . org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)位于org.apache.kafka.connect的org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265) .runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)位于org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask .java:146)atg.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)at java.util.concurrent.Executors $ runnableAdapter.call(Executors.java:511)at java.util . concurrent.FutureTask.run(FutureTask.java:266)java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:624)at java .lang.Thread.run(Thread.java:748)

这是InfluxDB配置文件

connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
connect.influx.url=http://localhost:8086
connect.influx.db=iot
tasks.max=1
topics=simMetrics
connect.influx.kcql=INSERT INTO sensorMetrics SELECT * FROM simMetrics WITHTIMESTAMP sys_time()
name=influxdb-sink
connect.influx.username=""

这是消息结构:

{“timestamp”:1524572345184,“partition”:0,“key”:{“topic”:“machine / sensor / mytopic / test”,“id”:“1”},“offset”:0,“topic” “:”simMetrics“,”value“:{”metrics“:{”buzzer“:0,”led“:0,”water“:false,”buzzer_timestamp“:1524571762798,”temperature_timestamp“:1524571762816,”water_timestamp“: 1524571762835,“fan”:0,“light”:500,“temperature”:27.371554588194957,“assetName”:“SIMopcua”,“fan_timestamp”:1524571762791,“light_timestamp”:1524571762808,“led_timestamp”:1524571762827}}}

MQTT源连接器配置:

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
name=mqtt-source
connect.mqtt.kcql=INSERT INTO simMetrics SELECT * FROM machine/sensor/mytopic/test WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.mqtt.service.quality=1
connect.mqtt.hosts=tcp://192.168.208.203:1884

UPDATE
我们发现问题出在温度值格式上 . 由于我们没有't configured the field'类型,InfluxDB将温度值理解为双倍 . 具有小数分隔符的所有值都被正确保存,当Kafka发送值时出现问题,没有小数部分,省略小数分隔符 . 我们如何解决这个问题?
PS: 实际的解决方法是将0.00000001添加到所有传入温度 .