Akka(38): Http:Entityof ByteString-數據傳輸基礎
我們說過Akka-http是一個好的系統集成工具,集成是通過數據交換方式實現的。Http是個在網上傳輸和接收的規範協議。所以,在使用Akka-http之前,可能我們還是需要把Http模式的網上數據交換細節了解清楚。數據交換雙方是通過Http消息類型Request和Response來實現的。在Akka-http中對應的是HttpRequest和HttpResponse。這兩個類型都具備HttpEntity類型來裝載需要交換的數據。首先,無論如何數據在線上的表現形式肯定是一串bytes。所以,數據交換兩頭Request,Response中的Entity也必須是以bytes來表達的。在Akka-http裏我們把需要傳輸的數據轉換成ByteString,通過網絡發送給接收端、接收端再把收到消息Entity中的ByteString轉換成目標類型的數據。這兩個轉換過程就是Akka-http的Marshalling和Unmarshalling過程了。我們先從HttpEntity的構建函數來了解它的定義:
object HttpEntity {
implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)
implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)
implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)
def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =
if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset)))
def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =
if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes))
def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =
if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data)
def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =
if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data)
def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked =
HttpEntity.Chunked.fromData(contentType, data)
...
很明顯,HttpEntity可以分兩大類,一種是Strict類型的,它的data是ByteString。另一種是UniversalEntity類型,它的數據dataBytes是Source[ByteString,Any]。無論如何最終在線上的還是ByteString。HttpEntity的ContentType註明了傳輸數據格式,有:
object ContentTypes {
val `application/json` = ContentType(MediaTypes.`application/json`)
val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`
// used for explicitly suppressing the rendering of Content-Type headers on requests and responses
val NoContentType = ContentType(MediaTypes.NoMediaType)
}
註意:ContentType只是一種備註,不影響線上數據表達形式,線上的數據永遠是ByteString。但是,其中的application/octet-stream類型代表數據必須是Source[ByteString,Any]。我們下面就通過客戶端的例子來理解HttpEntity。下面是一個客戶端測試函數:
def runService(request: HttpRequest, rentity: RequestEntity) = {
val futResp = for {
entity <- Future.successful(rentity)
resp <- Http(sys).singleRequest(
request.copy(entity = rentity)
)
} yield resp
futResp
.andThen {
case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
entity.dataBytes.map(_.utf8String).runForeach(println)
case Success(r@HttpResponse(code, _, _, _)) =>
println(s"Download request failed, response code: $code")
r.discardEntityBytes()
case Success(_) => println("Unable to download rows!")
case Failure(err) => println(s"Download failed: ${err.getMessage}")
}
}
我們只需要對這個函數傳入RequestEntity就可以了解返回Response裏Entity的許多細節了。首先我們要求服務端發送一個純字符串Hello World。服務端代碼如下:
} ~ path("text") { get { complete("Hello World!") } ~
雖然complete("Hello World!")有些迷糊,不過應該complete做了些字符串到ByteString的轉換。我們可以從上面這個runService函數得到證實。下面是這個例子的調用:
val reqText = HttpRequest(uri = s"http://localhost:8011/text")
runService(reqText,HttpEntity.Empty)
.andThen{case _ => sys.terminate()}
從顯示的結果可以得出runService函數中的entity.dataBytes.map(_.utf8String)已經把ByteString轉換成了String,也就是說服務器端發送的Entity裏的數據是ByteString。
我們再試著發送一些數據給服務端,然後讓服務端把結果通過response entity返回來:
} ~ path("text") {
get {
complete("Hello World!")
} ~
post {
withoutSizeLimit {
extractDataBytes { bytes =>
val data = bytes.runFold(ByteString())(_ ++ _)
onComplete(data) { t =>
complete(t)
}
}
}
}
我們看到服務端對request entity的操作是以ByteString進行的。客戶端上傳一串字符的request如下:
val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
val uploadText = HttpEntity(
ContentTypes.`text/plain(UTF-8)`,
// transform each number to a chunk of bytes
ByteString("hello world again")
)
runService(postText,uploadText)
.andThen{case _ => sys.terminate()}
我們可以看到放進entity裏的數據是ByteString。
我們知道Akka-http是基於Akka-Stream的,具備Reactive-Stream功能特性。下面我們就示範一下如何進行stream的上傳下載。首先定制一個Source:
val numbers = Source.fromIterator(() =>
Iterator.continually(Random.nextInt()))
.map(n => ByteString(s"$n\n"))
//make conform to withoutSizeLimit constrain
val source = limitableByteSource(numbers)
服務端也是用HttpEntity來裝載這個Source然後通過HttpRequest傳給客戶端的:
path("random") {
get {
complete(
HttpEntity(
ContentTypes.`application/octet-stream`,
// transform each number to a chunk of bytes
source.take(10000)
)
)
} ~
我們在客戶端還是用runService來解析傳過來的entity。由於接收一個大型的Source,所以需要修改一下接收方式代碼:
futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println) Await.result(futEnt, Duration.Inf) // throws if binding fails println("End of stream!!!") case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download rows!") case Failure(err) => println(s"Download failed: ${err.getMessage}") }
用下面的方式調用:
val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
runService(reqRandom,HttpEntity.Empty)
.andThen{case _ => sys.terminate()}
再示範一下在客戶端用Source上傳數據。服務端代碼:
post {
withoutSizeLimit {
extractDataBytes { bytes =>
val data = bytes.runFold(ByteString())(_ ++ _)
onComplete(data) { t =>
complete(t)
}
}
}
}
客戶端上傳數據範例:
val numbers = Source.fromIterator(() =>
Iterator.continually(Random.nextInt()))
.map(n => ByteString(s"$n\n"))
//make conform to withoutSizeLimit constrain
val source = limitableByteSource(numbers)
val bytes = HttpEntity(
ContentTypes.`application/octet-stream`,
// transform each number to a chunk of bytes
source.take(10000)
)
val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
runService(postRandom,bytes)
.andThen{case _ => sys.terminate()}
從上面討論我們了解了在Marshal,Unmarshal下層只是ByteString的操作和轉換。下面是本次討論示範源代碼:
服務端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.ByteString
import akka.http.scaladsl.model.HttpEntity._
import scala.util.Random
object ServerEntity extends App {
implicit val httpSys = ActorSystem("httpSystem")
implicit val httpMat = ActorMaterializer()
implicit val httpEC = httpSys.dispatcher
val numbers = Source.fromIterator(() =>
Iterator.continually(Random.nextInt()))
.map(n => ByteString(s"$n\n"))
//make conform to withoutSizeLimit constrain
val source = limitableByteSource(numbers)
val route =
path("random") {
get {
withoutSizeLimit {
complete(
HttpEntity(
ContentTypes.`application/octet-stream`,
// transform each number to a chunk of bytes
source.take(1000))
)
}
} ~
post {
withoutSizeLimit {
extractDataBytes { bytes =>
val data = bytes.runFold(ByteString())(_ ++ _)
onComplete(data) { t =>
complete(t)
}
}
}
}
} ~ path("text") {
get {
complete("Hello World!")
} ~
post {
withoutSizeLimit {
extractDataBytes { bytes =>
val data = bytes.runFold(ByteString())(_ ++ _)
onComplete(data) { t =>
complete(t)
}
}
}
}
}
val (port, host) = (8011,"localhost")
val bindingFuture = Http().bindAndHandle(route,host,port)
println(s"Server running at $host $port. Press any key to exit ...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind())
.onComplete(_ => httpSys.terminate())
}
客戶端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import scala.concurrent.duration._
import akka.util.ByteString
import scala.concurrent._
import scala.util._
object ClientEntity extends App {
implicit val sys = ActorSystem("ClientSys")
implicit val mat = ActorMaterializer()
implicit val ec = sys.dispatcher
def runService(request: HttpRequest, rentity: RequestEntity) = {
val futResp = for {
entity <- Future.successful(rentity)
resp <- Http(sys).singleRequest(
request.copy(entity = rentity)
)
} yield resp
futResp
.andThen {
case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
Await.result(futEnt, Duration.Inf) // throws if binding fails
println("End of stream!!!")
case Success(r@HttpResponse(code, _, _, _)) =>
println(s"Download request failed, response code: $code")
r.discardEntityBytes()
case Success(_) => println("Unable to download rows!")
case Failure(err) => println(s"Download failed: ${err.getMessage}")
}
}
val reqText = HttpRequest(uri = s"http://localhost:8011/text")
// runService(reqText,HttpEntity.Empty)
// .andThen{case _ => sys.terminate()}
val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
val uploadText = HttpEntity(
ContentTypes.`text/plain(UTF-8)`,
// transform each number to a chunk of bytes
ByteString("hello world again")
)
// runService(postText,uploadText)
// .andThen{case _ => sys.terminate()}
val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
// runService(reqRandom,HttpEntity.Empty)
// .andThen{case _ => sys.terminate()}
val numbers = Source.fromIterator(() =>
Iterator.continually(Random.nextInt()))
.map(n => ByteString(s"$n\n"))
//make conform to withoutSizeLimit constrain
val source = limitableByteSource(numbers)
val bytes = HttpEntity(
ContentTypes.`application/octet-stream`,
// transform each number to a chunk of bytes
source.take(10000)
)
val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
runService(postRandom,bytes)
.andThen{case _ => sys.terminate()}
}
Akka(38): Http:Entityof ByteString-數據傳輸基礎