我们在 生产环境 中遇到了一个问题,即消费者利用率为零,队列不断增长,性能下降 .
每个使用者都是一个容器,其中包含非线程安全的侦听器bean的单个实例 .
每个侦听器都需要写入自己的文件集 . 为了避免线程争用,我希望只有一个线程可以写入自己的文件集 .
每个侦听器仅使用@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)实例化一次
我正在使用类似于question中的配置
每个容器还配置了一个重试建议,其中包含以下代码:
public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
private static final int DEFAULT_RETRY_COUNT = 5;
private static final int DEFAULT_BACKOFF_MS = 250;
private int retryCount;
private int backOffPeriodInMS;
public RetryMessageAdvice() {
this.retryCount = DEFAULT_RETRY_COUNT;
this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
initializeRetryPolicy();
}
public RetryMessageAdvice(int retryCount, int backoff) {
this.retryCount = retryCount;
this.backOffPeriodInMS = backoff;
initializeRetryPolicy();
}
public void initializeRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.retryCount);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(backOffPeriodInMS);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.setRetryOperations(retryTemplate);
this.setMessageRecoverer(new RetryMessageRecoverer());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
}
消费者看起来像这样:
@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);
private final ExportMapper exportMapper;
private final ExportFormatter exportFormatter;
@Autowired
public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
@Qualifier("exportMapper") ExportedMapper exportedMapper,
@Value("${export.root.dir}") String exportDirectory) {
super(exportDirectory);
this.exportedFormatter = exportFormatter;
this.exportedMapper = exportedMapper;
}
@Override
public void handle(AnalyticsEvent analyticsEvent) throws Exception {
ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);
File csvFile = getCsvFile(exportedEvent);
String csvRow = exportFormatter.writeAsString(exportedEvent);
writeCsvRow(csvRow, csvFile);
}
}
其他注意事项
-
导出映射器和导出格式化程序是线程安全的但不使用@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-
方法 writeCsvRow 是 synchronized .
-
有大量错误导致exportMapper抛出异常并触发重试建议
-
传入的消息愤怒是120 / s
-
传入和传递速率之间的比率通常为5:1
我对错误的理论是
-
大量错误导致大量重试并降低性能 . 我最好将错误消息放入错误队列中 .
-
不知何故,writeCsvRow中的synchronized方法导致spring-amqp管理的某些更高级别线程出现问题 .
我的问题是,哪种理论是正确的?重试建议的影响是什么?
1 回答
如果这些bean也不是线程安全的,它们也必须是原型范围 .
既然有's only one thread, synchronizing that method is not necessary but it shouldn'伤害了 .
如果错误无法恢复,则应将重试策略配置为不重试这些异常 .
.
使用这些重试设置,每次出现错误时,您将暂停容器线程250毫秒 . 所以,是的;这会伤害表现 .
不应该是一个重大的开销 .