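I have a processor that reads messages from Apache Kafka and sends the data to REST endpoints.
The server has only 4 cores and 4 GB of RAM, of which at most 2 GB is allocated to the Java process.
Messages are produced and consumed at a rate of 4k per second.
After running for a few minutes, the program runs out of memory.
- What is the best way to call the HTTP REST endpoints asynchronously instead of waiting for the response?
- How should the httpClient connections be managed? My impression is that I need to start the client and never close it, so that connections can be reused.
- Do you see any problems with the following code?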
public class SomeProcesor implements BProcessor {
    private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
    private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
    CloseableHttpAsyncClient httpclient = null;

    @Override
    public void begin() {
        httpclient = HttpAsyncClients.createDefault();
        RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
        HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
        // Start the client
        httpclient.start();
    }

    @Override
    public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {
        List<Map<String, Object>> listMap = new ArrayList<>();
        // loop and extract the data from events into the above List
        //..
        //..
        // submit to separate thread to post to HTTP
        pool.submit(new HttpThread(listMap));
    }

    private class HttpThread implements Callable<Boolean> {
        List<Map<String, Object>> listMap = null;

        public HttpThread(List<Map<String, Object>> listMap) {
            this.listMap = listMap;
        }

        @Override
        public Boolean call() throws Exception {
            return postToHttp(listMap);
        }
    }

    private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
        for (Map<String, Object> map : listMap) {
            try {
                HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
                postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
                postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
                postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");
                StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
                input.setContentType("application/json");
                postRequest.setEntity(input);
                httpclient.execute(postRequest, null);
            } catch (Exception e) {
                return false;
            } catch (Throwable th) {
                return false;
            }
        }
        return true;
    }
}
1 Answer
You need to consume the HTTP response or release the connection; otherwise the connection keeps consuming resources. Change
to
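
Independently of the exact change the answer has in mind, the same ideas (reuse one client, never leave a response unconsumed, and cap the number of requests in flight) can be sketched as follows. This is not the answer's code, and it swaps Apache HttpAsyncClient for the JDK's own `java.net.http.HttpClient` (Java 11+) so the example needs no extra libraries. The class names `BoundedAsync` and `CollectorPoster` and the cap of 100 in-flight requests are hypothetical. The cap is there because `Executors.newFixedThreadPool` uses an unbounded work queue and an `ExecutorCompletionService` keeps completed futures until they are taken, so at 4k messages per second both may grow without limit and could plausibly contribute to the out-of-memory error.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: caps how many async operations may be pending at once.
class BoundedAsync {
    private final Semaphore permits;

    BoundedAsync(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight operations are pending, so a fast
    // producer is slowed down instead of queueing work without limit.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            permits.release(); // the operation never started
            throw e;
        }
        // Give the permit back whether the operation succeeded or failed.
        return future.whenComplete((result, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}

// Hypothetical poster: one shared client, bounded concurrency, bodies discarded.
class CollectorPoster {
    // Created once and reused for every request so connections can be pooled.
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
    private final BoundedAsync limiter = new BoundedAsync(100); // assumed cap
    private final URI endpoint;

    CollectorPoster(URI endpoint) {
        this.endpoint = endpoint;
    }

    // Returns a future without waiting for the response; true means 2xx.
    CompletableFuture<Boolean> post(String json) {
        HttpRequest request = HttpRequest.newBuilder(endpoint)
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        // discarding() reads and drops the response body, so nothing is left unconsumed.
        CompletableFuture<HttpResponse<Void>> response = limiter.submit(
                () -> client.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
        return response
                .thenApply(r -> r.statusCode() / 100 == 2)
                .exceptionally(error -> false);
    }
}
```

In the processor from the question, `process()` would create one `CollectorPoster` up front (for example with `URI.create("https://myserver:8080/services/collector")`) and call `post(json)` for each event. When 100 requests are already pending, the calling thread waits, which slows Kafka consumption down to the rate the endpoint can absorb rather than letting pending work accumulate on the heap.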