首页 文章

Apache HttpClient - Out of Memory问题

提问于
浏览
2

我有一个处理器从Apache Kafka读取消息并将数据发送到REST endpoints .

服务器只有4个内核和4 GB内存,其中最多2GB分配给java进程

消息以4k /秒的速率生成和消耗 .

运行几分钟后,程序内存不足 .

  • 异步调用http rest endpoints 而不是等待响应的最佳方法是什么

  • 如何管理httpClient连接?我的印象是我需要启动客户端永远不要关闭它,以便我可以重用连接

  • 您是否看到以下代码存在任何问题?

公共类SomeProcesor实现BProcessor {

private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
CloseableHttpAsyncClient httpclient = null ; 

@Override
public void begin() {
    httpclient = HttpAsyncClients.createDefault();
    RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
    HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
    // Start the client
    httpclient.start();

}

@Override
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {

    List<Map<String, Object>> listMap = new ArrayList<>();  

    // loop and extract the data from events into the above List
    //..
    //..

    // submit to seperate thread to post to HTTP
    pool.submit(new HttpThread(listMap);
}

private class HttpThread implements Callable<Boolean> {
    List<Map<String, Object>> listMap = null;
    public HttpThread(List<Map<String, Object>> listMap) {
        this.listMap = listMap;
    }
    @Override
    public Boolean call() throws Exception {
        return postToHttp(listMap);
    }
}

private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
    for (Map<String, Object> map : listMap) {

        try {
            HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
            postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
            postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
            postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");

            StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
            input.setContentType("application/json");
            postRequest.setEntity(input);

            httpclient.execute(postRequest, null);

        } catch (Exception e) {
            return false;
        } catch (Throwable th) {
            return false;
        }
    }
    return true;
}

}

1 回答

  • 2

    需要使用http响应或释放连接,否则连接将消耗资源 . 更改

    httpclient.execute(postRequest, null);
    

    HttpResponse response = httpclient.execute(postRequest, null).get();
    if(response.getStatusLine().getStatusCode() != 200) {
    // do something
    }
    // release the connection, better add to a finally clause
    postRequest.releaseConnection();
    

相关问题