好奇,如果有人有这个工作,因为我目前正在努力 .
我创建了简单的Source和Sink应用程序来发送和接收基于Avro架构的消息 . 消息的架构保存在Confluent架构注册表中 . 这两个应用程序都配置为使用ConfluentSchemaRegistryClient类,但我认为这里可能存在一个错误 . 这就是我看到的让我惊讶的地方 .
如果我与Confluent注册表的REST API交互,我可以看到只有一个版本的架构有问题(轻微编辑以模糊我正在处理的内容):
$ curl -i "http://schemaregistry:8081/subjects/somesubject/versions"
HTTP/1.1 200 OK
Date: Fri, 05 May 2017 16:13:37 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 3
Server: Jetty(9.2.12.v20150709)
[1]
当源应用程序通过Kafka发送其消息时,我注意到 Headers 中的版本看起来有点时髦:
contentType"application/octet-stream"originalContentType/"application/vnd.somesubject.v845+avro"
我不是100%清楚为什么 application/vnd.somesubject.v845+avro
内容类型被包含在 application/octet-stream
中但忽略了这一点,请注意它是说版本845而不是版本1 .
查看 ConfluentSchemaRegistryClient
实现,我看到它 POST
到 /subjects/(string: subject)/versions
并返回架构的id而不是版本 . 然后将其放入 SchemaReference
的 version
字段:https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/ConfluentSchemaRegistryClient.java#L81
当Sink应用程序尝试根据标头获取邮件的架构时,它会失败,因为它尝试获取版本845,它从 Headers 中拔出:https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/ConfluentSchemaRegistryClient.java#L87
有人对此有何看法?提前致谢 .
更新
确定这是一个错误 . 取了 ConfluentSchemaRegistryClient
并稍微修改 register
方法到 POST
到 /subjects/(string: subject)
(即删除尾随的 /versions
),每个Confluent REST API文档返回一个包含 version
的有效负载 . 奇迹般有效:
public SchemaRegistrationResponse register(String subject, String format, String schema) {
Assert.isTrue("avro".equals(format), "Only Avro is supported");
String path = String.format("/subjects/%s", subject);
HttpHeaders headers = new HttpHeaders();
headers.put("Accept",
Arrays.asList("application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json",
"application/json"));
headers.add("Content-Type", "application/json");
Integer version = null;
try {
String payload = this.mapper.writeValueAsString(Collections.singletonMap("schema", schema));
HttpEntity<String> request = new HttpEntity<>(payload, headers);
ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request,
Map.class);
version = (Integer) response.getBody().get("version");
}
catch (JsonProcessingException e) {
e.printStackTrace();
}
SchemaRegistrationResponse schemaRegistrationResponse = new SchemaRegistrationResponse();
schemaRegistrationResponse.setId(version);
schemaRegistrationResponse.setSchemaReference(new SchemaReference(subject, version, "avro"));
return schemaRegistrationResponse;
}