首页 文章

在由 Service Bus 触发的 Azure 函数中检索 IoT Hub 设备孪生(Device Twin)

提问于
浏览
0

我们将数据从IoT设备发送到Azure IoT Hub,并尝试将某种类型的消息传递给Azure功能 .

目前,我们是通过创建Azure Service Bus endpoints 并在IoTHub中创建消息路由来实现的 . 它按预期工作,Azure功能正确接收消息 .

现在,我们想要在Azure功能中从IoT Hub获取DeviceId,以及在Device Twin中定义的标签,我完全不知道如何做到这一点 .

如果我们使用 EventHubTrigger ,它似乎很简单,做这样的事情:

// Example from the question: a function triggered through an EventHubTrigger binding
// whose signature also declares a Twin parameter next to the incoming message.
public static class Test
{
    [FunctionName("TestQueueTrigger")]
    public static void Run(
        // Trigger bound to "messages/events", using the "IoTHubConnection" connection setting.
        [EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
        EventData message,
        // NOTE(review): no binding attribute is shown for this parameter; presumably it is
        // filled in by an IoT Hub twin input-binding extension - confirm.
        Twin deviceTwin,
        TraceWriter log)
    { ... }
}

但目前尚不清楚如何使用Service Bus触发器完成此操作 .

此外,我们希望将所有消息(与路由无关)都存储到 Azure Data Lake 存储中,但我不太清楚该如何实现 .

1 回答

  • 1

    Azure IoT Hub 的设备到云(device-to-cloud)消息格式的说明见原文链接(here). 这种格式不包含设备孪生(device twin)属性 . 设备孪生存储在云端后端中,其变更可以通过 IoT Hub 路由通知到特定的终结点(内置和/或自定义终结点).

    您的函数“TestQueueTrigger”示例使用的是适用于 Functions 版本 1 的 azure-functions-iothub-extension . 该扩展的 Twin 输入绑定会在扩展内部通过一次单独的调用来获取设备孪生:

    deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);
    

    基本上,此扩展也可以与 ServiceBusTrigger 绑定一起使用 . 请注意,此扩展只能用于 Functions 版本 1,因此我建议改用其他方式(例如 REST API 的 Get Twin 调用)在函数内获取设备孪生 .

    Update

    以下代码段显示了ServiceBusTrigger函数和REST API Get Twin调用的示例 .

    run.csx文件:

    #r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
    #r "..\\bin\\Microsoft.Azure.Devices.Shared.dll"
    #r "Microsoft.Azure.WebJobs.ServiceBus"
    #r "Newtonsoft.Json"
    
    
    using System;
    using System.Threading.Tasks;
    using System.Text;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using Microsoft.Azure.WebJobs.ServiceBus;
    using Microsoft.Azure.ServiceBus;
    using System.Globalization;
    using System.Linq;
    using System.Net.Http;
    using System.Security.Cryptography;
    using System.Web;
    using Microsoft.Azure.Devices.Shared;
    
    // Reusable proxy: one static HttpClientHelper shared by every invocation of Run.
    // It is built from the connection string read from the
    // "AzureIoTHubShariedAccessPolicy" environment variable.
    static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));
    
    /// <summary>
    /// Service Bus queue trigger: logs the message payload, reads the originating device id
    /// from the message's user properties, fetches that device's twin through the IoT Hub
    /// REST API and logs the twin's tags.
    /// </summary>
    /// <param name="queueItem">The Service Bus message routed from IoT Hub.</param>
    /// <param name="log">Logger for this invocation.</param>
    /// <exception cref="InvalidOperationException">
    /// The message carries no "iothub-connection-device-id" user property.
    /// </exception>
    /// <exception cref="System.Net.Http.HttpRequestException">
    /// The Get Twin request returned a non-success status code.
    /// </exception>
    public static async Task Run(Message queueItem, ILogger log)
    {
        // payload
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

        // device identity Id; fail with a descriptive error instead of a bare KeyNotFoundException
        if (!queueItem.UserProperties.TryGetValue("iothub-connection-device-id", out var deviceIdValue) || deviceIdValue == null)
        {
            throw new InvalidOperationException("The message has no 'iothub-connection-device-id' user property; the device twin cannot be resolved.");
        }

        // Device ids may contain URI-reserved characters (for example '#', '?' or '%'),
        // so the id must be escaped before it is placed into the request path.
        string deviceId = Uri.EscapeDataString(Convert.ToString(deviceIdValue, CultureInfo.InvariantCulture));

        // get the device twin
        var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30");
        response.EnsureSuccessStatusCode();
        Twin twin = await response.Content.ReadAsAsync<Twin>();

        log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));
    }
    
    
    // helpers
    /// <summary>
    /// Owns a single <see cref="HttpClient"/> whose base address is the IoT Hub host taken
    /// from a connection string, and keeps a shared access signature (SAS) token in the
    /// client's default "Authorization" header.
    /// </summary>
    class HttpClientHelper
    {
        // Lifetime requested for every generated SAS token, in hours.
        const uint TokenLifetimeHours = 1;

        // A token is replaced once it is this close to (or past) its expiry.
        static readonly TimeSpan RenewalMargin = TimeSpan.FromMinutes(1);

        // Private gate: the HttpClient itself is handed out to callers, so it must not be the lock object.
        readonly object gate = new object();
        readonly HttpClient client;
        readonly (string hostname, string keyname, string key) config;

        // UTC time at which the token currently stored in the header expires.
        DateTime expiringSaS;

        /// <summary>Creates the client and installs the first SAS token.</summary>
        /// <param name="connectionString">
        /// Semicolon-separated "Name=Value" pairs; "HostName" and "SharedAccessKey" are
        /// required, "SharedAccessKeyName" is optional.
        /// </param>
        /// <exception cref="ArgumentException">
        /// The connection string is empty or lacks a required entry.
        /// </exception>
        public HttpClientHelper(string connectionString)
        {
            config = GetPartsFromConnectionString(connectionString);
            client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}") };
            SetAuthorizationHeader();
        }

        /// <summary>
        /// The shared client. The SAS token is renewed first when it is about to expire.
        /// </summary>
        public HttpClient Client
        {
            get
            {
                SetAuthorizationHeader();
                return client;
            }
        }

        /// <summary>
        /// Replaces the "Authorization" header with a fresh SAS token when the current one
        /// is within <see cref="RenewalMargin"/> of its expiry; otherwise does nothing.
        /// </summary>
        internal void SetAuthorizationHeader()
        {
            lock (gate)
            {
                DateTime now = DateTime.UtcNow;

                // Renew BEFORE the token expires. (The previous check only fired one minute
                // after expiry, so requests in between were sent with an expired token.)
                if (now + RenewalMargin < expiringSaS)
                {
                    return;
                }

                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, TokenLifetimeHours);

                // Remove is a no-op when the header is not present yet.
                client.DefaultRequestHeaders.Remove("Authorization");
                client.DefaultRequestHeaders.Add("Authorization", sasToken);

                // 'now' was captured before signing, so the tracked expiry is never later than the token's own.
                expiringSaS = now.AddHours(TokenLifetimeHours);
            }
        }

        /// <summary>Splits a connection string into host name, key name and key.</summary>
        /// <param name="connectionString">Semicolon-separated "Name=Value" pairs.</param>
        /// <returns>
        /// The trimmed "HostName", "SharedAccessKeyName" (empty when absent) and
        /// "SharedAccessKey" values.
        /// </returns>
        /// <exception cref="ArgumentException">
        /// The connection string is null/blank, or "HostName"/"SharedAccessKey" is missing or empty.
        /// </exception>
        internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
        {
            if (string.IsNullOrWhiteSpace(connectionString))
            {
                throw new ArgumentException("The connection string is null or empty.", nameof(connectionString));
            }

            // Split each segment on the FIRST '=' only: base64 keys usually end with '=' padding.
            // Segments without any '=' (e.g. stray whitespace) are ignored.
            var parts = connectionString
                .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(segment => segment.Split(new[] { '=' }, 2))
                .Where(pair => pair.Length == 2)
                .ToDictionary(pair => pair[0].Trim(), pair => pair[1].Trim());

            string Required(string name)
            {
                if (parts.TryGetValue(name, out var value) && value.Length > 0)
                {
                    return value;
                }

                throw new ArgumentException($"The connection string has no '{name}' entry.", nameof(connectionString));
            }

            // The key name is optional: GetSASToken omits "skn" when it is empty.
            parts.TryGetValue("SharedAccessKeyName", out var keyName);

            return (Required("HostName"), keyName ?? "", Required("SharedAccessKey"));
        }

        /// <summary>Builds a "SharedAccessSignature ..." token for a resource.</summary>
        /// <param name="resourceUri">Resource the token is scoped to (here: the hub host name).</param>
        /// <param name="key">Base64-encoded signing key.</param>
        /// <param name="keyName">Optional policy name, appended as "skn" when not empty.</param>
        /// <param name="hours">Token lifetime in hours.</param>
        /// <returns>The token string, ready to be used as the Authorization header value.</returns>
        /// <exception cref="FormatException"><paramref name="key"/> is not valid base64.</exception>
        internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
        {
            var expiry = GetExpiry(hours);
            string encodedUri = HttpUtility.UrlEncode(resourceUri);
            string stringToSign = encodedUri + "\n" + expiry;

            string signature;
            using (var hmac = new HMACSHA256(Convert.FromBase64String(key)))
            {
                signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            }

            var sasToken = $"SharedAccessSignature sr={encodedUri}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}";
            if (!string.IsNullOrEmpty(keyName))
            {
                sasToken += $"&skn={keyName}";
            }

            return sasToken;
        }

        /// <summary>Returns the Unix time (seconds) that lies <paramref name="hours"/> hours from now.</summary>
        /// <param name="hours">Offset from the current time, in hours.</param>
        /// <returns>The expiry as a decimal string.</returns>
        internal string GetExpiry(uint hours = 24)
        {
            // 64-bit arithmetic: a 32-bit seconds counter overflows in 2038.
            long expiry = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + 3600L * hours;
            return expiry.ToString(CultureInfo.InvariantCulture);
        }
    }
    

    function.json:

    {
      "bindings": [
        {
          "name": "queueItem",
          "type": "serviceBusTrigger",
          "direction": "in",
          "queueName": "myQueue",
          "connection": "myConnectionString_SERVICEBUS"
        }
      ]
    }
    

相关问题