我在使用Kafka Connect API开发Kafka源连接器时遇到了一些问题 .
我使用Retrofit和GSON从REST API获取数据,然后尝试将其插入Kafka .
这是我的源任务类:
public class BitfinexSourceTask extends SourceTask implements BitfinexTickerGetter.OnTickerReadyListener {
private static final String DATETIME_FIELD = "datetime";
private BitfinexService service;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private BlockingQueue<SourceRecord> queue = null;
private BitfinexTickerGetter tickerGetter;
private final Runnable runnable = new Runnable() {
@Override
public void run() {
try {
tickerGetter.get();
} catch (IOException e) {
e.printStackTrace();
}
}
};
private ScheduledFuture<?> scheduledFuture;
@Override
public String version() {
return VersionUtil.getVersion();
}
@Override
public void start(Map<String, String> map) {
service = BitfinexServiceFactory.create();
queue = new LinkedBlockingQueue<>();
tickerGetter = new BitfinexTickerGetter(service, this);
scheduledFuture = scheduler.scheduleAtFixedRate(runnable, 0, 5, TimeUnit.MINUTES);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> result = new LinkedList<>();
if (queue.isEmpty()) result.add(queue.take());
queue.drainTo(result);
return result;
}
@Override
public void stop() {
scheduledFuture.cancel(true);
scheduler.shutdown();
}
@Override
public void onTickerReady(Ticker ticker) {
Map<String, ?> srcOffset = Collections.singletonMap(DATETIME_FIELD, ticker.getDatetime());
Map<String, ?> srcPartition = Collections.singletonMap("from", "bitfinex");
SourceRecord record = new SourceRecord(srcPartition, srcOffset, ticker.getSymbol(), Schema.STRING_SCHEMA, ticker.getDatetime(), Ticker.SCHEMA, ticker);
queue.offer(record);
}
}
我实际上能够构建和添加连接器 . 它运行没有任何错误或东西,但主题没有创建 . 我已决定手动创建主题,然后重新运行连接器,但主题仍为空 . Ticker
是我的POJO对象,包含字符串和双字段 .
有人可以帮我弄这个吗?