我编写了以下 Java 类,它提供了发送 byte[] 以及等待接收 byte[] 的方法:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import fi.vakuutustiedot.config.Configuration;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Thin wrapper around a single RabbitMQ queue that sends and receives raw
 * {@code byte[]} payloads.
 *
 * <p>Every operation opens its own connection and channel from the configured
 * URI and closes both before returning, so no broker resources outlive a call.
 * {@link #sendMessage(byte[])} and {@link #receiveMessage()} are
 * {@code synchronized} on the instance.
 */
public final class RabbitMessageQueue {

    // Declared first: the static initializer below runs the constructor, which logs.
    private static final Logger LOGGER =
            Logger.getLogger(RabbitMessageQueue.class.getName());

    /** Shared avatar queue; stays {@code null} if static initialization failed. */
    public static RabbitMessageQueue AVATAR_MESSAGE_QUEUE;

    /** Shared e-mail queue; stays {@code null} if static initialization failed. */
    public static RabbitMessageQueue EMAIL_MESSAGE_QUEUE;

    private final String uri;
    private final String messageQueueName;

    static {
        try {
            AVATAR_MESSAGE_QUEUE = new RabbitMessageQueue("my-queue-name", "my-queue-uri");
            EMAIL_MESSAGE_QUEUE = new RabbitMessageQueue("my-queue-name", "my-queue-uri");
        } catch (KeyManagementException
                | TimeoutException
                | URISyntaxException
                | NoSuchAlgorithmException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Creates the wrapper and declares the queue on the broker.
     *
     * <p>An {@link IOException} raised while connecting or declaring the queue
     * (for example when the broker refuses the credentials or the vhost) is
     * logged at {@code SEVERE} and not rethrown; the instance is still created.
     *
     * @param messageQueueName name of the queue to declare and use
     * @param uri AMQP URI of the broker, including credentials and vhost
     * @throws URISyntaxException if {@code uri} is not a valid URI
     * @throws NoSuchAlgorithmException if the URI requires TLS and it cannot be set up
     * @throws KeyManagementException if the URI requires TLS and it cannot be set up
     * @throws TimeoutException if connecting to the broker times out
     */
    public RabbitMessageQueue(String messageQueueName, String uri)
            throws KeyManagementException, TimeoutException, URISyntaxException,
                    NoSuchAlgorithmException {
        this.messageQueueName = messageQueueName;
        this.uri = uri;

        ConnectionFactory connectionFactory = newConnectionFactory(uri);
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            Map<String, Object> params = new HashMap<>();
            params.put("x-ha-policy", "all");
            params.put("x-max-length-bytes", Configuration.AVATAR_FILE_MAX_SIZE);
            // durable = false, exclusive = false, autoDelete = false
            channel.queueDeclare(messageQueueName, false, false, false, params);
        } catch (IOException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Publishes {@code data} to this queue through the default exchange.
     *
     * <p>Any failure, including an invalid URI or a TLS set-up problem, is
     * propagated to the caller instead of being swallowed, so a normal return
     * means the publish call completed.
     *
     * @param data message body to publish
     * @throws Exception if the connection cannot be created or publishing fails
     */
    public synchronized void sendMessage(byte[] data) throws Exception {
        ConnectionFactory connectionFactory = newConnectionFactory(uri);
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            // Empty exchange name = default exchange; routing key = queue name.
            channel.basicPublish("", messageQueueName, null, data);
        }
    }

    /**
     * Fetches at most one message from this queue.
     *
     * <p>The message is pulled synchronously with {@code basicGet} and
     * auto-acknowledged, so no consumer stays registered and the connection can
     * be closed safely before returning. This method does not block waiting for
     * a message to arrive.
     *
     * @return the body of the next message, or {@code null} if the queue is empty
     * @throws Exception if the connection cannot be created or the fetch fails
     */
    public synchronized byte[] receiveMessage() throws Exception {
        ConnectionFactory connectionFactory = newConnectionFactory(uri);
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            GetResponse response = channel.basicGet(messageQueueName, true);
            return response == null ? null : response.getBody();
        }
    }

    /** Builds a connection factory configured from the given AMQP URI. */
    private static ConnectionFactory newConnectionFactory(String uri)
            throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(uri);
        return connectionFactory;
    }
}
/**
 * Delivery callback that keeps the body of the most recently delivered message
 * until it is taken with {@link #receiveMessage()}.
 *
 * <p>Both methods are {@code synchronized}: the RabbitMQ client is documented
 * to invoke consumer callbacks on its own thread, while {@link #receiveMessage()}
 * is called from application code, so the stored body must be published safely
 * between threads and the take-and-clear step must be atomic.
 */
class MyDeliverCallback implements DeliverCallback {

    /** Body of the latest delivery, or {@code null} if none is pending. */
    private byte[] lastReadMessage;

    /**
     * Stores the body of the delivered message, replacing any body that has not
     * been taken yet.
     *
     * @param consumerTag tag of the consumer the delivery belongs to (unused)
     * @param message the delivered message
     */
    @Override
    public synchronized void handle(String consumerTag, Delivery message)
            throws IOException {
        this.lastReadMessage = message.getBody();
    }

    /**
     * Takes the stored message body and clears it.
     *
     * @return the last delivered body, or {@code null} if nothing has been
     *     delivered since the previous call
     */
    synchronized byte[] receiveMessage() {
        byte[] tmp = lastReadMessage;
        lastReadMessage = null;
        return tmp;
    }
}
/**
 * Cancel callback that deliberately ignores consumer cancellation.
 */
class MyCancelCallback implements CancelCallback {

    /**
     * Does nothing when the consumer is cancelled.
     *
     * @param consumerTag tag of the cancelled consumer (unused)
     */
    @Override
    public void handle(String consumerTag) throws IOException {
        // Intentionally empty: cancellation requires no action here.
    }
}
当我在一个简单的程序中使用该类时,会收到以下异常:
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598)
at java.lang.Thread.run(Thread.java:748)
Dec 10, 2018 12:06:31 PM fi.vakuutustiedot.messagequeue.RabbitMessageQueue <init>
SEVERE: null
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:403)
at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1104)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1021)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1180)
at fi.vakuutustiedot.messagequeue.RabbitMessageQueue.<init>(RabbitMessageQueue.java:62)
at fi.vakuutustiedot.messagequeue.RabbitMessageQueue.<clinit>(RabbitMessageQueue.java:42)
at net.coderodde.mq.Main.main(Main.java:11)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost 'yldzxqij' refused for user 'yldzxqij', class-id=10, method-id=40)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:499)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:292)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
我花了几个小时试图找出问题的根源,但一无所获。有什么想法吗?
1 回答
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost 'yldzxqij' refused for user 'yldzxqij', class-id=10, method-id=40)
从这条错误信息来看,RabbitMQ 服务器拒绝了您的连接:用户 'yldzxqij' 无权访问 vhost 'yldzxqij'。请确保连接 URI 中使用了正确的凭据和 vhost。
你可以在这里找到类似的情况:access to vhost refused for guest, with the MassTransit Sample-RequestResponse sample