首页 文章

使用QPid和golang包装器Electron连接到AMQP 1.0 Azure EventHub

提问于
浏览
0

我想使用Electron golang包装器为Qpid proton-c库连接到Azure EventHub .

我将以下SASL详细信息组合到构建连接字符串所需的host / port / namespace / path但由于某种原因我不断收到错误消息: connection reset by peer .

package main

import (
    "fmt"
    "os"
    "strings"
    "qpid.apache.org/amqp"
    "qpid.apache.org/electron"
)

var (
    eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
    eventHubName = "<MY_CUSTOM_NAME>"
    eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
    eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)

func main() {

    sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
    container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))

    urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
    fmt.Printf("The URL connection string: '%v'\n", urlStr)

    // parse URL
    url, err := amqp.ParseURL(urlStr)
    if err != nil {
        panic(err)
    }
    fmt.Printf("The AMQP parsed URL: %v\n", url)

    // TCP dial
    amqpHost := url.Host
    fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
    c, err := container.Dial(
        "tcp", amqpHost, 
        electron.SASLEnable(), 
        electron.Password([]byte(eventHubSasKey)), 
        electron.User(eventHubSasKeyName),
    )
    if err != nil {
        panic(err)
    }
    defer c.Close(nil)

    // AMQP send
    addr := strings.TrimPrefix(url.Path, "/")
    s, err := c.Sender(electron.Target(addr))
    if err != nil {
        panic(err)
    }
    m := amqp.NewMessage()
    body := fmt.Sprintf("bla bla bla %v", 42)
    m.Marshal(body)
    fmt.Printf("The AMQP message body: '%v'\n", m.Body())

    go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan

    // AMQP ACK receive
    fmt.Printf("Waiting for ACKs...\n")
    for {
        fmt.Printf("Waiting for an ACK coming out of the channel...\n")
        out := <-sentChan // Outcome of async sends.
        fmt.Printf("Received something: '%v'\n", out)
    }   
}

编译时,然后运行代码,这是输出:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

对我来说,收到消息说 connection reset by peer 看起来不是一个有效的ACK,我不确定连接尝试有什么问题?

  • proton-c 的编译版本是0.18.0,我使用 go1.7.4 linux/amd64 .

  • 如果我将 electron.SASLAllowedMechs("EXTERNAL") 添加到连接选项,那么我会收到相同的错误消息 .

  • 如果我将端口更改为 5672 ,则在尝试通过TCP拨号后出现 connection refused 恐慌错误 .

  • 如果我使用 base64.StdEncoding.DecodeString(eventHubSasKey) 解码base64密码字段并将字节传递给连接选项,我会一直收到相同的错误 connection reset by peer .

  • 如果我添加此连接选项 electron.SASLAllowedMechs("ANONYMOUS") ,那么我仍然会收到相同的错误消息 connection reset by peer . 这样做的原因是我没有使用任何SSL证书,并且Microsoft提供的AMQP的Java包装器似乎使用了这个"anonymous"而不是证书(事实上,使用Java连接器连接到EventHub不需要证书) ) .

我不知道如何继续这里,因为我被困在连接部分,我相信SASL的详细信息是按照这里的文档以正确的方式传递的:https://godoc.org/qpid.apache.org/electron#ConnectionOption

我仍然不确定失败的原因不是由于SSL证书,如果是这样的话我很难看到如何在过程中包含它们 .

Edit:

我后来发现我必须通过TCP Build TLS连接,即使我没有提供任何私有/公共密钥,也指定了“虚拟主机”(否则AMQP抱怨无法识别主机):

// TLS connection details
    tlsConfig := &tls.Config{}
    eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
    tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
    if err != nil {
        panic(err)
    }

    // AMPQ container connection on top of TLS via TCP
    eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
    amqpConn, err := container.Connection(
        tlsConn, 
        electron.SASLEnable(),
        electron.User(eventHubSasKeyName), 
        electron.Password([]byte(eventHubSasKey)),
        electron.VirtualHost(eventHubDomain),
        // electron.SASLAllowedMechs(<SOME_MECHANISM>),
    )
    if err != nil {
        panic(err)
    }
    defer amqpConn.Close(nil)

    // AMQP sender (a AMQP link with target the name defined on the Azure portal)
    s, err := amqpConn.Sender(electron.Target(eventHubName))
    if err != nil {
        panic(err)
    }

但是,当使用环境变量 PN_TRACE_FRM=true (在 proton-c 级别给我一些详细的日志记录)运行应用程序时,错误是:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

此afaik意味着SASL详细信息(用户名/密码)必须是“发件人”类型,因为我正在尝试向事件中心发送内容 . 我仔细检查了Azure门户上的这些详细信息(单击“共享访问策略”>然后使用指定为“发送”的“声明”策略)并且它们是正确的 . 所以我不确定为什么我会收到这个错误 .

我实际上尝试了在Azure门户上定义的不同级别的这些SASL策略,包括 <MY_CUSTOM_NAMESPACE><MY_CUSTOM_NAME> ,但始终是相同的错误消息 .

