
Avro Map-Reduce on Oozie


I have been trying to run an Avro map-reduce job on Oozie. I specified the mapper and reducer classes in workflow.xml and provided the rest of the configuration, but the job fails with

java.lang.RuntimeException: class mr.sales.avro.etl.SalesMapper not org.apache.hadoop.mapred.Mapper

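The same job completes and produces the expected output when I run it directly on the Hadoop cluster (not through Oozie), so it seems I am missing some Oozie configuration. From the exception, my guess is that Oozie requires the mapper to be a subclass of org.apache.hadoop.mapred.Mapper, whereas Avro mappers have a different signature: they extend org.apache.avro.mapred.AvroMapper, which is probably the cause of the error.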
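The question does not show its workflow.xml, so the fragment below is only an assumed reconstruction: setting the Avro mapper directly under the old-API key mapred.mapper.class is one configuration that produces exactly this exception, because Hadoop reads that key back as a class that must implement org.apache.hadoop.mapred.Mapper.

```xml
<!-- Assumed, not taken from the question: the AvroMapper subclass set as the Hadoop mapper -->
<!-- SalesMapper extends org.apache.avro.mapred.AvroMapper, not org.apache.hadoop.mapred.Mapper -->
<property>
    <name>mapred.mapper.class</name>
    <value>mr.sales.avro.etl.SalesMapper</value>
</property>
```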

So my question is: how do I configure the Oozie workflow/properties file so that it can run an Avro map-reduce job?

3 Answers

  • 1

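    With Avro you need to configure a few extra properties:

    • org.apache.avro.mapred.HadoopMapper is the actual mapper class you need to set as mapred.mapper.class (it implements the org.apache.hadoop.mapred.Mapper interface and delegates to your Avro mapper)

    • the avro.mapper property should be set to your mapper class, mr.sales.avro.etl.SalesMapper

    The combiner and reducer have equivalent properties; check the source of org.apache.avro.mapred.AvroJob and its utility methods.

    Alternatively, inspect the job.xml of the job you submitted manually and copy the relevant configuration properties into your Oozie workflow.xml.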
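    A minimal sketch of those properties inside the map-reduce action's <configuration> element. It mirrors what the setMapperClass, setReducerClass and setCombinerClass methods of org.apache.avro.mapred.AvroJob set; SalesReducer and SalesCombiner are hypothetical class names, and the schema, format and serialization properties that AvroJob also sets are left out.

    ```xml
    <!-- Old-API bridge classes that Hadoop instantiates; they delegate to the avro.* classes below -->
    <property>
        <name>mapred.mapper.class</name>
        <value>org.apache.avro.mapred.HadoopMapper</value>
    </property>
    <property>
        <name>mapred.reducer.class</name>
        <value>org.apache.avro.mapred.HadoopReducer</value>
    </property>
    <!-- Optional: only needed if the job uses a combiner -->
    <property>
        <name>mapred.combiner.class</name>
        <value>org.apache.avro.mapred.HadoopCombiner</value>
    </property>

    <!-- Your Avro classes: the AvroMapper subclass and AvroReducer subclasses -->
    <property>
        <name>avro.mapper</name>
        <value>mr.sales.avro.etl.SalesMapper</value>
    </property>
    <!-- Hypothetical reducer class name -->
    <property>
        <name>avro.reducer</name>
        <value>mr.sales.avro.etl.SalesReducer</value>
    </property>
    <!-- Hypothetical combiner class name (an AvroReducer that emits Pair records) -->
    <property>
        <name>avro.combiner</name>
        <value>mr.sales.avro.etl.SalesCombiner</value>
    </property>
    ```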

  • 0

    I have had the same problem this week. Here is my workflow.xml (edited):

    <workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='start_here'/>
    <action name='start_here'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/output"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/input</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/output</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.avro.mapred.HadoopMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.avro.mapred.HadoopReducer</value>
                </property>
                <property>
                    <name>avro.mapper</name>
                    <value>package.for.my.Mapper</value>
                </property>
                <property>
                    <name>avro.reducer</name>
                    <value>package.for.my.Reducer</value>
                </property>
                <property>
                    <name>mapred.input.format.class</name>
                    <value>org.apache.avro.mapred.AvroUtf8InputFormat</value>
                </property>
                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.avro.mapred.AvroOutputFormat</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.avro.mapred.AvroWrapper</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.avro.mapred.AvroKey</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.avro.mapred.AvroValue</value>
                </property>
                <property>
                    <name>avro.map.output.schema</name>
                    <value>{put your schema here from job.xml via manual run}</value>
                </property>
                <property>
                    <name>avro.input.schema</name>
                    <value>"string"</value>
                </property>
                <property>
                    <name>avro.output.schema</name>
                    <value>{put your schema here from job.xml via manual run}</value>
                </property>
                <property>
                    <name>mapred.output.key.comparator.class</name>
                    <value>org.apache.avro.mapred.AvroKeyComparator</value>
                </property>
                <property>
                    <name>io.serializations</name>
                    <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='fail'/>
    </action>
    <kill name='fail'>
        <message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
    </workflow-app>

    Depending on the input and output of your map-reduce job, you may need to adjust this a little.
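    For instance, if the input were Avro data files instead of text, the sketch below shows the properties that would change, for a hypothetical job whose input records follow an invented Sale schema and whose map output is a string key with a long value. The other properties in the workflow above stay as they are. avro.map.output.schema has to describe an org.apache.avro.mapred.Pair, a record with key and value fields; the schemas here are illustrations only, not the ones from the original job.

    ```xml
    <!-- Read Avro data files instead of text (replaces AvroUtf8InputFormat) -->
    <property>
        <name>mapred.input.format.class</name>
        <value>org.apache.avro.mapred.AvroInputFormat</value>
    </property>
    <!-- Hypothetical input record schema (replaces "string") -->
    <property>
        <name>avro.input.schema</name>
        <value>{"type":"record","name":"Sale","namespace":"example","fields":[{"name":"item","type":"string"},{"name":"amount","type":"long"}]}</value>
    </property>
    <!-- Map output: a Pair schema with a string key and a long value; the value is ignored when sorting -->
    <property>
        <name>avro.map.output.schema</name>
        <value>{"type":"record","name":"Pair","namespace":"org.apache.avro.mapred","fields":[{"name":"key","type":"string"},{"name":"value","type":"long","order":"ignore"}]}</value>
    </property>
    <!-- Reducer output written by AvroOutputFormat; here the reducer emits the same Pair type -->
    <property>
        <name>avro.output.schema</name>
        <value>{"type":"record","name":"Pair","namespace":"org.apache.avro.mapred","fields":[{"name":"key","type":"string"},{"name":"value","type":"long","order":"ignore"}]}</value>
    </property>
    ```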

  • 1

    Could you also post your mapper and reducer classes? My Oozie workflow runs fine, but the output files are not .avro files. Here is my workflow:

    <workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='start_here'/>
    <action name='start_here'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/hadoop/${workFlowRoot}/final-output-data"/>
            </prepare>
            <configuration>
    
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            <property>
                <name>mapred.reducer.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.mapper.new-api</name>
                <value>true</value>
            </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/hadoop/${workFlowRoot}/input-data</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/hadoop/${workFlowRoot}/final-output-data</value>
                </property>
    
    
                <property>
                    <name>mapreduce.mapper.class</name>
                    <value>org.apache.avro.mapred.HadoopMapper</value>
                </property>
                <property>
                    <name>mapreduce.reducer.class</name>
                    <value>org.apache.avro.mapred.HadoopReducer</value>
                </property>
                <property>
                    <name>avro.mapper</name>
                    <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionMapper</value>
                </property>
                <property>
                    <name>avro.reducer</name>
                    <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionReducer</value>
                </property>
                <property>
                    <name>mapreduce.input.format.class</name>
                    <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
                </property>
                <property>
                    <name>avro.schema.input.key</name>
                    <value>{... schema ...}</value>
                </property>
               
                <property>
                    <name>mapreduce.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.AvroKey</value>
                </property>
                <property>
                    <name>avro.map.output.schema.key</name>
                    <value>{... schema ...}</value>
                </property>
    
                
                <property>
                    <name>mapreduce.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
            <property>
                    <name>mapreduce.output.format.class</name>
                    <value>org.apache.avro.mapred.AvroKeyValueOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.output.key.class</name>
                    <value>org.apache.avro.mapred.AvroKey</value>
                </property>
    
                <property>
                    <name>mapreduce.output.value.class</name>
                    <value>org.apache.avro.mapred.AvroValue</value>
                </property>
               
                
                <property>
                    <name>avro.schema.output.key</name>
                    <value>{ ....   schema .... }</value>
                </property>
            <property>
                    <name>avro.schema.output.value</name>
                    <value>"string"</value>
                </property>
                <property>
                    <name>mapreduce.output.key.comparator.class</name>
                    <value>org.apache.avro.mapred.AvroKeyComparator</value>
                </property>
                <property>
                    <name>io.serializations</name>
                <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='fail'/>
    </action>
    <kill name='fail'>
        <message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
    </workflow-app>
    

    My mapper and reducer are defined as follows:

    // Mapper/Reducer are the new-API classes (org.apache.hadoop.mapreduce); AvroKey/AvroValue come from org.apache.avro.mapred
    public static class CFDetectionMapper extends
            Mapper<AvroKey<AdClickFraudSignalsEntity>, NullWritable, AvroKey<AdClickFraudSignalsEntity>, Text> { /* ... */ }

    public static class CFDetectionReducer extends
            Reducer<AvroKey<AdClickFraudSignalsEntity>, Text, AvroKey<AdClickFraudSignalsEntity>, AvroValue<CharSequence>> { /* ... */ }
    
