首页 文章

在nifi 6.0中将ExecuteSQL处理器的结果与Json内容合并

提问于
浏览
3

我正在处理包含地理坐标点的json对象 . 我想对我在本地的postgis服务器运行这些点来评估多边形匹配中的点 .

我希望用预先存在的处理器做到这一点 - 我使用“EvaluateJsonPath”处理器成功地将lat / lon坐标提取到属性中,并使用“ExecuteSQL”成功向我的本地postgis数据存储区发出查询 . 这给我留下了avro响应,然后我可以使用“ConvertAvroToJSON”处理器将其转换为JSON .

我在如何将查询结果与原始JSON对象一起合并时遇到了概念上的麻烦 . 实际上,我有两个具有相同片段ID的流文件,理论上我可以将它与“mergecontent”合并在一起,但这让我:

{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}

是否有任何建议的策略将SQL查询的结果合并到原始的json结构中,所以我的结果将是这样的:

{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}

我正在运行nifi 6.0,postgres 9.5.2和postgis 2.2.1 .

我看到了在https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html中使用replaceText处理器的一些参考 - 但这似乎是将属性中的内容合并到内容的主体中 . 我忽略了合并原始内容的内容以及SQL响应的内容,或者在没有内容的情况下从SQL响应中提取的属性 .

编辑:

Groovy脚本似乎可以执行所需操作 . 我不是一个常规编码器,所以欢迎任何改进 .

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def originaljsontext = flowFile.getAttribute('original.json')
        def originaljson = slurper.parseText(originaljsontext)
        originaljson.put("point_polygon_info", obj)
        outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

1 回答

  • 4

    如果您的原始JSON相对较小,可能的方法可能如下......

    • 在到达ExecuteSQL之前使用ExtractText将原始JSON复制到属性中 .

    • 在ExecuteSQL之后,在ConvertAvroToJSON之后,使用ExecuteScript处理器创建一个新的JSON文档,该文档将属性中的原始内容与内容中的结果相结合 .

    我不确定脚本中需要做什么,但我知道其他人通过ExecuteScript处理器使用Groovy和JsonSlurper取得了成功 .

相关问题