首页 文章

spring-amqp使用非线程安全的侦听器实现零消费者利用率

提问于
浏览
0

我们在 生产环境 中遇到了一个问题,即消费者利用率为零,队列不断增长,性能下降 .

每个使用者都是一个容器,其中包含非线程安全的侦听器bean的单个实例 .

每个侦听器都需要写入自己的文件集 . 为了避免线程争用,我希望只有一个线程可以写入自己的文件集 .

每个侦听器仅使用@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)实例化一次

我正在使用类似于question中的配置

每个容器还配置了一个重试建议,其中包含以下代码:

public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
    private static final int DEFAULT_RETRY_COUNT = 5;
    private static final int DEFAULT_BACKOFF_MS = 250;
    private int retryCount;
    private int backOffPeriodInMS;

    public RetryMessageAdvice() {
        this.retryCount = DEFAULT_RETRY_COUNT;
        this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
        initializeRetryPolicy();
    }

    public RetryMessageAdvice(int retryCount, int backoff) {
        this.retryCount = retryCount;
        this.backOffPeriodInMS = backoff;
        initializeRetryPolicy();
    }

    public void initializeRetryPolicy() {

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(this.retryCount);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(backOffPeriodInMS);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        this.setRetryOperations(retryTemplate);
        this.setMessageRecoverer(new RetryMessageRecoverer());
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }
}

消费者看起来像这样:

@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);

    private final ExportMapper exportMapper;
    private final ExportFormatter exportFormatter;

    @Autowired
    public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
            @Qualifier("exportMapper") ExportedMapper exportedMapper,
            @Value("${export.root.dir}") String exportDirectory) {
        super(exportDirectory);
        this.exportedFormatter = exportFormatter;
        this.exportedMapper = exportedMapper;
    }

    @Override
    public void handle(AnalyticsEvent analyticsEvent) throws Exception {

        ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);

        File csvFile = getCsvFile(exportedEvent);
        String csvRow = exportFormatter.writeAsString(exportedEvent);
        writeCsvRow(csvRow, csvFile);
    }
}

其他注意事项

  • 导出映射器和导出格式化程序是线程安全的但不使用@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)

  • 方法 writeCsvRowsynchronized .

  • 有大量错误导致exportMapper抛出异常并触发重试建议

  • 传入的消息愤怒是120 / s

  • 传入和传递速率之间的比率通常为5:1

我对错误的理论是

  • 大量错误导致大量重试并降低性能 . 我最好将错误消息放入错误队列中 .

  • 不知何故,writeCsvRow中的synchronized方法导致spring-amqp管理的某些更高级别线程出现问题 .

我的问题是,哪种理论是正确的?重试建议的影响是什么?

1 回答

  • 1
    • 如果这些bean也不是线程安全的,它们也必须是原型范围 .

    • 既然有's only one thread, synchronizing that method is not necessary but it shouldn'伤害了 .

    • 如果错误无法恢复,则应将重试策略配置为不重试这些异常 .

    .

    • 使用这些重试设置,每次出现错误时,您将暂停容器线程250毫秒 . 所以,是的;这会伤害表现 .

    • 不应该是一个重大的开销 .

相关问题