首页 文章

Apache Ignite:与数据流相关的序列化错误

提问于
浏览
0

我正在尝试研究Apache Ignite流的工作原理 . 我有2个节点集群设置(都在localhost上),我启动了一个客户端节点,它使用StreamTransformer和EntryProcessor运行流代码 . 因此,我得到的一个节点无法反序列化异常 . 我的代码是来自Ignite文档的简化WordCount示例:

public class StreamingExample {`
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
    @Override
    public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
        Long val = e.getValue();
        e.setValue(val == null ? 1L : val + 1);
        return null;
    }
}

public static void main(String[] args) throws IgniteException, IOException {
    Ignition.setClientMode(true);
    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
        IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
        try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
            stmr.allowOverwrite(true);
            stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
            stmr.addData("word", 1L);
            System.out.println("Finished");
        }
    }
}

}

例外我得到两个节点中的一个是

[23:38:23]拓扑快照[ver = 5,servers = 2,clients = 1,CPUs = 4,heap = 3.3GB]线程中的异常“pub-#9%null%”类org.apache.ignite .binary.BinaryObjectException:无法在org.apache.ignite.internal.binary.binial.binial.binial.binal.BialReaderExImpl.deserialize(BinaryReaderExImpl . )的org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595)中使用优化的marshaller解组对象 . java or :. or位于org.apache.ignite的org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access $ 000(DataStreamProcessor.java:50)的.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278) . org.apache.ignite.internal.managers.communicatio的internal.processors.datastreamer.DataStreamProcessor $ 1.onMessage(DataStreamProcessor.java:80) n.GridIoManager.invokeListener(GridIoManager.java:1238)位于org.apache.ignite.internal.managers.communication.GridIoManager的org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866) . 在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)的org.apache.ignite.internal.managers.communication.GridIoManager $ 5.run(GridIoManager.java:829)访问$ 1700(GridIoManager.java:106) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:745)引起:class org.apache.ignite.IgniteCheckedException:找不到类失败使用给定的类加载器进行解组(确保所有类的相同版本都可以在所有节点上使用或启用对等类加载):sun.misc.Launcher$AppClassLoader@4e857327 at org.apache.ignite.marshaller.optimized.OptimizedMarshaller .unmarshal(OptimizedMarshaller.java:224)在org.apache.ignite.internal.binary.BinaryUtils.doReadOpt imized(BinaryUtils.java:1592)... 13更多引起:java.lang.ClassNotFoundException:gridgaingames.StreamingExample $ java.net.URLClassLoader $ StrerExampleCacheEntryProcessor $ java.URLClassLoader $ 1run(URLClassLoader.java:366)$ 1 .run(URLClassLoader.java:355)java.security.AccessController.doPrivileged(Native Method),java.net.URLClassLoader.findClass(URLClassLoader.java:354),位于java.lang.ClassLoader.loadClass(ClassLoader.java:425) )at.misc.Launcher $ AppClassLoader.loadClass(Launcher.java:308)at java.lang.ClassLoader.loadClass(ClassLoader.java:358)at java.lang的java.lang.Class.forName0(Native Method) . org.apache.ignite.inite.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185)中org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350)的Class.forName(Class.java:274) at org.apache.ignite.marshaller.optimized.OtimtimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266)org.apache.ignite.marshaller.optimized.Opti位于org.apache的org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)的java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)中的mizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318)位于org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream的org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841)中的.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)位于org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218)的java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)中的.readObjectOverride(OptimizedObjectInputStream.java:324)... 14更多

有几件事是我无法得到的 .

1)我该如何解决?

2)由于这不是“广播”或其他东西,我认为Ignite只在呼叫节点上运行流代码 . 看起来我错了 . 所以我的流媒体代码在哪里执行?

3)打印“完成”行后,我的代码不会停止 . 为什么?看起来一些非守护程序线程仍然存在 . 这是一个阻止我的客户端节点退出的流代码吗?

PS

启用了对等类加载 . 如果我在广播中运行一些在许多节点上执行代码的例子 - 它可以正常工作 .

1 回答

  • 3

    基本上 IgniteDataStreamer 在发送方(在您的示例中为客户端)准备数据批处理,并立即将它们发送到应存储特定键值元组的目标节点 . 记住这些问题的答案如下:

    • 在将条目放入缓存之前,在目标节点(服务器节点)上执行变换器 . 这意味着服务器节点必须在其类路径中具有转换器的类,或者您必须启用对等类加载 . 就个人而言,后者更灵活,更可取的解决方案 .

    • 如上所述,发件人只需准备发送到部署了缓存的所有服务器的批处理 . 服务器仅接收包含服务器为主服务器或备用服务器的元组的批处理 .

    • 批次的刷新发生在后台,因为 IgniteDataStreamer 用于快速数据预加载或复杂流处理(CEP) . 有许多参数可以让您调整刷新 - autoFlustFrequencyperNodeBufferSize .

    最后,对于预加载需求(当缓存为空并且您需要填充它们时),我建议将 allowOverwrite 设置为 false ,这将允许流式传输器分别为主节点和备份节点准备和发送批次 . 如果此参数设置为 true ,则批次仅在主节点上发送,主节点在更新其数据版本和相应备份后,将使用基本 cache.put 操作注入数据 . 如果您只需要预先加载缓存,这种方法会更慢 .

相关问题