我在kafka connect SinkTask实现中有如下代码 . 对于关键异常,我想在任务中调用 stop() 并停止任务 .

我需要知道我是否转到 stop() kafka connect是否在kafka主题的分区中提交了偏移?即如果我处理我的异常,记录它,根据异常执行一些活动并调用 stop() ,是否提交了主题的偏移量(应用程序主题),因为 put() 没有异常?我假设因为没有从 put() 成功运行或者没有完成批处理(基于最大轮询记录)不应该完成偏移,所以当我重试时,不会遗漏任何记录 . 如果没有提交抵消,是否有人可以提供建议,请告诉我?

代码:

public class ExampleSinkTask extends SinkTask {
    @Override
    public void start(Map<String, String> map) {
    ....

    }

    @Override
    private void doSomeCriticalOperation()  {
        // call stop if critical operation fails. 

        ...

        // Exception handled, close the task (allow user interference in such cases) 
        stop();
    }

    @Override
    public void put(Collection<SinkRecord> collection)  {
        ..
        doSomeCriticalOperation();
        ..

    }

    @Override
    public void stop() {
        log.info("*** Stopping Kafka task completely ***");
        System.exit(0);
    }
}