我是 Spark Streaming 和 Scala 的新手,需要一些帮助来消费来自 Kafka 的 Avro 消息,并将其转换为 Spark DataFrame(数据帧)。

请参考以下来自 Confluent Kafka Connect 的 Avro 事件,其中包含 schema(模式)和 payload(数据)两部分:

我需要消费这个事件,然后从中创建一个包含“Data Rows”和“Schema”的数据帧。这听起来有点复杂,您能否提供一些可供我参考的示例代码?

{
"schema": {
    "type": "struct",
    "fields": [{
        "type": "string",
        "optional": false,
        "field": "id"
    }, {
        "type": "string",
        "optional": false,
        "field": "dataSourceName"
    }, {
        "type": "array",
        "items": {
            "type": "struct",
            "fields": [{
                "type": "string",
                "optional": false,
                "field": "dataEntityName"
            }, {
                "type": "array",
                "items": {
                    "type": "string",
                    "optional": false
                },
                "optional": false,
                "field": "keyFieldNames"
            }, {
                "type": "array",
                "items": {
                    "type": "struct",
                    "fields": [{
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    }, {
                        "type": "string",
                        "optional": false,
                        "field": "type"
                    }, {
                        "type": "string",
                        "optional": true,
                        "field": "value"
                    }],
                    "optional": false,
                    "name": "Field"
                },
                "optional": false,
                "field": "fields"
            }],
            "optional": false,
            "name": "Change"
        },
        "optional": false,
        "field": "changes"
    }, {
        "type": "string",
        "optional": false,
        "field": "part"
    }],
    "optional": false,
    "name": "AvroTestEvent"
},
"payload": {
    "id": "D434000C",
    "dataSourceName": "EmployeeDB",
    "changes": [{
        "dataEntityName": "dbo.employeeTable",
        "keyFieldNames": ["id"],
        "fields": [{
            "name": "Employee_Id",
            "type": "int",
            "value": "6"
        }, {
            "name": "Employee_Name",
            "type": "varchar",
            "value": "test-employee"
        }]
    }, {
        "dataEntityName": "dbo.departmentTable",
        "keyFieldNames": ["Department_Id"],
        "fields": [{
            "name": "Department_Id",
            "type": "smallint",
            "value": "620"
        }, {
            "name": "Department_Name",
            "type": "varchar",
            "value": "ABCC"
        }]
    }],
    "part": "FULL"
}

}