At the java server side 在一些进程之后,我通过restful webservice将日志数据(json格式)从服务器发布到kafka .
At the hdfs side 我的水槽类型是avro . 因此,为了解析json(源)到avro(目标)我使用morphline和avro架构 .
如果发布的数据不适合morphline或avro架构,通常我会得到以下错误,
引起:com.fasterxml.jackson.core.JsonParseException:非法引用的字符((CTRL-CHAR,代码10)):必须使用反斜杠进行转义才能包含在字符串值中
此外,如果我得到这一次,偏移不再移动 . 简单地说,如果kafka只获得此错误一次,它就不能再下沉发布的数据了 .
为了避免这个错误,我想有2个解决方案 . 第一个是在服务器端编写用于大数据端的avro架构的json验证器 . 我首选的第二种方法是跳过并且不会接收未格式化为请求的avro架构的日志数据 . 但是在跳过损坏的数据后,如果kafka获得了合适的数据,它应该将其丢弃 .
我想如果我在flume或kafka配置文件中添加一些参数是可能的 . 那么当发布的数据不适合所请求的模式或请求的morphline时,如何跳过接收器步骤?
1 回答
我在morphline方面解决了这个问题,
在morphline中添加了try-catch代码块
在
tryRules
中,我强制代码捕获所有异常 .在
rules:
您可以随意编写"command:"
block,如果其中一个抛出除最后一个命令块之外的异常,则最后一个命令将运行 . 请记住,最后一个是"catch" . 我的意思是我的情况,如果第一个命令块失败,最后(第二个)命令将运行 . 如果第一个命令运行完美,则最后一个命令将不起作用,因为最后一个命令块像catch块一样工作 .因此,当代码
readJson {}
在第一个命令块中失败时,它会抛出异常,最后一个命令(catch块)会处理它,因此它不会尝试在kafka主题中接收当前数据,因为它将运行dropRecord {}
.有关详细文档,您可以访问kitesdk .