首页 文章

rocketmq抛出异常“[TIMEOUT_CLEAN_QUEUE]经纪人忙,启动流量控制一会儿”

提问于
浏览
0

版本:rocketmq-ALL-4.1.0-孵化

我们发送msg 1000 QPS,同步发送,但抛出异常: -

[TIMEOUT_CLEAN_QUEUE]经纪人忙,启动流量控制一段时间

有相关代码:

while (true) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
                    if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}

我发现broker的sendMessageThreadPoolNums的默认值是1,

/**
 * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
 */
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;

但是以前的版本不是1,如果我配置 sendMessageThreadPoolNums = 100 ,可以解决这个问题吗?它将导致与默认值不同的东西 . 谢谢

1 回答

  • 0

    SHORT ANSWER:

    你有两个选择:

    将sendMessageThreadPoolNums设置为一个较小的数字,比如1,这是版本4.1.x之后的默认值 . 并且,仍然是4.1.x之后引入的useReentrantLockWhenPutMessage = false的默认值

    sendMessageThreadPoolNums=1 
     useReentrantLockWhenPutMessage=false
    

    如果您需要使用大量线程来处理发送消息,那么最好使用useReentrantLockWhenPutMessage = true

    sendMessageThreadPoolNums=128//large thread numbers
     useReentrantLockWhenPutMessage=true  // indicating that do NOT use spin lock but use ReentrantLock when putting message
    

相关问题