1. 程式人生 > >Akka(38): Http:Entityof ByteString-數據傳輸基礎

Akka(38): Http:Entityof ByteString-數據傳輸基礎

import req pre header 必須 討論 throw method 只需要

我們說過Akka-http是一個好的系統集成工具,集成是通過數據交換方式實現的。Http是個在網上傳輸和接收的規範協議。所以,在使用Akka-http之前,可能我們還是需要把Http模式的網上數據交換細節了解清楚。數據交換雙方是通過Http消息類型Request和Response來實現的。在Akka-http中對應的是HttpRequest和HttpResponse。這兩個類型都具備HttpEntity類型來裝載需要交換的數據。首先,無論如何數據在線上的表現形式肯定是一串bytes。所以,數據交換兩頭Request,Response中的Entity也必須是以bytes來表達的。在Akka-http裏我們把需要傳輸的數據轉換成ByteString,通過網絡發送給接收端、接收端再把收到消息Entity中的ByteString轉換成目標類型的數據。這兩個轉換過程就是Akka-http的Marshalling和Unmarshalling過程了。我們先從HttpEntity的構建函數來了解它的定義:

object HttpEntity {
  implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)
  implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)
  implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)
  def apply(contentType: ContentType.NonBinary, 
string: String): HttpEntity.Strict = if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset))) def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict = if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes)) def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict
= if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data) def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity = if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data) def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked = HttpEntity.Chunked.fromData(contentType, data) ...

很明顯,HttpEntity可以分兩大類,一種是Strict類型的,它的data是ByteString。另一種是UniversalEntity類型,它的數據dataBytes是Source[ByteString,Any]。無論如何最終在線上的還是ByteString。HttpEntity的ContentType註明了傳輸數據格式,有:

object ContentTypes {
  val `application/json` = ContentType(MediaTypes.`application/json`)
  val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
  val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
  val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
  val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
  val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`

  // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
  val NoContentType = ContentType(MediaTypes.NoMediaType)
}

註意:ContentType只是一種備註,不影響線上數據表達形式,線上的數據永遠是ByteString。但是,其中的application/octet-stream類型代表數據必須是Source[ByteString,Any]。我們下面就通過客戶端的例子來理解HttpEntity。下面是一個客戶端測試函數:

  def runService(request: HttpRequest, rentity: RequestEntity) = {
   val futResp = for {
     entity <- Future.successful(rentity)
     resp <- Http(sys).singleRequest(
       request.copy(entity = rentity)
     )
   } yield resp

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        entity.dataBytes.map(_.utf8String).runForeach(println)
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }
  }

我們只需要對這個函數傳入RequestEntity就可以了解返回Response裏Entity的許多細節了。首先我們要求服務端發送一個純字符串Hello World。服務端代碼如下:

 } ~ path("text") {
      get {
        complete("Hello World!")
      } ~

雖然complete("Hello World!")有些迷糊,不過應該complete做了些字符串到ByteString的轉換。我們可以從上面這個runService函數得到證實。下面是這個例子的調用:

  val reqText = HttpRequest(uri = s"http://localhost:8011/text")
  runService(reqText,HttpEntity.Empty)
    .andThen{case _ => sys.terminate()}

從顯示的結果可以得出runService函數中的entity.dataBytes.map(_.utf8String)已經把ByteString轉換成了String,也就是說服務器端發送的Entity裏的數據是ByteString。

我們再試著發送一些數據給服務端,然後讓服務端把結果通過response entity返回來:

    } ~ path("text") {
      get {
        complete("Hello World!")
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }

我們看到服務端對request entity的操作是以ByteString進行的。客戶端上傳一串字符的request如下:

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
  val uploadText = HttpEntity(
    ContentTypes.`text/plain(UTF-8)`,
    // transform each number to a chunk of bytes
    ByteString("hello world again")
  )
  runService(postText,uploadText)
    .andThen{case _ => sys.terminate()}

我們可以看到放進entity裏的數據是ByteString。

我們知道Akka-http是基於Akka-Stream的,具備Reactive-Stream功能特性。下面我們就示範一下如何進行stream的上傳下載。首先定制一個Source:

  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers)

服務端也是用HttpEntity來裝載這個Source然後通過HttpRequest傳給客戶端的:

  path("random") {
      get {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            // transform each number to a chunk of bytes
            source.take(10000)
          )
        )
      } ~
  

我們在客戶端還是用runService來解析傳過來的entity。由於接收一個大型的Source,所以需要修改一下接收方式代碼:

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
             Await.result(futEnt, Duration.Inf) // throws if binding fails
             println("End of stream!!!")
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }
 

用下面的方式調用

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
    runService(reqRandom,HttpEntity.Empty)
     .andThen{case _ => sys.terminate()}

再示範一下在客戶端用Source上傳數據。服務端代碼:

       post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }

客戶端上傳數據範例:

 val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers)

  val bytes = HttpEntity(
    ContentTypes.`application/octet-stream`,
    // transform each number to a chunk of bytes
    source.take(10000)
  )
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
  runService(postRandom,bytes)
    .andThen{case _ => sys.terminate()}

從上面討論我們了解了在Marshal,Unmarshal下層只是ByteString的操作和轉換。下面是本次討論示範源代碼:

服務端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.ByteString
import akka.http.scaladsl.model.HttpEntity._

import scala.util.Random

object ServerEntity extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers)



  val route =
    path("random") {
      get {
        withoutSizeLimit {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              // transform each number to a chunk of bytes
              source.take(1000))
          )
        }
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }
    } ~ path("text") {
      get {
        complete("Hello World!")
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }
    }


  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

客戶端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._

import scala.concurrent.duration._
import akka.util.ByteString

import scala.concurrent._
import scala.util._

object ClientEntity extends App {

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  def runService(request: HttpRequest, rentity: RequestEntity) = {
   val futResp = for {
     entity <- Future.successful(rentity)
     resp <- Http(sys).singleRequest(
       request.copy(entity = rentity)
     )
   } yield resp

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
             Await.result(futEnt, Duration.Inf) // throws if binding fails
             println("End of stream!!!")
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }
  }

  val reqText = HttpRequest(uri = s"http://localhost:8011/text")
//  runService(reqText,HttpEntity.Empty)
//    .andThen{case _ => sys.terminate()}

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
  val uploadText = HttpEntity(
    ContentTypes.`text/plain(UTF-8)`,
    // transform each number to a chunk of bytes
    ByteString("hello world again")
  )
//  runService(postText,uploadText)
//    .andThen{case _ => sys.terminate()}

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
 //   runService(reqRandom,HttpEntity.Empty)
 //    .andThen{case _ => sys.terminate()}

  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers)

  val bytes = HttpEntity(
    ContentTypes.`application/octet-stream`,
    // transform each number to a chunk of bytes
    source.take(10000)
  )
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
  runService(postRandom,bytes)
    .andThen{case _ => sys.terminate()}


}

Akka(38): Http:Entityof ByteString-數據傳輸基礎