
Python ExecuteScript in NiFi: Transforming flowfile attributes and content


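I am trying to create a Python script in NiFi that:

  • Reads some attributes from an incoming flowfile

  • Reads the flowfile's JSON content and extracts specific fields

  • Writes attributes to the outgoing flowfile

  • Overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the SUCCESS relationship, or removes the old flowfile and creates a new one with the desired content

What I have done so far: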

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):

    def __init__(self):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))

###end class###

flowfile = session.get()

if flowfile != None:

    # 1) Get flowfile attributes

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content

    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data

    output_json = {some data}

    # 4) Write final JSON data to output flowfile

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

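My problem is that I cannot find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to solve this, or on a better approach?

Would it be easier in this case to perform all the API operations inside the class's process function? If so, how do I access the incoming flowfile's attributes inside process (which requires the session or the flowfile object)?

Any help is much appreciated!

2 Answers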

  • 0

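    I've included example Python code below that defines a custom PyStreamCallback class, which implements logic to transform the JSON in the flowfile content, taken from Matt Burgess' blog article on the topic. However, I would encourage you to consider the native processors UpdateAttribute and EvaluateJsonPath for the relevant steps, and to use custom code only for tasks that NiFi cannot handle out of the box.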

    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass
    
        def process(self, inputStream, outputStream):
            # Read the incoming content and parse it as JSON
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            obj = json.loads(text)
            newObj = {
                "Range": 5,
                "Rating": obj['rating']['primary']['value'],
                "SecondaryRatings": {}
            }
            # iteritems() is the Python 2 dict iterator, as provided by NiFi's Jython engine
            for key, value in obj['rating'].iteritems():
                if key != "primary":
                    newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
    
            # Replace the flowfile content with the transformed JSON
            outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))
    
    flowFile = session.get()
    if flowFile is not None:
        flowFile = session.write(flowFile, PyStreamCallback())
        flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
        session.transfer(flowFile, REL_SUCCESS)
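
To make the reshaping in the script above easier to follow, here is a minimal sketch of the same logic as a plain function, without the NiFi stream handling. The function name `translate_ratings` and the sample input are invented for illustration; the real input shape depends on your data.

```python
import json


def translate_ratings(obj):
    """Same reshaping as PyStreamCallback.process, minus the stream handling."""
    new_obj = {
        "Range": 5,
        "Rating": obj["rating"]["primary"]["value"],
        "SecondaryRatings": {},
    }
    # items() works on both Jython 2.7 and CPython 3
    for key, value in obj["rating"].items():
        if key != "primary":
            new_obj["SecondaryRatings"][key] = {"Id": key, "Range": 5, "Value": value["value"]}
    return new_obj


# Hypothetical input document, invented for illustration only
sample = {"rating": {"primary": {"value": 4}, "design": {"value": 3}}}
result = translate_ratings(sample)
print(json.dumps(result, indent=4, sort_keys=True))
```

Every key under `rating` other than `primary` ends up as an entry in `SecondaryRatings`, so the sample produces one secondary rating keyed by `design`.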
    

    Update:

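    To access a flowfile's attributes inside the callback, pass the flowfile as an argument to the constructor, store it as a field, and reference it in the process method. Below is a very simple example that appends the value of the attribute my_attr to the incoming flowfile content and writes it back: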

    import json
    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    
    class PyStreamCallback(StreamCallback):
        def __init__(self, flowfile):
            self.ff = flowfile
        def process(self, inputStream, outputStream):
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            text += self.ff.getAttribute('my_attr')
            outputStream.write(bytearray(text.encode('utf-8')))
    
    flowFile = session.get()
    if (flowFile != None):
        flowFile = session.write(flowFile,PyStreamCallback(flowFile))
        session.transfer(flowFile, REL_SUCCESS)
    

    Incoming flowfile:

    --------------------------------------------------
    Standard FlowFile Attributes
    Key: 'entryDate'
        Value: 'Tue Mar 13 13:10:48 PDT 2018'
    Key: 'lineageStartDate'
        Value: 'Tue Mar 13 13:10:48 PDT 2018'
    Key: 'fileSize'
        Value: '30'
    FlowFile Attribute Map Content
    Key: 'filename'
        Value: '1690494181462176'
    Key: 'my_attr'
        Value: 'This is an attribute value.'
    Key: 'path'
        Value: './'
    Key: 'uuid'
        Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
    --------------------------------------------------
    This is some flowfile content.
    

    Outgoing flowfile:

    --------------------------------------------------
    Standard FlowFile Attributes
    Key: 'entryDate'
        Value: 'Tue Mar 13 13:10:48 PDT 2018'
    Key: 'lineageStartDate'
        Value: 'Tue Mar 13 13:10:48 PDT 2018'
    Key: 'fileSize'
        Value: '57'
    FlowFile Attribute Map Content
    Key: 'filename'
        Value: '1690494181462176'
    Key: 'my_attr'
        Value: 'This is an attribute value.'
    Key: 'path'
        Value: './'
    Key: 'uuid'
        Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
    --------------------------------------------------
    This is some flowfile content.This is an attribute value.
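
The same constructor technique should also cover the original question of handing an already built object to an OutputStreamCallback. The following is a minimal sketch, not a tested NiFi flow: the stand-in base class and the sample dictionary are assumptions added only so the snippet can be run outside NiFi, where the real interface is not importable.

```python
import io
import json

try:
    # Real interface, importable only inside NiFi's Jython script engine
    from org.apache.nifi.processor.io import OutputStreamCallback
except ImportError:
    # Stand-in (assumption) so the sketch can also be exercised outside NiFi
    class OutputStreamCallback(object):
        pass


class OutputWrite(OutputStreamCallback):
    def __init__(self, obj):
        # Keep a reference to the object that process() should serialize
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))


# Inside ExecuteScript the call would look like:
#     flowfile = session.write(flowfile, OutputWrite(output_json))
# Outside NiFi, an in-memory buffer can stand in for the output stream:
buffer = io.BytesIO()
OutputWrite({"collection": "demo", "pages": 3}).process(buffer)
written = json.loads(buffer.getvalue().decode('utf-8'))
```

The object goes into `__init__` as a parameter rather than into the base-class list, which is where the question's `class OutputWrite(OutputStreamCallback, obj)` went wrong.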
    
  • 0

    You can try something like this:

    import json
    import sys
    import traceback
    from java.nio.charset import StandardCharsets
    from org.apache.commons.io import IOUtils
    from org.apache.nifi.processor.io import StreamCallback
    from org.python.core.util import StringUtil
    
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass
    
        def process(self, inputStream, outputStream):
            try:
                # Read input FlowFile content
                input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
                input_obj = json.loads(input_text)
                # Transform content
                output_obj = input_obj   # start from the input content
    
                # perform data transformation on output_obj here
    
                # Write output content
                output_text = json.dumps(output_obj)
                outputStream.write(StringUtil.toBytes(output_text))
            except:
                traceback.print_exc(file=sys.stdout)
                raise
    
    
    flowFile = session.get()
    if flowFile != None:
        flowFile = session.write(flowFile, PyStreamCallback())
    
        # Finish by transferring the FlowFile to an output relationship
        session.transfer(flowFile, REL_SUCCESS)
    
