我在使用Kafka Connect API开发Kafka源连接器时遇到了一些问题 .

我使用Retrofit和GSON从REST API获取数据,然后尝试将其插入Kafka .

这是我的源任务类:

public class BitfinexSourceTask extends SourceTask implements BitfinexTickerGetter.OnTickerReadyListener {
private static final String DATETIME_FIELD = "datetime";

private BitfinexService service;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private BlockingQueue<SourceRecord> queue = null;
private BitfinexTickerGetter tickerGetter;

private final Runnable runnable = new Runnable() {
    @Override
    public void run() {
        try {
            tickerGetter.get();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};
private ScheduledFuture<?> scheduledFuture;

@Override
public String version() {
    return VersionUtil.getVersion();
}

@Override
public void start(Map<String, String> map) {
    service = BitfinexServiceFactory.create();
    queue = new LinkedBlockingQueue<>();

    tickerGetter = new BitfinexTickerGetter(service, this);
    scheduledFuture = scheduler.scheduleAtFixedRate(runnable, 0, 5, TimeUnit.MINUTES);
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> result = new LinkedList<>();
    if (queue.isEmpty()) result.add(queue.take());
    queue.drainTo(result);
    return result;
}

@Override
public void stop() {
    scheduledFuture.cancel(true);
    scheduler.shutdown();
}

@Override
public void onTickerReady(Ticker ticker) {
    Map<String, ?> srcOffset = Collections.singletonMap(DATETIME_FIELD, ticker.getDatetime());
    Map<String, ?> srcPartition = Collections.singletonMap("from", "bitfinex");

    SourceRecord record = new SourceRecord(srcPartition, srcOffset, ticker.getSymbol(), Schema.STRING_SCHEMA, ticker.getDatetime(), Ticker.SCHEMA, ticker);

    queue.offer(record);
}

}

我实际上能够构建和添加连接器 . 它运行没有任何错误或东西,但主题没有创建 . 我已决定手动创建主题,然后重新运行连接器,但主题仍为空 . Ticker 是我的POJO对象,包含字符串和双字段 .

有人可以帮我弄这个吗?