首页 文章

使用Apache 'HttpClient'在PUT操作期间更快地检测到中断的连接

提问于
浏览
3

我正在使用Apache HttpClient 4与REST API进行通信,并且大部分时间我都在进行冗长的PUT操作 . 由于这些可能发生在不稳定的Internet连接上,我需要检测连接是否中断并且可能需要重试(使用恢复请求) .

为了在现实世界中尝试我的例程,我开始了PUT操作,然后我翻转了笔记本电脑的Wi-Fi开关,导致任何数据流立即完全中断 . 然而,它需要一个很长的时间(可能是5分钟左右),直到最终抛出SocketException .

How can I speed up to process? I'd like to set a timeout of maybe something around 30 seconds.

更新:

为了澄清,我的请求是PUT操作 . 因此,在很长一段时间(可能是几小时)内,唯一的操作是write()操作,并且没有读操作 . 有一个timeout setting for read() operations,但我找不到一个用于写操作 .

我正在使用自己的实体实现,因此我直接写入一个OutputStream,一旦Internet连接中断,它几乎会立即阻塞 . 如果OutputStreams有一个超时参数,那么我可以写 out.write(nextChunk, 30000); 我自己可以检测到这样的问题 . 其实我试过了:

public class TimeoutHttpEntity extends HttpEntityWrapper {

  public TimeoutHttpEntity(HttpEntity wrappedEntity) {
    super(wrappedEntity);
  }

  @Override
  public void writeTo(OutputStream outstream) throws IOException {
    try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
      super.writeTo(wrapper);
    }
  }
}


public class TimeoutOutputStreamWrapper extends OutputStream {
  private final OutputStream delegate;
  private final long timeout;
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();

  public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
    this.delegate = delegate;
    this.timeout = timeout;
  }

  @Override
  public void write(int b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b, off, len);
      return null;
    });
  }

  @Override
  public void close() throws IOException {
    try {
      executeWithTimeout(() -> {
        delegate.close();
        return null;
      });
    } finally {
      executorService.shutdown();
    }
  }

  private void executeWithTimeout(final Callable<?> task) throws IOException {
    try {
      executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IOException(e);
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException)cause;
      }
      throw new Error(cause);
    } catch (InterruptedException e) {
      throw new Error(e);
    }
  }
}

public class TimeoutOutputStreamWrapperTest {
  private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
  private TimeoutOutputStreamWrapper streamWrapper;
  private OutputStream delegateOutput;

  public void setUp(long timeout) {
    delegateOutput = mock(OutputStream.class);
    streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
  }

  @AfterMethod
  public void teardown() throws Exception {
    streamWrapper.close();
  }

  @Test
  public void write_writesByte() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation
    verify(delegateOutput).write(DEMO_ARRAY);
  }

  @Test(expectedExceptions = DemoIOException.class)
  public void write_passesThruException() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);
    doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation performed by expected exception
  }

  @Test(expectedExceptions = IOException.class)
  public void write_throwsIOException_onTimeout() throws Exception {
    // Setup
    final CountDownLatch executionDone = new CountDownLatch(1);
    setUp(100);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        executionDone.await();
        return null;
      }
    }).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    try {
      streamWrapper.write(DEMO_ARRAY);
    } finally {
      executionDone.countDown();
    }

    // Evaluation performed by expected exception
  }

  public static class DemoIOException extends IOException {

  }
}

这有点复杂,但在单元测试中效果很好 . 它也可以在现实生活中使用,除了 HttpRequestExecutor 捕获第127行中的异常并尝试关闭连接 . 但是,当尝试关闭连接时,它首先尝试刷新再次阻塞的连接 .

我可能能够深入挖掘HttpClient并弄清楚如何防止这种刷新操作,但它已经是一个不太漂亮的解决方案,而且它将变得更糟 .

UPDATE

看起来这不能在Java级别上完成 . 我可以在另一个级别上做吗? (我正在使用Linux) .

3 回答

  • 0

    Java阻塞I / O不支持写操作的套接字超时 . 您完全受操作系统/ JRE的支配,可以解除阻塞写操作阻塞的线程 . 此外,此行为往往是特定于OS / JRE .

    这可能是考虑使用基于非阻塞I / O(NIO)的HTTP客户端(例如Apache HttpAsyncClient)的合法案例 .

  • 0

    您可以使用RequestConfig配置套接字超时:

    RequestConfig myRequestConfig = RequestConfig.custom()
        .setSocketTimeout(5000)  // 5 seconds
        .build();
    

    当您进行呼叫时,只需分配新配置即可 . 例如,

    HttpPut httpPut = new HttpPut("...");
    httpPut.setConfig(requestConfig);
    ...
    HttpClientContext context = HttpClientContext.create();
    ....
    httpclient.execute(httpPut, context);
    

    有关重新启动超时配置的更多信息,here有一个很好的解释 .

  • 5

    她是我遇到的链接之一 connection eviction policyhere

    public static class IdleConnectionMonitorThread extends Thread {
    
    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;
    
    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
        super();
        this.connMgr = connMgr;
    }
    
    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    wait(5000);
                    // Close expired connections
                    connMgr.closeExpiredConnections();
                    // Optionally, close connections
                    // that have been idle longer than 30 sec
                    connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException ex) {
            // terminate
        }
    }
    
    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }}
    

    我想你可能想看看这个 .

相关问题