我也试过包括各种SASL机制,例如当使用 electron.SASLAllowedMechs("PLAIN") 时,我收到此错误: no mechanism available: No worthy mechs found (Authentication failed [mech=none]) .

3 回答

  • 0

    在urlStr中使用端口为5671的“amqps”方案 . 事件中心不允许普通的TCP连接 . 您还需要启用SASL PLAIN以发送在命名空间或事件中心实体上配置的SAS密钥(用户名=密钥名称,密码=密钥)(看起来您已经在执行此操作) . 我不确定golang但是使用Python绑定可以将所有内容放在Uri中,例如“amqps:// sas-key-name:url-encoded-key@your-namespace.servicebus.windows.net:5671” . 端口号是可选的 .

    如果底层质子c引擎看到不同的支持SASL机制,则它可能不会使用SASL PLAIN . 要强制执行PLAIN,您可以在容器上设置允许的机制 . 在go中,SASLAllowedMechs函数似乎为您提供了创建连接时可以提供的连接选项 .

    这是Python code,适用于事件中心 .

  • 1

    我设法在AMQP上使用"Claims-based authorization"(CBS) Build 连接 . 这似乎是微软特有的东西 . 有些详细信息可以在本页底部找到:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide

    基本上这是步骤列表:

    • electron.VirtualHost(eventHubDomain) 的TLS连接和 ANONYMOUS SASL机制 electron.SASLAllowedMechs("ANONYMOUS") (无需指定SASL用户名和密码) . 检查上面问题 Edit 部分的详细信息^ .

    • 特殊 $cbs 事件中心名称的AMQP链接: cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))

    • 使用Microsoft对CBS握手的要求准备AMQP消息:

    消息属性(检查此C#代码以比较https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):

    appProps := make(map[string]interface{})
    appProps["operation"] = "put-token"
    appProps["type"] = "servicebus.windows.net:sastoken"
    appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"
    

    以微软想要的方式格式化的SAS令牌,我已经改编了这段代码:https://github.com/michaelbironneau/asbclient/blob/master/azure.go这样:

    aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
    sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))
    

    那段代码^基于python SDK这里:https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py包含许多内容,例如大写/小写URL编码,与到期目的的时间戳混合以及SASL用户名和密码 .

    构建导入 "qpid.apache.org/amqp" 的AMQP消息:

    cbsHandshakeMsg := amqp.NewMessage()
    cbsHandshakeMsg.SetApplicationProperties(appProps)
    cbsHandshakeMsg.Marshal(sasToken)
    
    • 使用 outcome := cbsLink.SendSync(cbsHandshakeMsg) 发送此AMQP消息然后神奇地您应该在事件中心验证一段时间了 .

    • 首先设置要连接到的事件中心名称的AMQP链接: msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))

    现在,您可以通过以下方式使用此最后一个AMQP链接发送要发送的消息:

    m := amqp.NewMessage()
    m.Marshal("my message: bla bla bla, foo bar baz!")
    outcome := msgSender.SendSync(m)
    

    完成:)

    使用环境变量 PN_TRACE_FRM=true 运行此代码有助于对AMQP进行故障排除,因为 proton-c 库记录了大量有用的调试消息 .

    由于某种原因,在连接尝试期间直接传递SASL用户名和密码的AMQP PLAIN 机制不能与事件中心一起使用 . 它可能是他们或Electron / Qpid库的问题,我不确定,但现在至少有人能够使用golang和他们提供的 CBS Microsoft协议发送消息 .

  • 0

    azure AMQP protocol guide所述,需要TLS .

    设置连接和TLS后,Service Bus提供两种SASL机制选项:SASL PLAIN通常用于将用户名和密码凭据传递给服务器 . Service Bus没有帐户,但命名为Shared Access Security规则,它赋予权限并与密钥相关联 . 规则名称用作用户名,密钥(作为base64编码文本)用作密码 . 与所选规则关联的权限管理连接上允许的操作 . 当客户端想要使用稍后描述的基于声明的安全性(CBS)模型时,SASL ANONYMOUS用于绕过SASL授权 . 使用此选项,可以在短时间内匿名 Build 客户端连接,在此期间客户端只能与CBS endpoints 交互,并且必须完成CBS握手 .

    我们可以选择SASL PLAIN或CBS进行身份验证,以PLAIN为例,我稍微修改了一下代码并按预期工作 . 神奇的部分是连接选项:

    amqpConn, err := container.Connection(
        tlsConn,
        electron.SASLEnable(),
        electron.Password([]byte(eventHubSasKey)),
        electron.User(eventHubSasKeyName),
        electron.VirtualHost(eventHubDomain),
        electron.SASLAllowInsecure(true),
        electron.SASLAllowedMechs("PLAIN"),
    )
    

    SASLAllowInsecure返回允许或禁止明文SASL身份验证机制的ConnectionOption,如果我们选择使用SASL PLAIN,则应将其设置为true .

    希望能帮助到你 .

相关问题