首页 文章

正确使用Akka http客户端连接池

提问于
浏览
14

我需要使用Akka的HTTP客户端(v2.0.2)来使用REST服务 . 逻辑方法是通过主机连接池执行此操作,因为我们期望大量的同时连接 . Flow 为此消耗 (HttpRequest, T) 并返回 (Try[HttpRequest, T) . documentation表示需要一些任意类型 T 来管理对请求的潜在无序响应,但是没有指出调用者应该对返回的 T 做什么 .

我的第一次尝试是使用 Int 作为 T 的下面的函数 . 从许多地方调用它以确保连接使用单个池 .

val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
  val unique = Random.nextInt
  Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
    case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
    case (Failure(f), `unique`) ⇒ Future.failed(f)
    case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
  }
}

问题是客户应该如何使用 T ?有更清洁更有效的解决方案吗?最后,我的偏执是否可能无法实现偏执?

3 回答

  • 0

    我最初对此感到有些困惑,直到我几次阅读文档 . 如果您要将单个请求用于池中,无论共享同一个池有多少个不同的位置,您提供的 T (在您的情况下为 Int )无关紧要 . 因此,如果您一直使用 Source.single ,那么如果您真的需要,那么该密钥总是可以 1 .

    但它确实发挥作用的方法是,如果一段代码将使用池并一次向池中提交多个请求,并希望得到所有这些请求的响应 . 原因是响应按照从被调用的服务接收的顺序返回,而不是它们被提供给池的顺序 . 每个请求可能需要不同的时间,因此它们按照从池中收回的顺序向下游流向 Sink .

    假设我们有一个服务接受 GET 请求,其中包含以下形式的网址:

    /product/123
    

    123 部分是您要查找的产品的ID . 如果我想一次查找产品 1-10 ,并且每个产品都有单独的请求,那么这就是标识符变得重要的地方,这样我就可以将每个 HttpResponse 与它的产品ID相关联 . 此方案的简化代码示例如下:

    val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
    val responsesMapFut:Future[Map[Int,HttpResponse]] = 
      Source(requests).
        via(pool).
        runFold(Map.empty[Int,HttpResponse]){
          case (m, (util.Success(resp), id)) => 
            m ++ Map(id -> resp)
    
          case (m, (util.Failure(ex), i)) =>
            //Log a failure here probably
              m
        }
    

    当我在 fold 中得到我的回复时,我也方便地拥有每个与之关联的id,因此我可以将它们添加到由id键入的 Map 中 . 如果没有这个功能,我可能不得不做一些事情,比如解析正文(如果它是json)来尝试找出哪个响应是哪个并且这不是理想的,并且这不包括失败的情况 . 在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符 .

    我希望能为你澄清一些事情 .

  • 23

    使用基于HTTP的资源时,Akka HTTP连接池是强大的盟友 . 如果您要一次执行单个请求,那么解决方案是:

    def exec(req: HttpRequest): Future[HttpResponse] = {
      Source.single(req → 1)
        .via(pool)
        .runWith(Sink.head).flatMap {
          case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
          case (Failure(f), _) ⇒ Future.failed(f)
        }
    }
    

    因为您正在执行 single 请求,所以无需消除响应的歧义 . 然而,Akka流是聪明的 . 您可以同时向池提交多个请求 . 在这个例子中,我们传入一个 Iterable[HttpRequest] . 返回的 Iterable[HttpResponse] 使用 SortedMap 重新排序,与原始请求的顺序相同 . 你可以做一个 request zip response 来排序:

    def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
      Source(requests.zipWithIndex.toMap)
        .via(pool)
        .runFold(SortedMap[Int, Future[HttpResponse]]()) {
          case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
          case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
        }.map(r ⇒ r.values)
    }
    

    如果您需要以自己的方式解开东西,那么可迭代期货的期货很棒 . 通过扁平化事物可以获得更简单的响应 .

    def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
      Source(requests.zipWithIndex.toMap)
        .via(pool)
        .runFold(SortedMap[Int, Future[HttpResponse]]()) {
          case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
          case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
        }.flatMap(r ⇒ Future.sequence(r.values))
    }
    

    我使用所有导入和包装器制作了this gist,以使客户端使用HTTP服务 .

    特别感谢@cmbaxter他的简洁例子 .

  • 7

    有一个开放的票据,用于改进关于此的akka-http文档 . 请check this example

    val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
    val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
      .via(pool)
      .toMat(Sink.foreach({
         case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p)) => p.failure(e)
      }))(Keep.left)
      .run
    
    
    val promise = Promise[HttpResponse]
    val request = HttpRequest(uri = "/") -> promise
    
    val response = queue.offer(request).flatMap(buffered => {
      if (buffered) promise.future
      else Future.failed(new RuntimeException())
    })
    

相关问题