我在kafka connect SinkTask实现中有如下代码 . 对于关键异常,我想在任务中调用 stop()
并停止任务 .
我需要知道我是否转到 stop()
kafka connect是否在kafka主题的分区中提交了偏移?即如果我处理我的异常,记录它,根据异常执行一些活动并调用 stop()
,是否提交了主题的偏移量(应用程序主题),因为 put()
没有异常?我假设因为没有从 put()
成功运行或者没有完成批处理(基于最大轮询记录)不应该完成偏移,所以当我重试时,不会遗漏任何记录 . 如果没有提交抵消,是否有人可以提供建议,请告诉我?
代码:
public class ExampleSinkTask extends SinkTask {
@Override
public void start(Map<String, String> map) {
....
}
@Override
private void doSomeCriticalOperation() {
// call stop if critical operation fails.
...
// Exception handled, close the task (allow user interference in such cases)
stop();
}
@Override
public void put(Collection<SinkRecord> collection) {
..
doSomeCriticalOperation();
..
}
@Override
public void stop() {
log.info("*** Stopping Kafka task completely ***");
System.exit(0);
}
}