我是 Spark Streaming 和 Scala 的新手,需要一些帮助来消费来自 Kafka 的 Avro 消息并将其转换为 Spark DataFrame。
请参考以下来自 Confluent Kafka Connect 的 Avro 事件,其中包含 Schema 和 Data Payload。
我需要消费该事件,然后从中创建一个同时包含“Data Rows”和“Schema”的 DataFrame。这听起来有点复杂,但是请您提供一些我可以使用的示例代码吗?
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "dataSourceName"
}, {
"type": "array",
"items": {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "dataEntityName"
}, {
"type": "array",
"items": {
"type": "string",
"optional": false
},
"optional": false,
"field": "keyFieldNames"
}, {
"type": "array",
"items": {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "string",
"optional": false,
"field": "type"
}, {
"type": "string",
"optional": true,
"field": "value"
}],
"optional": false,
"name": "Field"
},
"optional": false,
"field": "fields"
}],
"optional": false,
"name": "Change"
},
"optional": false,
"field": "changes"
}, {
"type": "string",
"optional": false,
"field": "part"
}],
"optional": false,
"name": "AvroTestEvent"
},
"payload": {
"id": "D434000C",
"dataSourceName": "EmployeeDB",
"changes": [{
"dataEntityName": "dbo.employeeTable",
"keyFieldNames": ["Employee_Id"],
"fields": [{
"name": "Employee_Id",
"type": "int",
"value": "6"
}, {
"name": "Employee_Name",
"type": "varchar",
"value": "test-employee"
}]
}, {
"dataEntityName": "dbo.departmentTable",
"keyFieldNames": ["Department_Id"],
"fields": [{
"name": "Department_Id",
"type": "smallint",
"value": "620"
}, {
"name": "Department_Name",
"type": "varchar",
"value": "ABCC"
}]
}],
"part": "FULL"
}
}