我正在编写一个使用 Kafka 数据的控制台应用程序。我在 Postman 中运行它时测试了端点和工作。我的问题是在我的应用程序中,订阅方法明显失败(我没有得到任何错误的 API 错误或响应)因为我不断得到 Web 错误:{“”:50002,“message”:“Kafka 错误:消费者在尝试轮询记录时,未订阅任何主题或分配任何分区:}:
这是我的代码:
创造消费者(这是有效的)
public void CreateConsumer(string name,string url)
{
_logger.Debug($"Creating Consumer...");
try
{
var request = (HttpWebRequest) WebRequest.Create(url);
_logger.Debug($"Consumer Url: {url}");
var body = new RequestEntity
{
Name = $"{name}_consumer",
Format = "json",
AutoOffsetReset = "earliest"
};
var requestJson = JsonConvert.SerializeObject(body);
_logger.Debug($"Consumer Body: {requestJson}");
request.Method = WebRequestMethods.Http.Post;
request.ContentType = "application/vnd.kafka.v2+json";
var requestStream = request.GetRequestStream();
requestStream.WriteTimeout = 300000;
using (var streamWriter = new StreamWriter(requestStream))
{
streamWriter.Write(requestJson);
}
using (var response = (HttpWebResponse) request.GetResponse())
using (var reader = new StreamReader(response.GetResponseStream()))
{
var json = reader.ReadToEnd();
_logger.Error($"Consumer Response: {json}");
}
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
}
}
订阅(这显然不起作用)
public void Subscribe(string url,string topic)
{
_logger.Debug($"Starting Subscription");
try
{
var subUrl = url + $"/instances/{topic}_consumer/subscription";
var request = (HttpWebRequest)WebRequest.Create(subUrl);
_logger.Debug($"Subscriber Url: {subUrl}");
var topicList = new List<string> { topic };
var body = new SubscribeEntity
{
Topics = topicList
};
var requestJson = JsonConvert.SerializeObject(body);
_logger.Debug($"Subscription Body: {requestJson}");
request.Headers.Clear();
request.Method = WebRequestMethods.Http.Post;
request.ContentType = "application/vnd.kafka.v2+json";
var streamWriter = new StreamWriter(request.GetRequestStream());
try
{
streamWriter.Write(requestJson);
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
_logger.Error($"{exception.InnerException}", exception);
}
finally {
streamWriter.Dispose();
}
_logger.Debug($"Did the request have a response?: {request.HaveResponse}");
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
}
}
请求记录(有效,但返回“未订阅”错误):
public List<JsonEntity> RequestRecords(string url,string topic)
{
try
{
var subUrl = url + $"/instances/{topic}_consumer/records";
_logger.Debug($"Polling records");
List<JsonEntity> jsonEntities;
var request = (HttpWebRequest) WebRequest.Create(subUrl);
_logger.Debug($"Records Url: {subUrl}");
request.Method = WebRequestMethods.Http.Get;
request.Accept = "application/vnd.kafka.json.v2+json";
using (var response = (HttpWebResponse) request.GetResponse())
using (var reader = new StreamReader(response.GetResponseStream()))
{
_logger.Debug($"Response Headers: {response.Headers}");
var json = reader.ReadToEnd();
jsonEntities = JsonConvert.DeserializeObject<List<JsonEntity>>(json);
}
return jsonEntities;
}
catch(WebException exception)
{
if (exception.Response != null)
{
using (var response = (HttpWebResponse)exception.Response)
using(var reader = new StreamReader(response.GetResponseStream()))
{
var error = reader.ReadToEnd();
_logger.Debug($"Web Error: {error}");
}
}
_logger.Error($"{exception.Message}", exception);
return null;
}
}
这是日志:
创建消费者...消费者 URL:http://localhost/consumers/care_consumer
Consumer Body: {"name":"care_chat_metrics_consumer","format":"json","auto.offset.reset":"earliest"}
Consumer Response:
{"instance_id":"care_chat_metrics_consumer","base_uri":"http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer"}
Starting Subscription
Subscriber Url:
http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/subscription
Subscription Body: {"topics":["care_chat_metrics"]}
Did the request have a response?: False
Polling records
Records Url:
http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/records
Web Error: {"error_code":50002,"message":"Kafka error: Consumer is
not subscribed to any topics or assigned any partitions"}
The remote server returned an error: (500) Internal Server Error.