首页 文章

NiFi SpringContextProcessor中断流程

提问于
浏览
1

我在我的NiFi流程中添加了一个SpringContextProcessor,它按预期执行并更新FlowFile内容和属性 . 但是在NiFi的数据来源部分而不是看到SEND / RECEIVE,我看到了

03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66
03/27/2017 11:47:57.163 MDT DROP    667094a7-8eef-4657-981a-dc9fdc6c4056
03/27/2017 11:47:57.163 MDT SEND    667094a7-8eef-4657-981a-dc9fdc6c4056

看起来原始邮件被删除并被新邮件替换 . 我没有在其他组件中看到这种行为,即它们似乎都保留了原始的Flow File UUID . Spring处理器代码的简化版本:

@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi")
public Message<byte[]> process1(Message<byte[]> inMessage) {
    String inMessagePayload = new String(inMessage.getPayload());
    String userId = getUserIdFromDb(inMessagePayload);
    String outMessagePayload = inMessagePayload + userId;

    return MessageBuilder.withPayload(outMessagePayload.getBytes())
            .copyHeaders(inMessage.getHeaders())
            .setHeader("userId", userId)
            .build();
}

有没有办法在传出消息中保留原始流文件UUID?

1 回答

  • 1

    这可能是我们的结果,所以是的,请提出一个JIRA . 但是,作为一种解决方法,您可以尝试从传入的Message标头中提取FlowFile属性,然后将它们传播回传出消息 .

    public Message<byte[]> process1(Message<byte[]> inMessage) {
        String myHeader = inMessage.getHeader("someHeader");
        . . .
        return MessageBuilder.withPayload(outMessagePayload.getBytes())
                .copyHeaders(inMessage.getHeaders())
                .setHeader("userId", userId)
                .setHeader("someHeader", myHeader)
                .build();
    }
    

相关问题