首页 文章

在Spring-AMQP中演出403s

提问于
浏览
1

我需要在不同的运行时间内使用可变数量的消费者线程来保证消费者独占性,这些消费者线程使用固定数量的队列(其中队列的数量远大于消费者的队列数量) .

我的一般想法是,我让每个消费者线程尝试 Build 一个独占连接来清除队列,如果它在给定时间段内没有从该队列接收消息,则将其重定向到另一个队列 .

即使队列被临时清除,它也有可能在将来再次接收消息,因此不能简单地忘记队列 - 相反,消费者应该稍后返回它 . 为了实现这种轮换,我想我会使用队列队列 . 当消费者失败时,危险将是失去对队列队列中队列的引用;我认为这似乎可以通过确认来解决,如下所示 .

本质上,每个消费者线程等待从队列队列中获取带有对队列(1)的引用的消息(A);消息(A)最初仍未得到确认 . 消费者愉快地尝试清除队列(1),并且一旦队列(1)在给定的时间量内保持为空,消费者就从队列队列中请求新的队列名称 . 在接收到第二消息(B)和对新队列(2)的引用时,对队列(1)的引用作为新消息(C)被放回到队列队列的末尾,并且最后消息(A)得到承认 .

实际上,队列队列's delivered-at-least-and-probably-only-once guarantee almost gets me exclusivity for the normal queues (1, 2) here, but in order to make sure I absolutely don' t会丢失对队列的引用,我需要在确认消息(A)之前将队列(1)重新发布为消息(C) . 这意味着如果服务器在将队列(1)重新发布为消息(C)之后但在确认(A)之前失败,则队列队列中可能存在两个对队列(1)的引用,并且不再保证排他性 .

因此,我需要使用AMQP的独家消费者标志,这很好,但就目前而言,如果我收到“403 ACCESS REFUSED”,我也不想重新发布对队列的引用,以便重复参考文献不会增加 .

但是,我是一个优秀的AMQP库,我不知道如何用错误处理程序挂钩 . 容器上公开的 setErrorHandler 方法似乎没有"403 ACCESS REFUSED"错误 .

有没有办法让我可以使用我目前正在使用的框架对403进行操作?或者,还有另一种方法可以实现我需要的保证吗?我的代码如下 .

“监控服务”:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ListenerMonitoringService {

    private static final Logger log = LoggerFactory.getLogger(ListenerMonitoringService.class);

    private static final Period EXPIRATION_PERIOD = Period.millis(5000);

    private static final long MONTIORING_POLL_INTERVAL = 5000;
    private static final long MONITORING_INITIAL_DELAY = 5000;

    private final Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier;

    private final QueueCoordinator queueCoordinator;
    private final ScheduledExecutorService executorService;

    private final Collection<Record> records;

    public ListenerMonitoringService(Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier,
                                     QueueCoordinator queueCoordinator, ScheduledExecutorService executorService) {
        this.messageListenerContainerSupplier = messageListenerContainerSupplier;
        this.queueCoordinator = queueCoordinator;
        this.executorService = executorService;

        records = new ArrayList<>();
    }

    public void registerAndStart(MessageListener messageListener) {
        Record record = new Record(messageListenerContainerSupplier.get());

        // wrap with listener that updates record
        record.container.setMessageListener((MessageListener) (m -> {
            log.trace("{} consumed a message from {}", record.container, Arrays.toString(record.container.getQueueNames()));
            record.freshen(DateTime.now(DateTimeZone.UTC));
            messageListener.onMessage(m);
        }));

        record.container.setErrorHandler(e -> {
            log.error("{} received an {}", record.container, e);
            // this doesn't get called for 403s
        });

        // initial start up
        executorService.execute(() -> {
            String queueName = queueCoordinator.getQueueName();

            log.debug("Received queue name {}", queueName);
            record.container.setQueueNames(queueName);

            log.debug("Starting container {}", record.container);
            record.container.start();

            // background monitoring thread
            executorService.scheduleAtFixedRate(() -> {
                log.debug("Checking container {}", record.container);
                if (record.isStale(DateTime.now(DateTimeZone.UTC))) {
                    String newQueue = queueCoordinator.getQueueName();
                    String oldQueue = record.container.getQueueNames()[0];
                    log.debug("Switching queues for {} from {} to {}", record.container, oldQueue, newQueue);
                    record.container.setQueueNames(newQueue);

                    queueCoordinator.markSuccessful(queueName);
                }
            }, MONITORING_INITIAL_DELAY, MONTIORING_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        });

        records.add(record);
    }

    private static class Record {
        private static final DateTime DATE_TIME_MIN = new DateTime(0);

        private final AbstractMessageListenerContainer container;
        private Optional<DateTime> lastListened;

        private Record(AbstractMessageListenerContainer container) {
            this.container = container;
            lastListened = Optional.empty();
        }

        public synchronized boolean isStale(DateTime now) {
            log.trace("Comparing now {} to {} for {}", now, lastListened, container);
            return lastListened.orElse(DATE_TIME_MIN).plus(EXPIRATION_PERIOD).isBefore(now);
        }

        public synchronized void freshen(DateTime now) {
            log.trace("Updating last listened to {} for {}", now, container);
            lastListened = Optional.of(now);
        }
    }
}

“队列队列”处理程序:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

private class MetaQueueCoordinator implements QueueCoordinator {

    private static final Logger log = LoggerFactory.getLogger(MetaQueueCoordinator.class);

    private final Channel channel;
    private final Map<String, Envelope> envelopeMap;
    private final RabbitTemplate rabbitTemplate;

    public MetaQueueCoordinator(ConnectionFactory connectionFactory) {
        Connection connection = connectionFactory.createConnection();
        channel = connection.createChannel(false);

        envelopeMap = new ConcurrentHashMap<>();
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("");
        rabbitTemplate.setRoutingKey("queue_of_queues");
    }

    @Override
    public String getQueueName() {
        GetResponse response;
        try {
            response = channel.basicGet("queue_of_queues", false);
        } catch (IOException e) {
            log.error("Unable to get from channel");
            throw new RuntimeException(e);
        }

        String queueName = new String(response.getBody());
        envelopeMap.put(queueName, response.getEnvelope());

        return queueName;
    }

    @Override
    public void markSuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        log.debug("Putting {} at the end of the line...", queueName);
        rabbitTemplate.convertAndSend(queueName);

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName);
        }
    }

    @Override
    public void markUnsuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName);
        }
    }
}

1 回答

  • 1

    ErrorHandler 用于在邮件传递过程中处理错误,而不是设置侦听器本身 .

    即将发布的1.5版本publishes application events何时发生这种情况 .

    它将于今年夏天晚些时候发布;此功能目前仅适用于1.5.0.BUILD-SNAPSHOT;应在未来几周内提供候选版本 .

    project page显示了如何从快照存储库获取快照 .

相关问题