我正在尝试将数据从S3复制到HDFS,观察了几个问题并且几乎没有问题 .
问题
- Processor ConvertJSONToAvro - 如果流文件不是有效的JSON,则处理器陷入无限循环,并出现以下错误 .
ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] failed to process session due to java.lang.RuntimeException: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]
16:45:35 UTC
WARNING
c09f4c27-0160-1000-6c29-1a31afc5a8d4
ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] Processor Administratively Yielded for 1 sec due to processing failure
- Processor FetchS3Object - 无论设置为“Object key”的值如何,它总是选择$ 的值 . 例如,如果“Object key”设置为“$ ”,它将忽略值集并仅选择$ .
问题
-
是否可以从以前的处理器中引用流文件?我的用例是FetchS3Object(file1) - > EvaluateJsonPath - > FetchS3Object(file2) - > PutHDFS - > FetchS3Object(file1) - > PutHDFS . 在这种情况下,不是多次加载file1,是否可以通过流存储和引用它 .
-
在上面,文件file1和file2是一个单元 . 是否有任何选项可以复制这两个文件或两者都失败
-
ListS3处理器根据时间戳加载文件 . 如果文件已加载并且在任何其他步骤中失败,则需要再次加载该文件以进行重新处理 . 一个选项是更新文件的时间戳,因此在下次轮询期间可以使用ListS3 . 我们如何在S3中更新文件的时间戳?还是有任何其他选项来处理这样的用例 .
1 回答
问题
这是一篇描述构建定制NiFi处理器的过程的文章:https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html
如果您想使用现有的处理器/代码并根据自己的目的进行修改,那么可以在此处找到许多标准处理器:https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard
问题
使用Nifi UpdateAtttribute将属性添加到流文件中 . 请参阅https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.attributes.UpdateAttribute/index.html
上面的UpdateAttribute处理器应该可以解决您的问题 .
向LoadS3添加失败关系,然后将其传递给ExecuteScript处理器,对其进行相应转换并将其反馈给LoadS3 Processor .