我正在编写一个使用 Kafka 数据的控制台应用程序。我在 Postman 中运行它时测试了端点和工作。我的问题是在我的应用程序中,订阅方法明显失败(我没有得到任何错误的 API 错误或响应)因为我不断得到 Web 错误:{“”:50002,“message”:“Kafka 错误:消费者在尝试轮询记录时,未订阅任何主题或分配任何分区:}:

这是我的代码:

创造消费者(这是有效的)

public void CreateConsumer(string name,string url)
{
    _logger.Debug($"Creating Consumer...");
    try
    {
        var request = (HttpWebRequest) WebRequest.Create(url);
        _logger.Debug($"Consumer Url: {url}");
        var body = new RequestEntity
        {
            Name = $"{name}_consumer",
            Format = "json",
            AutoOffsetReset = "earliest"
        };
        var requestJson = JsonConvert.SerializeObject(body);

        _logger.Debug($"Consumer Body: {requestJson}");

        request.Method = WebRequestMethods.Http.Post;
        request.ContentType = "application/vnd.kafka.v2+json";
        var requestStream = request.GetRequestStream();
        requestStream.WriteTimeout = 300000;
        using (var streamWriter = new StreamWriter(requestStream))
        {
            streamWriter.Write(requestJson);

        }
        using (var response = (HttpWebResponse) request.GetResponse())
        using (var reader = new StreamReader(response.GetResponseStream()))
        {
            var json = reader.ReadToEnd();
            _logger.Error($"Consumer Response: {json}");
        }

    }
    catch (Exception exception)
    {
        _logger.Error($"{exception.Message}", exception);
    }
}

订阅(这显然不起作用)

public void Subscribe(string url,string topic)
{
    _logger.Debug($"Starting Subscription");
    try
    {
        var subUrl = url + $"/instances/{topic}_consumer/subscription";
        var request = (HttpWebRequest)WebRequest.Create(subUrl);
        _logger.Debug($"Subscriber Url: {subUrl}");
        var topicList = new List<string> { topic };
        var body = new SubscribeEntity
        {
            Topics = topicList
        };
        var requestJson = JsonConvert.SerializeObject(body);
        _logger.Debug($"Subscription Body: {requestJson}");
        request.Headers.Clear();
        request.Method = WebRequestMethods.Http.Post;
        request.ContentType = "application/vnd.kafka.v2+json";

        var streamWriter = new StreamWriter(request.GetRequestStream());

            try
            {

                streamWriter.Write(requestJson);
            }
            catch (Exception exception)
            {
                _logger.Error($"{exception.Message}", exception);
                _logger.Error($"{exception.InnerException}", exception);
            }
            finally { 
            streamWriter.Dispose();
            }
        _logger.Debug($"Did the request have a response?: {request.HaveResponse}");

    }
    catch (Exception exception)
    {
        _logger.Error($"{exception.Message}", exception);
    }
}

请求记录(有效,但返回“未订阅”错误):

public List<JsonEntity> RequestRecords(string url,string topic)
{
    try
    {
        var subUrl = url + $"/instances/{topic}_consumer/records";
        _logger.Debug($"Polling records");
        List<JsonEntity> jsonEntities;
        var request = (HttpWebRequest) WebRequest.Create(subUrl);
        _logger.Debug($"Records Url: {subUrl}");
        request.Method = WebRequestMethods.Http.Get;
        request.Accept = "application/vnd.kafka.json.v2+json";
        using (var response = (HttpWebResponse) request.GetResponse())
        using (var reader = new StreamReader(response.GetResponseStream()))
        {
            _logger.Debug($"Response Headers: {response.Headers}");
            var json = reader.ReadToEnd();
            jsonEntities = JsonConvert.DeserializeObject<List<JsonEntity>>(json);
        }
        return jsonEntities;
    }
    catch(WebException exception)
    {
        if (exception.Response != null)
        {
            using (var response = (HttpWebResponse)exception.Response)
            using(var reader = new StreamReader(response.GetResponseStream()))
            {
                var error = reader.ReadToEnd();
                _logger.Debug($"Web Error: {error}");
            }

        }
        _logger.Error($"{exception.Message}", exception);
        return null;
    }
}

这是日志:

创建消费者...消费者 URL:http://localhost/consumers/care_consumer

Consumer Body: {"name":"care_chat_metrics_consumer","format":"json","auto.offset.reset":"earliest"}

  Consumer Response:
  {"instance_id":"care_chat_metrics_consumer","base_uri":"http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer"}

  Starting Subscription

  Subscriber Url:
  http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/subscription

  Subscription Body: {"topics":["care_chat_metrics"]}

  Did the request have a response?: False

  Polling records

  Records Url:
  http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/records

  Web Error: {"error_code":50002,"message":"Kafka error: Consumer is
  not subscribed to any topics or assigned any partitions"}

  The remote server returned an error: (500) Internal Server Error.