首页 文章

封装 RabbitMQ 的 Java 类无法连接到队列服务器

提问于
浏览
0

我编写了以下 Java 类,它提供了一个发送 byte[] 的方法和一个等待接收 byte[] 的方法:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;

import fi.vakuutustiedot.config.Configuration;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Minimal wrapper around a single RabbitMQ queue that can publish a
 * {@code byte[]} payload and fetch one back.
 *
 * <p>Every operation opens its own short-lived connection and channel and
 * closes both before returning, so no broker resources are held between calls.
 * The public methods are {@code synchronized}, so calls on one instance are
 * serialized.
 */
public final class RabbitMessageQueue {

    // Declared first so it is initialised before the static block below,
    // which runs the constructor and may need to log.
    private static final Logger LOGGER =
            Logger.getLogger(RabbitMessageQueue.class.getName());

    /** Shared queue instance; stays {@code null} if static initialisation failed. */
    public static RabbitMessageQueue AVATAR_MESSAGE_QUEUE;

    /** Shared queue instance; stays {@code null} if static initialisation failed. */
    public static RabbitMessageQueue EMAIL_MESSAGE_QUEUE;

    private final String uri;
    private final String messageQueueName;

    static {
        try {
            AVATAR_MESSAGE_QUEUE = new RabbitMessageQueue("my-queue-name", "my-queue-uri");
            EMAIL_MESSAGE_QUEUE  = new RabbitMessageQueue("my-queue-name", "my-queue-uri");
        } catch (KeyManagementException
                | TimeoutException
                | URISyntaxException
                | NoSuchAlgorithmException ex) {
            // Best effort: a failure here must not abort class loading.
            LOGGER.log(Level.SEVERE, "Could not initialise the shared message queues", ex);
        }
    }

    /**
     * Creates the wrapper and declares the queue on the broker.
     *
     * <p>An {@link IOException} raised while connecting or declaring the queue
     * is logged and not rethrown, so the instance is still created in that case.
     *
     * @param messageQueueName name of the queue to declare and use
     * @param uri              AMQP URI of the broker
     * @throws KeyManagementException   if the URI cannot be applied to the connection factory
     * @throws NoSuchAlgorithmException if the URI cannot be applied to the connection factory
     * @throws URISyntaxException       if {@code uri} is not a valid URI
     * @throws TimeoutException         if opening or closing the connection times out
     */
    public RabbitMessageQueue(String messageQueueName, String uri) throws KeyManagementException, TimeoutException, URISyntaxException, NoSuchAlgorithmException {
        this.messageQueueName = messageQueueName;
        this.uri = uri;

        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            Map<String, Object> params = new HashMap<>();
            params.put("x-ha-policy", "all");
            // NOTE(review): the avatar size limit is applied to every queue,
            // including the e-mail one -- confirm this is intended.
            params.put("x-max-length-bytes",
                       Configuration.AVATAR_FILE_MAX_SIZE);

            channel.queueDeclare(messageQueueName,
                    false,
                    false,
                    false,
                    params);
        } catch (IOException ex) {
            // The URI may embed credentials, so only the queue name is logged.
            LOGGER.log(Level.SEVERE,
                       "Could not declare queue '" + messageQueueName + "'",
                       ex);
        }
    }

    /**
     * Publishes {@code data} to this queue through the default exchange.
     *
     * <p>Failures to configure the connection factory from the URI are logged
     * and not rethrown; connection and publishing failures propagate.
     *
     * @param data message body to publish
     * @throws Exception if connecting, publishing or closing fails
     */
    public synchronized void sendMessage(byte[] data) throws Exception {
        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            channel.basicPublish("", messageQueueName, null, data);
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException ex) {
            LOGGER.log(Level.SEVERE,
                       "Could not configure the connection for queue '"
                               + messageQueueName + "'",
                       ex);
        }
    }

    /**
     * Fetches a single message from this queue, acknowledging it automatically.
     *
     * <p>The message is pulled synchronously, so the body is available before
     * the connection and channel are closed on return.
     *
     * @return the body of the fetched message, or {@code null} if the queue
     *         currently holds no message
     * @throws Exception if connecting, fetching or closing fails
     */
    public synchronized byte[] receiveMessage() throws Exception {
        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            GetResponse response = channel.basicGet(messageQueueName, true);
            return response == null ? null : response.getBody();
        }
    }

    /** Builds a connection factory configured with this queue's broker URI. */
    private ConnectionFactory newConnectionFactory()
            throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(uri);
        return connectionFactory;
    }
}

/**
 * Delivery callback that remembers the body of the most recent delivery it
 * was handed, until that body is collected through {@link #receiveMessage()}.
 */
class MyDeliverCallback implements DeliverCallback {

    /** Body of the latest delivery not yet collected, or {@code null}. */
    private byte[] latestBody;

    /**
     * Stores the body of {@code message}, replacing any body stored earlier.
     *
     * @param consumerTag tag of the consumer the delivery belongs to (unused)
     * @param message     the delivery whose body is kept
     */
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
        latestBody = message.getBody();
    }

    /**
     * Hands out the stored body and forgets it.
     *
     * @return the stored body, or {@code null} if none is stored
     */
    byte[] receiveMessage() {
        try {
            return latestBody;
        } finally {
            // Clear the slot so the same body is not handed out twice.
            latestBody = null;
        }
    }
}

/**
 * Cancel callback that takes no action when it is invoked.
 */
class MyCancelCallback implements CancelCallback {

    /**
     * Does nothing.
     *
     * @param consumerTag tag of the cancelled consumer (unused)
     */
    @Override
    public void handle(String consumerTag) throws IOException {
        // Intentionally empty.
    }
}

当我使用该类启动一个简单的程序时,我收到:

java.net.SocketException: Socket Closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598)
    at java.lang.Thread.run(Thread.java:748)
Dec 10, 2018 12:06:31 PM fi.vakuutustiedot.messagequeue.RabbitMessageQueue <init>
SEVERE: null
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:403)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1104)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1021)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1180)
    at fi.vakuutustiedot.messagequeue.RabbitMessageQueue.<init>(RabbitMessageQueue.java:62)
    at fi.vakuutustiedot.messagequeue.RabbitMessageQueue.<clinit>(RabbitMessageQueue.java:42)
    at net.coderodde.mq.Main.main(Main.java:11)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost 'yldzxqij' refused for user 'yldzxqij', class-id=10, method-id=40)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:499)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:292)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)

我花了几个小时试图找出问题的根源,但毫无进展。有什么想法吗?

1 回答

  • 0

    Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost 'yldzxqij' refused for user 'yldzxqij', class-id=10, method-id=40)

    从错误信息来看,服务器拒绝了用户 'yldzxqij' 对 vhost 'yldzxqij' 的访问,因此连接被关闭。请确保连接 URI 中使用了正确的凭据(用户名、密码和 vhost)。

    你可以在这里找到类似的情况:access to vhost refused for guest, with the MassTransit Sample-RequestResponse sample

相关问题