首页 文章

来自 kafka 主题的 Java bean 的 Storm Kafkaspout KryoSerialization 问题

提问于
浏览
1

嗨,我是 Storm 和 Kafka 的新手。我正在使用 storm 1.0.1 和 kafka 0.10.0 我们有一个 kafkaspout,可以从 kafka 主题接收 java bean。我花了几个小时来寻找合适的方法。找到一些有用的文章,但迄今为止没有一种方法对我有用。

以下是我的代码:

StormTopology:

public class StormTopology {

public static void main(String[] args) throws Exception {
    //Topo test /zkroot test
    if (args.length == 4) {
        System.out.println("started");
        BrokerHosts hosts = new ZkHosts("localhost:2181");

        SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
                args[3]);

        kafkaConf1.zkRoot = args[2];
        kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
        kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
        KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);

        System.out.println("started");

        ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
        AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
        //builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
        //This is for field grouping in bolt we need two bolt for field grouping or it wont work
        topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
        topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
        Config config = new Config();
        config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
        config.setDebug(true);
        config.setNumWorkers(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(args[0], config, topologyBuilder.createTopology());

        // StormSubmitter.submitTopology(args[0], config,
        // builder.createTopology());

    } else {
        System.out
                .println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
    }
}

}

我正在使用 kryo 在 kafka 上序列化数据

KafkaProducer:

public class StreamKafkaProducer {

private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();

private StreamKafkaProducer(){
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.abc.serializer.MySerializer");
    producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}

public static StreamKafkaProducer getStreamKafkaProducer(){
    return KAFKA_PRODUCER;
}

public void produce(String topic, VehicleTrip vehicleTrip){
    ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
    producer.send(producerRecord);
    //producer.close();
}

public static void closeProducer(){
    producer.close();
}

}

Kyro Serializer:

public class DataKyroSerializer extends Serializer<Data> implements Serializable {
@Override
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) {
    output.writeLong(data.getStartedOn().getTime());
    output.writeLong(data.getEndedOn().getTime());
}

@Override
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) {
    Data data = new Data();
    data.setStartedOn(new Date(input.readLong()));
    data.setEndedOn(new Date(input.readLong()));
    return data;
}

我需要将数据返回给 Data bean。

根据一些文章,我需要提供一个自定义方案,并使其成为拓扑的一部分,但到目前为止,我没有运气

螺栓和方案代码

方案:

public class KryoScheme implements Scheme {

    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
            return kryo;
        };
    };

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
    }

    @Override
    public Fields getOutputFields( ) {
        return new Fields( "data" );
    }

}

和螺栓:

public class AnalysisBolt implements IBasicBolt {
/**
 *
 */
private static final long serialVersionUID = 1L;
private String topicname = null;

public AnalysisBolt(String topicname) {
    this.topicname = topicname;
}

public void prepare(Map stormConf, TopologyContext topologyContext) {
    System.out.println("prepare");
}

public void execute(Tuple input, BasicOutputCollector collector) {
    System.out.println("execute");

    Fields fields = input.getFields();
    try {   

        JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                .getValueByField(fields.get(1)));
        String StartTime = (String) eventJson.get("startedOn");
        String EndTime = (String) eventJson.get("endedOn");
        String Oid = (String) eventJson.get("_id");
        int V_id =  (Integer) eventJson.get("vehicleId");
        //call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)

        System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);

} catch (Exception e) {
    e.printStackTrace();

}

}

但如果我提交风暴拓扑,我会收到错误:

java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme$1, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within
the prepare method of 'kafkaspout at the earliest.

感谢帮助调试问题并指导正确的道路。

谢谢

2 回答

  • 1

    您的 ThreadLocal 不可序列化。最好的解决方案是使序列化程序既可序列化又可线程化。如果这是不可能的,那么我会看到 2 个替代品,因为没有准备方法,因为你会得到一个螺栓。

    • 将其声明为静态,这本质上是瞬态的。

    • 将其声明为瞬态并通过私有 get 方法访问它。然后,您可以在第一次访问时初始化变量。

  • 0

    在 Storm 生命周期内,拓扑被实例化,然后在执行拓扑之前序列化为字节格式以存储在 ZooKeeper 中。在此步骤中,如果拓扑中的 spout 或 bolt 具有已初始化的不可序列化属性,则序列化将失败。

    如果需要一个不可序列化的字段,请在 bolt 或 spout 的 prepare 方法中初始化它,该方法在拓扑传递给 worker 之后运行。

    来源:实施 Apache Storm 的最佳实践

相关问题