我需要帮助了解NiFi中 ValidateRecord
和 ConvertRecord
处理器的理解和功能 .
My requirement
我有一个具有各种属性的JSON流文件内容,很少有字段是必需的,很少是可选的,也很少有字段是时间戳字段和少数双字型 . 有些字段应该具有LOV中的值
我使用了 AvroSchemaRegistry
及以下的架构定义:
{
"namespace": "nifi",
"name": "test_json",
"type": "record",
"fields": [{
"name": "sn",
"type": "string"
}, {
"name": "result",
"type": {
"name": "Result",
"type": "enum",
"symbols": ["PASS", "FAIL"]
}
}, {
"name": "product",
"type": "string",
"maxLength": 8
}, {
"name": "test_station_name",
"type": "string",
"maxLength": 32
}, {
"name": "station_id",
"type": "string"
}, {
"name": "mac_address",
"type": "string"
}, {
"name": "start_time",
"type": {
"type": "string",
"logicalType": "timestamp-millis"
}
}, {
"name": "stop_time",
"type": {
"type": "string",
"logicalType": "timestamp-millis"
}
}, {
"name": "f_p_dip_wave",
"type": ["null", "double"]
}, {
"name": "f_p_dip_depth",
"type": ["null", "double"]
}, {
"name": "f_p_dip_height",
"type": ["null", "double"]
}, {
"name": "radius",
"type": ["null", "double"]
}, {
"name": "diameter",
"type": ["null", "double"]
}, {
"name": "gain_cavity_offset_nm",
"type": ["null", "double"]
}]
}
字段结果应具有PASS或FAIL值,product和test_station_name可以对值具有最大长度限制,并且具有默认值“null”的字段是必需的 .
源端,客户端应用程序可以在json中发送任何属性集,我正在尝试使用validaterecord使用jsonreader和jsonwriter来验证记录,然后使用convertrecord根据模式适当地转换json .
我可以看到NiFi能够检测字段名称并能够对数据类型(字符串,双精度和时间戳)执行验证,但不能使除了PASS或FAIL 2之外的结果属性的基于记录的枚举值无效 . 不会使product和test_station_name字段的记录长度大于架构中定义的最大长度的记录无效 . 3.此外,即使JSON没有强制字段的属性和值,验证记录也会考虑这些字段的“空” . 而是所有JSON记录都已成功验证 .
Question
NiFi ValidateRecord
和 ConvertRecord
可以用于验证并将传入的JSON转换为传出的json,并使用上面解释的一些验证规则 . 如果没有,是否有替代方法使用groovy脚本根据模式执行此类验证和传入json的转换 .
请建议 . 任何帮助是极大的赞赏 .
1 回答
1)目前,Avro架构的枚举转换为NiFi内部记录架构中的字符串类型,这就是为什么任何值都要传递的原因 . 在NiFi的记录模式中需要有一个枚举类型,它从Avro模式中捕获了允许的值 .
2)我在Avro规范中找不到关于maxLength的任何内容 - https://avro.apache.org/docs/current/spec.html这是真的吗?如果是,那么NiFi可以考虑加入它 .
3)如果一个字段没有值,那么它应该是无效的,除非该字段的类型是带有“null”的联合,例如“type”:[“null”,“double”],这将意味着该字段不是必需的,允许为null或double .