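Akka(30): Http: High-Level-Api, Routing DSL
In the previous post we introduced the Akka-http Low-Level-Api. In practice, this API gives the server a hook for plugging in a custom Flow or conversion function that processes and responds to incoming Http-requests. Let's look at the following example from the official documentation: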
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import scala.io.StdIn
object WebServer {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future map/flatmap in the end
    implicit val executionContext = system.dispatcher

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
        HttpResponse(entity = HttpEntity(
          ContentTypes.`text/html(UTF-8)`,
          "<html><body>Hello world!</body></html>"))
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
        HttpResponse(entity = "PONG!")
      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
        sys.error("BOOM!")
      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
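We can see that the requestHandler function in the example above uses pattern matching to map each HttpRequest it may receive to a corresponding HttpResponse. Along the way it may also perform some server-side computation on behalf of the request, as a Rest-Api service would. For large services, however, pattern matching becomes bloated and rigid. Akka-http provides a routing DSL as the main component of its High-Level-Api. Replacing the Low-Level-Api's pattern matching with the routing DSL lets us write HttpRequest-to-HttpResponse conversion services more concisely, and implement large modern Rest-Api services more flexibly and efficiently. The routing DSL implements a Rest-Api service by building a Route: a multi-layer, sandwich-like structure composed of components called Directives. Route is a type: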
type Route = RequestContext ⇒ Future[RouteResult]
Here is an example Route:
val route: Flow[HttpRequest, HttpResponse, NotUsed] =
  get {
    pathSingleSlash {
      complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<html><body>Hello world!</body></html>"))
    } ~
    path("ping") {
      complete("PONG!")
    } ~
    path("crash") {
      sys.error("BOOM!")
    }
  }
In the example discussed in the previous post, we can use route like this:
val futBinding: Future[Http.ServerBinding] =
  connSource.to { Sink.foreach { connection =>
    println(s"client address ${connection.remoteAddress}")
    // connection handleWith flow
    // connection handleWithSyncHandler syncHandler
    // connection handleWithAsyncHandler asyncHandler
    connection handleWith route
  }}.run()
But shouldn't the argument of handleWith(flow) be a Flow[HttpRequest,HttpResponse,_]? To answer that, let's first look at the RouteResult object:
/**
* The result of handling a request.
*
* As a user you typically don't create RouteResult instances directly.
* Instead, use the methods on the [[RequestContext]] to achieve the desired effect.
*/
sealed trait RouteResult extends javadsl.server.RouteResult
object RouteResult {
final case class Complete(response: HttpResponse) extends javadsl.server.Complete with RouteResult {
override def getResponse = response
}
final case class Rejected(rejections: immutable.Seq[Rejection]) extends javadsl.server.Rejected with RouteResult {
override def getRejections = rejections.map(r ⇒ r: javadsl.server.Rejection).toIterable.asJava
}
implicit def route2HandlerFlow(route: Route)(implicit
routingSettings: RoutingSettings,
parserSettings: ParserSettings,
materializer: Materializer,
routingLog: RoutingLog,
executionContext: ExecutionContext = null,
rejectionHandler: RejectionHandler = RejectionHandler.default,
exceptionHandler: ExceptionHandler = null): Flow[HttpRequest, HttpResponse, NotUsed] =
Route.handlerFlow(route)
}
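There is an implicit conversion here, route2HandlerFlow, that turns a Route into a Flow[HttpRequest,HttpResponse,NotUsed], which settles the question.
From type Route = RequestContext => Future[RouteResult] we can see that a Route is simply a function that turns a RequestContext into a Future[RouteResult]. A RequestContext essentially wraps a Request together with the environment, configuration and tools for operating on it: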
/**
* This class is not meant to be extended by user code.
*
* Immutable object encapsulating the context of an [[akka.http.scaladsl.model.HttpRequest]]
* as it flows through a akka-http Route structure.
*/
@DoNotInherit
trait RequestContext {
/** The request this context represents. Modelled as a `val` so as to enable an `import ctx.request._`. */
val request: HttpRequest
/** The unmatched path of this context. Modelled as a `val` so as to enable an `import ctx.unmatchedPath._`. */
val unmatchedPath: Uri.Path
/**
* The default ExecutionContext to be used for scheduling asynchronous logic related to this request.
*/
implicit def executionContext: ExecutionContextExecutor
...
}
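Route is a composable component: simple Routes can be combined into Routes with more layers. These are the basic ways of composing Routes:
1. Route transformation: transform the incoming request or the outgoing response, then delegate the actual work to the next-level inner Route
2. Route filtering: let only requests that satisfy some condition pass and reject all others
3. Route chaining: if one Route rejects the request, try the next Route. This is implemented by the ~ operator
In the Akka-http routing DSL these composition operations are implemented by Directives. Akka-http ships a large number of ready-made Directives, and we can also define our own for special purposes; see the official documentation or the API docs for details.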
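The three kinds of composition listed above can be sketched without Akka-http at all. The block below is only a toy model under stated assumptions: Request, Result, Complete, Rejected, mapBody, method and orElse are hypothetical names invented for this illustration, and the Route here is a plain synchronous function rather than the library's RequestContext => Future[RouteResult].

```scala
object RouteCompositionSketch {
  // Toy stand-ins for HttpRequest and RouteResult; not the akka-http types.
  final case class Request(method: String, path: String)
  sealed trait Result
  final case class Complete(body: String) extends Result
  case object Rejected extends Result

  type Route = Request => Result

  // 1. Transformation: rewrite the response, delegate the work to the inner route.
  def mapBody(f: String => String)(inner: Route): Route = req =>
    inner(req) match {
      case Complete(body) => Complete(f(body))
      case Rejected       => Rejected
    }

  // 2. Filtering: only requests with the given method reach the inner route.
  def method(name: String)(inner: Route): Route = req =>
    if (req.method == name) inner(req) else Rejected

  // 3. Chaining: try the second route only if the first one rejects.
  def orElse(first: Route, second: Route): Route = req =>
    first(req) match {
      case Rejected => second(req)
      case done     => done
    }

  // All three combined: GET requests get an upper-cased "pong",
  // everything else falls through to the second route.
  val route: Route =
    orElse(
      method("GET")(mapBody(_.toUpperCase)(_ => Complete("pong"))),
      _ => Complete("Received something else")
    )
}
```

Each helper takes an inner Route and returns a new Route, which is roughly the shape a Directive gives to the same idea in the real DSL.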
A Directive has the following general form:
dirname(arguments) { extractions =>
  ... // inner route
}
Below are some usage examples of Directives.
The following three routes are equivalent:
val route: Route = { ctx =>
  if (ctx.request.method == HttpMethods.GET)
    ctx.complete("Received GET")
  else
    ctx.complete("Received something else")
}

val route =
  get {
    complete("Received GET")
  } ~
  complete("Received something else")

val route =
  get { ctx =>
    ctx.complete("Received GET")
  } ~
  complete("Received something else")
Below are some examples of combining Directives:
val route: Route =
  path("order" / IntNumber) { id =>
    get {
      complete {
        "Received GET request for order " + id
      }
    } ~
    put {
      complete {
        "Received PUT request for order " + id
      }
    }
  }

def innerRoute(id: Int): Route =
  get {
    complete {
      "Received GET request for order " + id
    }
  } ~
  put {
    complete {
      "Received PUT request for order " + id
    }
  }
val route: Route = path("order" / IntNumber) { id => innerRoute(id) }

val route =
  path("order" / IntNumber) { id =>
    (get | put) { ctx =>
      ctx.complete(s"Received ${ctx.request.method.name} request for order $id")
    }
  }

val route =
  path("order" / IntNumber) { id =>
    (get | put) {
      extractMethod { m =>
        complete(s"Received ${m.name} request for order $id")
      }
    }
  }

val getOrPut = get | put
val route =
  path("order" / IntNumber) { id =>
    getOrPut {
      extractMethod { m =>
        complete(s"Received ${m.name} request for order $id")
      }
    }
  }

val route =
  (path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) =>
    complete(s"Received ${m.name} request for order $id")
  }

val orderGetOrPutWithMethod =
  path("order" / IntNumber) & (get | put) & extractMethod
val route =
  orderGetOrPutWithMethod { (id, m) =>
    complete(s"Received ${m.name} request for order $id")
  }
The ~, & and | operators used in the examples above are defined as follows:
object RouteConcatenation extends RouteConcatenation {
class RouteWithConcatenation(route: Route) {
/**
* Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
* chance to act upon the request.
*/
def ~(other: Route): Route = { ctx ⇒
import ctx.executionContext
route(ctx).fast.flatMap {
case x: RouteResult.Complete ⇒ FastFuture.successful(x)
case RouteResult.Rejected(outerRejections) ⇒
other(ctx).fast.map {
case x: RouteResult.Complete ⇒ x
case RouteResult.Rejected(innerRejections) ⇒ RouteResult.Rejected(outerRejections ++ innerRejections)
}
}
}
}
}
/**
* Joins two directives into one which runs the second directive if the first one rejects.
*/
def |[R >: L](that: Directive[R]): Directive[R] =
recover(rejections ⇒ directives.BasicDirectives.mapRejections(rejections ++ _) & that)(that.ev)
/**
* Joins two directives into one which extracts the concatenation of its base directive extractions.
* NOTE: Extraction joining is an O(N) operation with N being the number of extractions on the right-side.
*/
def &(magnet: ConjunctionMagnet[L]): magnet.Out = magnet(this)
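To see what the ~ definition above does without the Future and FastFuture plumbing, here is a simplified synchronous sketch. It is an assumption-laden toy, not the library code: Request, Result, the string-based Rejected reasons, RouteOps and the get/put helpers are all hypothetical stand-ins made up for this example. What it keeps from the original is the control flow: a Complete result wins immediately, and rejections from both sides are accumulated.

```scala
object ConcatSketch {
  // Toy stand-ins; not the akka-http types.
  final case class Request(method: String, path: String)
  sealed trait Result
  final case class Complete(body: String) extends Result
  final case class Rejected(reasons: List[String]) extends Result

  type Route = Request => Result

  implicit class RouteOps(route: Route) {
    // Mirrors the shape of RouteWithConcatenation.~ shown above:
    // the second route runs only when the first one rejects.
    def ~(other: Route): Route = req =>
      route(req) match {
        case done: Complete  => done
        case Rejected(outer) =>
          other(req) match {
            case done: Complete  => done
            case Rejected(inner) => Rejected(outer ++ inner)
          }
      }
  }

  // Hypothetical method filters that reject with a readable reason.
  def get(inner: Route): Route = req =>
    if (req.method == "GET") inner(req) else Rejected(List("method is not GET"))

  def put(inner: Route): Route = req =>
    if (req.method == "PUT") inner(req) else Rejected(List("method is not PUT"))

  val route: Route =
    get(_ => Complete("Received GET")) ~ put(_ => Complete("Received PUT"))
}
```

Keeping every rejection, instead of only the last one, is what lets a rejection handler at the top report all the reasons a request was turned away.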
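From these examples we can conclude that the composability of Directives is the core of the routing DSL. Let's see how that composability is implemented. The Directive class is defined as follows: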
//#basic
abstract class Directive[L](implicit val ev: Tuple[L]) {
/**
* Calls the inner route with a tuple of extracted values of type `L`.
*
* `tapply` is short for "tuple-apply". Usually, you will use the regular `apply` method instead,
* which is added by an implicit conversion (see `Directive.addDirectiveApply`).
*/
def tapply(f: L ⇒ Route): Route
...
}
/**
* Constructs a directive from a function literal.
*/
def apply[T: Tuple](f: (T ⇒ Route) ⇒ Route): Directive[T] =
new Directive[T] { def tapply(inner: T ⇒ Route) = f(inner) }
/**
* A Directive that always passes the request on to its inner route (i.e. does nothing).
*/
val Empty: Directive0 = Directive(_(()))
...
implicit class SingleValueModifiers[T](underlying: Directive1[T]) extends AnyRef {
def map[R](f: T ⇒ R)(implicit tupler: Tupler[R]): Directive[tupler.Out] =
underlying.tmap { case Tuple1(value) ⇒ f(value) }
def flatMap[R: Tuple](f: T ⇒ Directive[R]): Directive[R] =
underlying.tflatMap { case Tuple1(value) ⇒ f(value) }
def require(predicate: T ⇒ Boolean, rejections: Rejection*): Directive0 =
underlying.filter(predicate, rejections: _*).tflatMap(_ ⇒ Empty)
def filter(predicate: T ⇒ Boolean, rejections: Rejection*): Directive1[T] =
underlying.tfilter({ case Tuple1(value) ⇒ predicate(value) }, rejections: _*)
}
}
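Note that implicit ev: Tuple[L] is evidence for the compiler: it requires a Tuple[L] instance to be in implicit scope. Akka-http provides implicit instances for all 22 TupleXX types. Also note implicit class SingleValueModifiers[T]: its map goes through Tupler, so nested Directive results are flattened automatically, giving conversions such as: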
Directive1[T] = Directive[Tuple1[T]]
Directive1[Tuple2[M,N]] = Directive[Tuple1[Tuple2[M,N]]] = Directive[Tuple2[M,N]]
Directive1[Tuple3[M,N,G]] = ... = Directive[Tuple3[M,N,G]]
Directive1[Tuple4[M1,M2,M3,M4]] = ... = Directive[Tuple4[M1,M2,M3,M4]]
...
Directive1[Unit] = Directive0
Directive1 and Directive0 are defined as:
type Directive0 = Directive[Unit]
type Directive1[T] = Directive[Tuple1[T]]
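The tapply, map, flatMap and filter machinery quoted above can be illustrated with a much smaller model. This is a sketch under loud assumptions, not the Akka-http implementation: it ignores the Tuple and Tupler evidence entirely, uses a synchronous toy Route, and every name in it (DirectiveSketch, Request, Result, reject, segment, intSegment) is hypothetical. It only shows how a directive represented by its tapply function can be composed into new directives.

```scala
object DirectiveSketch {
  // Toy stand-ins; none of these are the akka-http classes.
  final case class Request(method: String, path: String)
  sealed trait Result
  final case class Complete(body: String) extends Result
  case object Rejected extends Result

  type Route = Request => Result

  val reject: Route = _ => Rejected

  // A directive extracting one value of type T, represented by its `tapply`:
  // given the inner route (as a function of the extracted value), build a Route.
  final case class Directive1[T](tapply: (T => Route) => Route) {
    def apply(inner: T => Route): Route = tapply(inner)

    def map[R](f: T => R): Directive1[R] =
      Directive1[R](inner => tapply(t => inner(f(t))))

    def flatMap[R](f: T => Directive1[R]): Directive1[R] =
      Directive1[R](inner => tapply(t => f(t).tapply(inner)))

    def filter(p: T => Boolean): Directive1[T] =
      Directive1[T](inner => tapply(t => if (p(t)) inner(t) else reject))
  }

  // Extracts whatever follows the leading slash of the path.
  val segment: Directive1[String] =
    Directive1[String](inner => req => inner(req.path.stripPrefix("/"))(req))

  // Built purely by composition: keep numeric segments, then convert to Int.
  val intSegment: Directive1[Int] =
    segment.filter(s => s.nonEmpty && s.forall(_.isDigit)).map(_.toInt)

  val route: Route = intSegment { id => _ => Complete(s"order $id") }
}
```

In the real library the extracted value is always a tuple, which is why the Tuple[L] evidence and the Tupler conversion are needed on top of this basic shape.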
The automatic conversion of a value of any type into a Tuple is implemented by the Tupler type class:
/**
* Provides a way to convert a value into an Tuple.
* If the value is already a Tuple then it is returned unchanged, otherwise it's wrapped in a Tuple1 instance.
*/
trait Tupler[T] {
type Out
def OutIsTuple: Tuple[Out]
def apply(value: T): Out
}
object Tupler extends LowerPriorityTupler {
implicit def forTuple[T: Tuple]: Tupler[T] { type Out = T } =
new Tupler[T] {
type Out = T
def OutIsTuple = implicitly[Tuple[Out]]
def apply(value: T) = value
}
}
private[server] abstract class LowerPriorityTupler {
implicit def forAnyRef[T]: Tupler[T] { type Out = Tuple1[T] } =
new Tupler[T] {
type Out = Tuple1[T]
def OutIsTuple = implicitly[Tuple[Out]]
def apply(value: T) = Tuple1(value)
}
}
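The trick in Tupler is implicit priority: the instance for types that are already tuples lives in the companion object, while the catch-all instance lives in a parent class and is therefore only chosen as a fallback. The following dependency-free sketch reproduces that pattern. IsTuple, Wrap, LowPriorityWrap and wrap are hypothetical names standing in for Tuple, Tupler, LowerPriorityTupler and a call site, and only Tuple1 and Tuple2 are covered here.

```scala
object TuplerSketch {
  // Evidence that T is a tuple type (stand-in for akka-http's Tuple[T]).
  sealed trait IsTuple[T]
  object IsTuple {
    implicit def forTuple1[A]: IsTuple[Tuple1[A]] = new IsTuple[Tuple1[A]] {}
    implicit def forTuple2[A, B]: IsTuple[(A, B)] = new IsTuple[(A, B)] {}
  }

  // Stand-in for Tupler: converts a value into a tuple of type Out.
  trait Wrap[T] {
    type Out
    def apply(value: T): Out
  }

  // Fallback instance: any value gets wrapped in a Tuple1.
  trait LowPriorityWrap {
    implicit def forAny[T]: Wrap[T] { type Out = Tuple1[T] } =
      new Wrap[T] {
        type Out = Tuple1[T]
        def apply(value: T): Tuple1[T] = Tuple1(value)
      }
  }

  // Preferred instance: values that are already tuples pass through unchanged.
  object Wrap extends LowPriorityWrap {
    implicit def forTuple[T: IsTuple]: Wrap[T] { type Out = T } =
      new Wrap[T] {
        type Out = T
        def apply(value: T): T = value
      }
  }

  def wrap[T](value: T)(implicit w: Wrap[T]): w.Out = w(value)
}
```

With these definitions, wrap(42) produces Tuple1(42), while wrap((1, "a")) returns the pair untouched, because the compiler prefers the instance defined in the companion object whenever the IsTuple evidence can be found.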
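My understanding is that the main job of Directives in a Route falls into two parts: first, selection, much like picking an item from a program menu; second, reading and writing the Request, Response and Entity. We leave the second part for later posts; below are some RestApi examples of menu-style selection: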
trait UsersApi extends JsonMappings{
val usersApi =
(path("users") & get ) {
complete (UsersDao.findAll.map(_.toJson))
}~
(path("users"/IntNumber) & get) { id =>
complete (UsersDao.findById(id).map(_.toJson))
}~
(path("users") & post) { entity(as[User]) { user =>
complete (UsersDao.create(user).map(_.toJson))
}
}~
(path("users"/IntNumber) & put) { id => entity(as[User]) { user =>
complete (UsersDao.update(user, id).map(_.toJson))
}
}~
(path("users"/IntNumber) & delete) { userId =>
complete (UsersDao.delete(userId).map(_.toJson))
}
}
trait CommentsApi extends JsonMappings{
val commentsApi =
(path("users"/IntNumber/"posts"/IntNumber/"comments") & get ) {(userId, postId) =>
complete (CommentsDao.findAll(userId, postId).map(_.toJson))
}~
(path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & get) { (userId, postId, commentId) =>
complete (CommentsDao.findById(userId, postId, commentId).map(_.toJson))
}~
(path("comments") & post) { entity(as[Comment]) { comment =>
complete (CommentsDao.create(comment).map(_.toJson))
}
}~
(path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & put) { (userId, postId, commentId) => entity(as[Comment]) { comment =>
complete (CommentsDao.update(comment, commentId).map(_.toJson))
}
}~
(path("comments"/IntNumber) & delete) { commentId =>
complete (CommentsDao.delete(commentId).map(_.toJson))
}
}
trait PostsApi extends JsonMappings{
val postsApi =
(path("users"/IntNumber/"posts") & get){ userId =>
complete (PostsDao.findUserPosts(userId).map(_.toJson))
}~
(path("users"/IntNumber/"posts"/IntNumber) & get) { (userId,postId) =>
complete (PostsDao.findByUserIdAndId(userId, postId).map(_.toJson))
}~
(path("users"/IntNumber/"posts") & post) { userId => entity(as[Post]) { post =>
complete (PostsDao.create(post).map(_.toJson))
}}~
(path("users"/IntNumber/"posts"/IntNumber) & put) { (userId, id) => entity(as[Post]) { post =>
complete (PostsDao.update(post, id).map(_.toJson))
}}~
(path("users"/IntNumber/"posts"/IntNumber) & delete) { (userId, postId) =>
complete (PostsDao.delete(postId).map(_.toJson))
}
}
val routes =
pathPrefix("v1") {
usersApi ~
postsApi ~
commentsApi
} ~ path("")(getFromResource("public/index.html"))