首页 文章

flink sink只支持bio吗?

提问于
浏览
2

沉没的 invoke 方法似乎无法制作异步io?例如返回 Future

例如,redis连接器使用jedis lib同步执行redis命令:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

然后它将阻止flink的任务线程等待来自redis服务器的网络响应命令?!其他运营商是否可以在同一个线程中运行接收器?如果是这样,那么它也会阻止它们?

我知道flink有asyncio api,但似乎没有被sink impl使用?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

1 回答

  • 1

    正如@Dexter所提到的,你可以使用 RichAsyncFunction . 这是一个示例代码(可能需要进一步更新才能使其正常工作;)

    AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
            transient private RedisClient client;
            transient private RedisAsyncCommands<String, String> commands;
            transient private ExecutorService executor;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
    
                client = RedisClient.create("redis://localhost");
                commands = client.connect().async();
                executor = Executors.newFixedThreadPool(10);
            }
    
            @Override
            public void close() throws Exception {
                // shut down the connection and thread pool.
                client.shutdown();
                executor.shutdown();
    
                super.close();
            }
    
            public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
                // eg.g get something from redis in async
                final RedisFuture<String> future = commands.get("key");
                future.thenAccept(new Consumer<String>() {
                    @Override
                    public void accept(String value) {
                         collector.collect(Collections.singletonList(future.get()));
                    }
                });
            }
        }, 1000, TimeUnit.MILLISECONDS);
    

相关问题