我正在尝试在NiFi中创建一个Python脚本:
-
从传入的流文件中读取一些属性
-
读取流文件的json内容并提取特定字段
-
将属性写入传出流文件
-
使用在脚本中创建的新内容覆盖传入的流文件(例如,返回新json的API调用)并将其发送到SUCCESS关系或删除旧的流文件并使用所需的内容创建新的
到目前为止我做了什么:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback
class OutputWrite(OutputStreamCallback, obj):
def __init__(self):
self.obj = obj
def process(self, outputStream):
outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))
###end class###
flowfile = session.get()
if flowfile != None:
**#1) Get flowfile attributes**
headers = {
'Accept-Encoding': 'gzip, deflate, br',
'Accept': 'application/json, text/plain, */*',
'Cache-Control': 'no-cache',
'Ocp-Apim-Trace': 'true',
'Authorization': flowfile.getAttribute('Authorization')
}
collection = flowfile.getAttribute('collection')
dataset = flowfile.getAttribute('dataset')
**#2)Get flowfile content**
stream_content = session.read(flowfile)
text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
json_content = json.loads(text_content)
records = json_content['result']['count']
pages = records/10000
**#3) Write flowfile attributes**
flowfile = session.putAttribute(flowfile, 'collection', collection)
flowfile = session.putAttribute(flowfile, 'dataset', dataset)
**#API operations: output_json with desired data**
output_json = {some data}
**#4) Write final JSON data to output flowfile**
flowfile = session.write(flowfile, OutputWrite(output_json))
session.transfer(flowfile, REL_SUCCESS)
session.commit()
我的问题是我找不到一种方法来传递对所需的output_json对象的引用作为OutputStreamCallback类中的参数 . 关于如何解决这个问题或者更好的方法的任何想法?
在这种情况下,在类的进程函数中执行所有API操作是否更容易,但是如何访问进程函数中的传入流文件属性(需要会话或流文件对象)?
任何帮助非常感谢!
2 回答
我在下面包含了示例Python代码,它允许一个自定义的
PyStreamCallback
类,该类实现了从主题上的Matt Burgess' blog article转换流文件内容中的JSON的逻辑,但我建议您考虑使用本机处理器UpdateAttribute
和EvaluateJSONPath
来执行相关活动并且只使用自定义代码来执行NiFi无法开箱即用的任务 .Update:
要在回调中访问流文件的属性,只需将其作为参数传递给构造函数,将其存储为字段,并在
process
方法中引用它 . 下面是一个非常简单的示例,它将属性my_attr
的值连接到传入的流文件内容并将其写回:传入的流文件:
传出流文件:
你可以试试这样的东西 -