我想使用Electron golang包装器为Qpid proton-c库连接到Azure EventHub .
我将以下SASL详细信息组合到构建连接字符串所需的host / port / namespace / path但由于某种原因我不断收到错误消息: connection reset by peer
.
package main
import (
"fmt"
"os"
"strings"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
)
var (
eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
eventHubName = "<MY_CUSTOM_NAME>"
eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)
func main() {
sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))
urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
fmt.Printf("The URL connection string: '%v'\n", urlStr)
// parse URL
url, err := amqp.ParseURL(urlStr)
if err != nil {
panic(err)
}
fmt.Printf("The AMQP parsed URL: %v\n", url)
// TCP dial
amqpHost := url.Host
fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
c, err := container.Dial(
"tcp", amqpHost,
electron.SASLEnable(),
electron.Password([]byte(eventHubSasKey)),
electron.User(eventHubSasKeyName),
)
if err != nil {
panic(err)
}
defer c.Close(nil)
// AMQP send
addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
if err != nil {
panic(err)
}
m := amqp.NewMessage()
body := fmt.Sprintf("bla bla bla %v", 42)
m.Marshal(body)
fmt.Printf("The AMQP message body: '%v'\n", m.Body())
go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
// AMQP ACK receive
fmt.Printf("Waiting for ACKs...\n")
for {
fmt.Printf("Waiting for an ACK coming out of the channel...\n")
out := <-sentChan // Outcome of async sends.
fmt.Printf("Received something: '%v'\n", out)
}
}
编译时,然后运行代码,这是输出:
The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...
对我来说,收到消息说 connection reset by peer
看起来不是一个有效的ACK,我不确定连接尝试有什么问题?
-
proton-c
的编译版本是0.18.0,我使用go1.7.4 linux/amd64
. -
如果我将
electron.SASLAllowedMechs("EXTERNAL")
添加到连接选项,那么我会收到相同的错误消息 . -
如果我将端口更改为
5672
,则在尝试通过TCP拨号后出现connection refused
恐慌错误 . -
如果我使用
base64.StdEncoding.DecodeString(eventHubSasKey)
解码base64密码字段并将字节传递给连接选项,我会一直收到相同的错误connection reset by peer
. -
如果我添加此连接选项
electron.SASLAllowedMechs("ANONYMOUS")
,那么我仍然会收到相同的错误消息connection reset by peer
. 这样做的原因是我没有使用任何SSL证书,并且Microsoft提供的AMQP的Java包装器似乎使用了这个"anonymous"而不是证书(事实上,使用Java连接器连接到EventHub不需要证书) ) .
我不知道如何继续这里,因为我被困在连接部分,我相信SASL的详细信息是按照这里的文档以正确的方式传递的:https://godoc.org/qpid.apache.org/electron#ConnectionOption
我仍然不确定失败的原因不是由于SSL证书,如果是这样的话我很难看到如何在过程中包含它们 .
Edit:
我后来发现我必须通过TCP Build TLS连接,即使我没有提供任何私有/公共密钥,也指定了“虚拟主机”(否则AMQP抱怨无法识别主机):
// TLS connection details
tlsConfig := &tls.Config{}
eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
if err != nil {
panic(err)
}
// AMPQ container connection on top of TLS via TCP
eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
amqpConn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.User(eventHubSasKeyName),
electron.Password([]byte(eventHubSasKey)),
electron.VirtualHost(eventHubDomain),
// electron.SASLAllowedMechs(<SOME_MECHANISM>),
)
if err != nil {
panic(err)
}
defer amqpConn.Close(nil)
// AMQP sender (a AMQP link with target the name defined on the Azure portal)
s, err := amqpConn.Sender(electron.Target(eventHubName))
if err != nil {
panic(err)
}
但是,当使用环境变量 PN_TRACE_FRM=true
(在 proton-c
级别给我一些详细的日志记录)运行应用程序时,错误是:
[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]
此afaik意味着SASL详细信息(用户名/密码)必须是“发件人”类型,因为我正在尝试向事件中心发送内容 . 我仔细检查了Azure门户上的这些详细信息(单击“共享访问策略”>然后使用指定为“发送”的“声明”策略)并且它们是正确的 . 所以我不确定为什么我会收到这个错误 .
我实际上尝试了在Azure门户上定义的不同级别的这些SASL策略,包括 <MY_CUSTOM_NAMESPACE>
和 <MY_CUSTOM_NAME>
,但始终是相同的错误消息 .
我也试过包括各种SASL机制,例如当使用 electron.SASLAllowedMechs("PLAIN")
时,我收到此错误: no mechanism available: No worthy mechs found (Authentication failed [mech=none])
.
3 回答
在urlStr中使用端口为5671的“amqps”方案 . 事件中心不允许普通的TCP连接 . 您还需要启用SASL PLAIN以发送在命名空间或事件中心实体上配置的SAS密钥(用户名=密钥名称,密码=密钥)(看起来您已经在执行此操作) . 我不确定golang但是使用Python绑定可以将所有内容放在Uri中,例如“amqps:// sas-key-name:url-encoded-key@your-namespace.servicebus.windows.net:5671” . 端口号是可选的 .
如果底层质子c引擎看到不同的支持SASL机制,则它可能不会使用SASL PLAIN . 要强制执行PLAIN,您可以在容器上设置允许的机制 . 在go中,SASLAllowedMechs函数似乎为您提供了创建连接时可以提供的连接选项 .
这是Python code,适用于事件中心 .
我设法在AMQP上使用"Claims-based authorization"(CBS) Build 连接 . 这似乎是微软特有的东西 . 有些详细信息可以在本页底部找到:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide
基本上这是步骤列表:
与
electron.VirtualHost(eventHubDomain)
的TLS连接和ANONYMOUS
SASL机制electron.SASLAllowedMechs("ANONYMOUS")
(无需指定SASL用户名和密码) . 检查上面问题 Edit 部分的详细信息^ .特殊
$cbs
事件中心名称的AMQP链接:cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
使用Microsoft对CBS握手的要求准备AMQP消息:
消息属性(检查此C#代码以比较https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):
以微软想要的方式格式化的SAS令牌,我已经改编了这段代码:https://github.com/michaelbironneau/asbclient/blob/master/azure.go这样:
那段代码^基于python SDK这里:https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py包含许多内容,例如大写/小写URL编码,与到期目的的时间戳混合以及SASL用户名和密码 .
构建导入
"qpid.apache.org/amqp"
的AMQP消息:使用
outcome := cbsLink.SendSync(cbsHandshakeMsg)
发送此AMQP消息然后神奇地您应该在事件中心验证一段时间了 .首先设置要连接到的事件中心名称的AMQP链接:
msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))
现在,您可以通过以下方式使用此最后一个AMQP链接发送要发送的消息:
完成:)
使用环境变量
PN_TRACE_FRM=true
运行此代码有助于对AMQP进行故障排除,因为proton-c
库记录了大量有用的调试消息 .由于某种原因,在连接尝试期间直接传递SASL用户名和密码的AMQP
PLAIN
机制不能与事件中心一起使用 . 它可能是他们或Electron / Qpid库的问题,我不确定,但现在至少有人能够使用golang和他们提供的 CBS Microsoft协议发送消息 .如azure AMQP protocol guide所述,需要TLS .
我们可以选择SASL PLAIN或CBS进行身份验证,以PLAIN为例,我稍微修改了一下代码并按预期工作 . 神奇的部分是连接选项:
SASLAllowInsecure返回允许或禁止明文SASL身份验证机制的ConnectionOption,如果我们选择使用SASL PLAIN,则应将其设置为true .
希望能帮助到你 .