首页 文章

Spring WebFlux和WebClient进行了许多API调用

提问于
浏览
1

最近我问了问题Spring WebFlux create pool of no-blocking threads

我得到了答案,阅读提供的链接,但仍然不明白正确的方式做这样的东西 .

我使用 Spring WebFlux (WebClient) 来编写我的REST服务 . 对于每个传入的请求,我向另一个REST服务发出了数百个请求,因此为了尽可能快地使它们,我想使用无阻塞线程 .

让我收到客户的请求,我必须进行600个API调用:

List<String> urls = Arrays.asList("www.example-rest.com/url1", "www.example-rest.com/url2", ..., "www.example-rest.com/url600");

  • 我想以并行方式制作它们并使用无阻塞线程(如Python中的eventlet)

  • 我想用这样的线程创建单独的共享工作池,以便不为每个传入请求创建一个 .

这是关于 schedulers http://projectreactor.io/docs/core/release/reference/#schedulers的文档

我找到了有关 elastic thread pool 的信息:

弹性线程池(Schedulers.elastic()) . 它根据需要创建新的工作池,并重用空闲的工作池 . 例如,这是I / O阻塞工作的不错选择 . Schedulers.elastic()是一种为阻塞进程提供自己的线程的便捷方式,因此它不会占用其他资源 .

但我不能为每个请求创建新的工作池,并且该池中的线程仍然以阻塞方式工作 .

如果有人用 Spring WebClient 做了类似的任务,请提供一个例子,解释什么是正确的方法 .

1 回答

  • 0

    我做了类似的事情......我的目标是创建一个带有这样签名的方法:

    Flux<BasicIssue> getIssues(WebClient webClient);
    

    因为我打电话的网站只提供了分页界面,所以我需要将多个REST调用的结果提供给一个Flux . 以下是我的实施 . 请注意我使用CachedThreadPool .

    Flux<BasicIssue> getIssues(WebClient webClient) {
        return Flux.generate(
            () -> new IssuesState(webClient),
            (state, sink) -> {
                BasicIssue ret = state.getNext();
                if (ret == null) {
                    sink.complete();
                } else {
                    sink.next(ret);
                }
                return state;
            }
    }
    
    
    class IssuesState {
    
        private final AtomicBoolean isDone = new AtomicBoolean(false);
        private final AtomicInteger threadCount = new AtomicInteger(1);
        private final Executor executor = Executors.newCachedThreadPool();
        private final LinkedBlockingQueue<BasicIssue> issueQueue = new LinkedBlockingQueue();
    
        public IssuesState(WebClient webClient) {
            executor.execute(() -> getNextBlock(webClient, 0));
        }
    
        private void getNextBlock(final WebClient webClient, final int startAt) {
            webClient
                .get()
                .uri(...)
                .header("Authorization", "Basic " + Base64Utils.encodeToString(("username:password".getBytes(UTF_8))))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(PageableIssue.class)
                .subscribe(pageableIssue -> {
                    int maxResults = pageableIssue.getMaxResults();
                    int total = pageableIssue.getTotal();
                    if (startAt == 0) {
                        for (int i = startAt + maxResults; i < total; i += maxResults) {
                            threadCount.incrementAndGet();
                            final int x = i;
                            executor.execute(() -> getNextBlock(webClient, x));
                        }
                    }
                    synchronized (issueQueue) {
                        for (BasicIssue issue : pageableIssue.getIssues()) {
                            issueQueue.add(issue);
                        }
    
                        if (threadCount.decrementAndGet() == 0) {
                            isDone.set(true);
                        }
                    }
                });
        }
    
        public BasicIssue getNext() {
            synchronized (issueQueue) {
                if (isDone.get() && issueQueue.isEmpty()) {
                    return null;
                }
            }
            try {
                return issueQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    

    使用上面的方法..

    getIssues(webClient)
        .subscribe(basicIssue -> System.out.println(basicIssue.getName());
    

相关问题