首页 文章

发布数据损坏时跳过kafka中的接收步骤

提问于
浏览
0

At the java server side 在一些进程之后,我通过restful webservice将日志数据(json格式)从服务器发布到kafka .

At the hdfs side 我的水槽类型是avro . 因此,为了解析json(源)到avro(目标)我使用morphline和avro架构 .

如果发布的数据不适合morphline或avro架构,通常我会得到以下错误,

引起:com.fasterxml.jackson.core.JsonParseException:非法引用的字符((CTRL-CHAR,代码10)):必须使用反斜杠进行转义才能包含在字符串值中

此外,如果我得到这一次,偏移不再移动 . 简单地说,如果kafka只获得此错误一次,它就不能再下沉发布的数据了 .

为了避免这个错误,我想有2个解决方案 . 第一个是在服务器端编写用于大数据端的avro架构的json验证器 . 我首选的第二种方法是跳过并且不会接收未格式化为请求的avro架构的日志数据 . 但是在跳过损坏的数据后,如果kafka获得了合适的数据,它应该将其丢弃 .

我想如果我在flume或kafka配置文件中添加一些参数是可能的 . 那么当发布的数据不适合所请求的模式或请求的morphline时,如何跳过接收器步骤?

1 回答

  • 0

    我在morphline方面解决了这个问题,

    在morphline中添加了try-catch代码块

    morphlines: [
      {
        id: convertJsonToAvro
        importCommands: [ "org.kitesdk.**" ]
        commands: [
           {
             tryRules {
                  catchExceptions : true
               rules : [
                 {
                   commands : [
                     # save initial state
                     { readJson {} }
                    # extract JSON objects into fields
                  { extractJsonPaths {
                    flatten: true
                    paths: {
                PROJECT_NAME: /PROJECT_NAME
                WSDL_NAME: /WSDL_NAME
                ....
                ....
                ....
                MESSAGE_OUT: /MESSAGE_OUT
            }
          } }
          # convert the extracted fields to an avro object
          # described by the schema in this field
          { toAvro {
            schemaFile:/u0x/myPath/myAvroSchema.avsc
          } }
          # serialize the object as avro
          { writeAvroToByteArray: {
            format: containerlessBinary
                  } }
               ]
             }
             {
              commands : [
                { logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
                { dropRecord {} }    
                ]
             }
           ]
         }   
        }    
       ]
      }
    ]
    

    tryRules 中,我强制代码捕获所有异常 .

    rules: 您可以随意编写 "command:" block,如果其中一个抛出除最后一个命令块之外的异常,则最后一个命令将运行 . 请记住,最后一个是"catch" . 我的意思是我的情况,如果第一个命令块失败,最后(第二个)命令将运行 . 如果第一个命令运行完美,则最后一个命令将不起作用,因为最后一个命令块像catch块一样工作 .

    因此,当代码 readJson {} 在第一个命令块中失败时,它会抛出异常,最后一个命令(catch块)会处理它,因此它不会尝试在kafka主题中接收当前数据,因为它将运行 dropRecord {} .

    有关详细文档,您可以访问kitesdk .

相关问题