Akka(36): Http:Client-side-Api,Client-Connections

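The Akka-http client-side API is a toolkit for exchanging messages over the network, built around operations on HttpRequest. As we know, Akka-http is built on top of Akka-stream, so the connection channel that an Akka-http client sets up to a server can also be expressed as an Akka-stream Flow. This Flow is obtained by calling Http().outgoingConnection: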

  /**
   * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint.
   * Every materialization of the produced flow will attempt to establish a new outgoing connection.
   *
   * To configure additional settings for requests made using this method,
   * use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly.
   */
  def outgoingConnection(host: String, port: Int = 80,
                         localAddress: Option[InetSocketAddress] = None,
                         settings:     ClientConnectionSettings  = ClientConnectionSettings(system),
                         log:          LoggingAdapter            = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
    _outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)

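As we can see, this function takes the server address as host and port, and returns a Flow[HttpRequest,HttpResponse,Future[OutgoingConnection]]. The Flow represents the transformation of an incoming HttpRequest into an outgoing HttpResponse, and that transformation covers the Request/Response message exchange with the server. Let's use this Flow to send a request to the server and fetch the response: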

  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("akka.io")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
    .andThen{
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}

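The pattern above is the so-called Connection-Level-Client-Side-Api. It gives the user the most freedom to control how a connection is established and used, and how requests are sent over it. Generally speaking, once the request stream has completed and the entity of the last response has been fully consumed, the system closes the connection automatically. The API also offers ways to close the connection by hand when needed, as follows: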

  //close connection by cancelling response entity
  resp.entity.dataBytes.runWith(Sink.cancelled)
  //close connection by receiving response with close header
  //(server side: every response carries a `Connection: close` header)
  Http().bindAndHandleSync(
    { req => HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)
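Since the connection is released only after the response entity has been consumed, it may help to see what "fully consuming" an entity can look like. The following is a minimal sketch, not part of the original demo: the object name EntitySketch and the helper bodyAsString are hypothetical, and it assumes the same Akka HTTP 10.0.x / ActorMaterializer setup used elsewhere in this article.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse
import akka.stream.ActorMaterializer
import akka.util.ByteString

import scala.concurrent.{ExecutionContextExecutor, Future}

// Hypothetical helper object, for illustration only.
object EntitySketch {
  implicit val sys: ActorSystem = ActorSystem("EntitySketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = sys.dispatcher

  // Drains the entity's byte stream into memory and decodes it as UTF-8.
  // Everything is buffered, so this is only suitable for small bodies.
  def bodyAsString(resp: HttpResponse): Future[String] =
    resp.entity.dataBytes
      .runFold(ByteString.empty)(_ ++ _)
      .map(_.utf8String)
}
```

With the sendHttpRequest function shown earlier, this could be chained as sendHttpRequest(HttpRequest(uri="/")).flatMap(EntitySketch.bodyAsString). When the body is not needed at all, calling resp.discardEntityBytes() drains it without buffering.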

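Akka-http also offers a more convenient mode, the Host-Level-Client-Side-Api. This API automatically maintains a connection pool for each endpoint, and the user only needs to configure the pool. The system then manages the lifecycle of the pooled connections (creating, closing, activating, idling) according to that configuration. In the akka.http.host-connection-pool config section, max-connections, max-open-requests, pipelining-limit and related settings control the number of connections and in-flight requests, and deserve particular attention. The pool for a given endpoint is obtained via Http().cachedHostConnectionPool(endPoint), and what we get back is again a client-flow instance. Because the system maintains the connection pool automatically, the client-flow instance can be reused freely, no matter how often or at what intervals it is invoked. The definition below is that of the HTTPS variant, cachedHostConnectionPoolHttps, which according to its doc comment is the same as cachedHostConnectionPool except that it uses encrypted connections: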

  /**
   * Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections.
   *
   * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
   * for encryption on the connections.
   *
   * To configure additional settings for the pool (and requests made using it),
   * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
   */
  def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,
                                       connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
                                       settings:          ConnectionPoolSettings = defaultConnectionPoolSettings,
                                       log:               LoggingAdapter         = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
    val cps = ConnectionPoolSetup(settings, connectionContext, log)
    val setup = HostConnectionPoolSetup(host, port, cps)
    cachedHostConnectionPool(setup)
  }
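As the doc comment above notes, the pool can be configured either through the akka.http.host-connection-pool config section or by passing a ConnectionPoolSettings explicitly. Here is a minimal sketch of the second option, assuming Akka HTTP 10.0.x. The object name PoolSettingsSketch and the chosen values are illustrative assumptions, not recommendations.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

import scala.util.Try

// Hypothetical object, for illustration only.
object PoolSettingsSketch {
  implicit val sys: ActorSystem = ActorSystem("PoolSettingsSketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Start from the values configured under akka.http.host-connection-pool
  // and override a few of them in code (values are arbitrary examples).
  val poolSettings: ConnectionPoolSettings =
    ConnectionPoolSettings(sys)
      .withMaxConnections(8)   // upper bound on parallel connections to the host
      .withMaxOpenRequests(64) // must be a power of 2
      .withPipeliningLimit(1)  // requests in flight per connection

  // The client-flow built with the explicit settings instead of the defaults.
  val tunedFlow: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Int]("akka.io", 80, settings = poolSettings)
}
```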

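The function returns a Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]. Because the pooled connections send requests and receive responses asynchronously, responses do not necessarily come back in the order the requests were sent, so the T element of the tuple is needed as a marker to match each response with its request. The connection pool shuts itself down according to idle-timeout, and it can also be shut down manually via HostConnectionPool.shutdown():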

  /**
   * Represents a connection pool to a specific target host and pool configuration.
   */
  final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)(
    private[http] val gateway: PoolGateway) { // enable test access

    /**
     * Asynchronously triggers the shutdown of the host connection pool.
     *
     * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed.
     */
    def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown()

    private[http] def toJava = new akka.http.javadsl.HostConnectionPool {
      override def setup = HostConnectionPool.this.setup
      override def shutdown(executor: ExecutionContextExecutor): CompletionStage[Done] = HostConnectionPool.this.shutdown()(executor).toJava
    }
  }

Alternatively, Http().shutdownAllConnectionPools() shuts down all connection pools in the ActorSystem in one go:

  /**
   * Triggers an orderly shutdown of all host connections pools currently maintained by the [[akka.actor.ActorSystem]].
   * The returned future is completed when all pools that were live at the time of this method call
   * have completed their shutdown process.
   *
   * If existing pool client flows are re-used or new ones materialized concurrently with or after this
   * method call the respective connection pools will be restarted and not contribute to the returned future.
   */
  def shutdownAllConnectionPools(): Future[Unit] = {
    val shutdownCompletedPromise = Promise[Done]()
    poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise)
    shutdownCompletedPromise.future.map(_ => ())(system.dispatcher)
  }
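The two shutdown paths described above could be used roughly as follows. This is a sketch under assumptions: the object PoolShutdownSketch and its two methods are hypothetical names, the marker value 1 is arbitrary, and the code assumes the Akka HTTP 10.0.x API quoted in this article.

```scala
import akka.Done
import akka.actor.{ActorSystem, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{ExecutionContextExecutor, Future}

// Hypothetical object, for illustration only.
object PoolShutdownSketch {
  implicit val sys: ActorSystem = ActorSystem("PoolShutdownSketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = sys.dispatcher

  // Sends one request, then shuts down only the pool that served it.
  // The HostConnectionPool is the materialized value of the client-flow.
  def requestThenShutdown(req: HttpRequest): Future[Done] = {
    val (pool, result) =
      Source.single(req -> 1)
        .viaMat(Http().cachedHostConnectionPool[Int]("akka.io"))(Keep.right)
        .toMat(Sink.head)(Keep.both)
        .run()
    result.flatMap { case (tryResp, _) =>
      tryResp.foreach(_.discardEntityBytes()) // drain the entity first
      pool.shutdown()
    }
  }

  // Shuts down every pool of this ActorSystem, then the system itself.
  def shutdownEverything(): Future[Terminated] =
    Http().shutdownAllConnectionPools().flatMap(_ => sys.terminate())
}
```

Note that requestThenShutdown only reaches pool.shutdown() when the stream itself succeeds; a production version would probably also handle the failed case.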

Once cachedHostConnectionPool gives us a client-flow instance of type Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool], we can use it to turn HttpRequests into HttpResponses, as in the following example:

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
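The marker becomes more useful once several requests share the same flow, since responses may arrive in a different order than the requests were sent. The sketch below illustrates this under assumptions: the object name PoolCorrelationSketch, the method runAll and the request paths are hypothetical, and the paths are not verified to exist on akka.io.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

// Hypothetical object, for illustration only.
object PoolCorrelationSketch {
  implicit val sys: ActorSystem = ActorSystem("PoolCorrelationSketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = sys.dispatcher

  val pooledFlow = Http().cachedHostConnectionPool[Int]("akka.io")

  // Pair every request with an Int marker (1, 2, 3); paths are made up.
  val requests: List[(HttpRequest, Int)] =
    List("/", "/docs", "/blog").zipWithIndex.map {
      case (path, i) => HttpRequest(uri = path) -> (i + 1)
    }

  // Runs all requests through the pool; the marker travels with each
  // response, so results can be matched even if they arrive out of order.
  def runAll(): Future[Done] =
    Source(requests)
      .via(pooledFlow)
      .runForeach {
        case (Success(resp), marker) =>
          println(s"request #$marker -> ${resp.status.intValue()}")
          resp.discardEntityBytes() // free the connection for the next request
        case (Failure(err), marker) =>
          println(s"request #$marker failed: ${err.getMessage}")
      }
}
```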

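The sendPoolRequest example above actually still runs into the same problem as the Connection-Level-Api. The number of connections in the pool is limited, so the pool can only alleviate the backlog that builds up when requests are issued faster than responses arrive. Currently the most effective approach is to buffer requests in a queue and process them one by one: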

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}

Below is the complete demo source code for this discussion of Akka-http client-side connections:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.concurrent._
import scala.util._

object ClientApiDemo extends App {
  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher
/*
  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("www.sina.com")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
      .andThen{
        case Success(resp) =>
          //close connection by cancelling response entity
          resp.entity.dataBytes.runWith(Sink.cancelled)
          println(s"got response: ${resp.status.intValue()}")
        case Failure(err) => println(s"request failed: ${err.getMessage}")
      }
  //   .andThen {case _ => sys.terminate()}


 //close connection by receiving response with close header
  Http().bindAndHandleSync(
    { req => HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
*/

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}

}
