首页 文章

如何在骆驼中快速停止seda

提问于
浏览
3

我有一个带有分离器(使用流)的camel路由,它将消息发送到要处理的seda队列 . 当我试图轻轻地停止应用程序时,seda队列不会立即停止,它会在最终关闭之前处理所有消息 . 我该怎么做才能马上阻止它?

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class MySedaShutdownTest extends RouteBuilder  {

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("exception");
            }

        });

    from("timer:myTimer?repeatCount=1")
        .split(ExpressionBuilder.beanExpression(new MySplitter(), "myIterator"))
                .streaming()
                .to("seda:mySeda");

    from("seda:mySeda")
        .throttle(1)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("processing: " + exchange.getIn().getBody()
                        + "; app status: " + exchange.getContext().getStatus());

            }

        });
}

public static class MySplitter {

    public Iterator<String> myIterator() {
        List<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
            values.add("string nr : " + i);
        }
        System.out.println("in myIterator");
        return values.iterator();
    }
}

public static void main(String[] a) throws Exception {
    final Main main = new Main();
    new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("invoking shutdown");
                main.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }).start();
    System.out.println("starting app");
    main.enableHangupSupport();
    main.addRouteBuilder(new MySedaShutdownTest());
    main.run();
}

}

2 回答

相关问题