1. 程式人生 > >Akka(4): Routers - 智能任務分配

Akka(4): Routers - 智能任務分配

相同 pac 線程 文件內容 fun bool fib can ceil

Actor模式最大的優點就是每個Actor都是一個獨立的任務運算器。這種模式讓我們很方便地把一項大型的任務分割成若幹細小任務然後分配給不同的Actor去完成。優點是在設計時可以專註實現每個Actor的功能,在實際運算時由於每個Actor都在獨立的線程裏運行,能充分利用多核CPU的優勢實現相似並行運算的效率。我們同樣可以把一個獨立的功能按不同的輸入分配給多個Actor去完成以達到同樣的效率提高目的,這就是Akka的Routing模式了。Routing模式的特點是所有運算Actor的運算邏輯都是相同的,分別對不同的輸入進行相同的運算。不過我們應該知道運算結果的順序是無法預計的,畢竟Actor模式是典型的無序運算。Routing模式由Router和Routee組成:Routee是負責具體運算的Actor(因為運算邏輯必須在Actor的receive裏實現),Router的主要功能是把外界發來的運算指令按照某種指定的方式分配給Routee去運算。可以說Router不是標準的Actor,因為它不需要實現任何其它的功能,基本功能是預設嵌入的。Router的信箱直接代表了任務分配邏輯,與標準Actor逐個運算信箱中消息相比,能大大提高任務分配效率。Akka自帶許多現成的任務分配模式,以不同的算法來滿足不同的任務分配要求。這些算法的配置可以在配置文件或者代碼中定義。Router又可分Pool和Group兩種模式:在Router-Pool模式中Router負責構建所有的Routee。如此所有Routee都是Router的直屬子級Actor,可以實現Router對Routees的直接監管。由於這種直接的監管關系,Router-Pool又可以按運算負載自動增減Routee,能更有效地分配利用計算資源。Router-Group模式中的Routees由外界其它Actor產生,特點是能實現靈活的Routee構建和監控,可以用不同的監管策略來管理一個Router下的Routees,比如可以使用BackoffSupervisor。從另一方面來講,Router-Group的缺點是Routees的構建和管理復雜化了,而且往往需要人為幹預。

下面我們先做個示範:

import akka.actor._
import akka.routing._
import scala.annotation.tailrec

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int)
  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._

  override def receive: Receive = {
    
case FibonacciNumber(nbr) => val answer = fibonacci(nbr) log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer") } private def fibonacci(n: Int): Int = { @tailrec def fib(n: Int, b: Int, a: Int): Int = n match { case 0 => a case _ => fib(n
- 1, a + b, b) } fib(n, 1, 0) } } object RouterDemo extends App { import FibonacciRoutee._ val routingSystem = ActorSystem("routingSystem") val router = routingSystem.actorOf( FromConfig.props(FibonacciRoutee.props) ,"balance-pool-router") router ! FibonacciNumber(10) router ! FibonacciNumber(13) router ! FibonacciNumber(15) router ! FibonacciNumber(17) scala.io.StdIn.readLine() routingSystem.terminate() }

在這個例子裏我們用3個Routees來根據指示計算Fibonacci。FibonacciRoutee只有一項功能:就是按輸入計算Fibonacci數。我們看到,Router構建過程十分簡單。在我們的例子裏只需要讀出配置文件內容就可以了。balance-pool-router是配置文件裏的一個定義項:

akka {
  prio-dispatcher {
    mailbox-type = "PriorityMailbox"
  }
  actor {
    deployment {
      /balance-pool-router {
        router = balancing-pool
        nr-of-instances = 3
        pool-dispatcher {
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 3
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 2.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 3
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
    }
  }

}

Routing模式設置的完整標識是akka.actor.deployment{/balance-pool-router}。完成構建router後我們直接向router發送計算指令,運算結果如下:

[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-5] [akka://routingSystem/user/balance-pool-router/$b] $b‘s answer: Fibonacci(13)=233
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-7] [akka://routingSystem/user/balance-pool-router/$a] $a‘s answer: Fibonacci(10)=55
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c‘s answer: Fibonacci(15)=610
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c‘s answer: Fibonacci(17)=1597

我們看到,router按配置自動構建了3個FibonacciRoutee。Routee的構建過程是無法人工幹預的。向router發送的計算指令被分配給b,a,c,c去運算了。從顯示順序可以證明每個參與的Actor占用運算時間不同,產生了無序的運算結果。

下面我們在Routee裏加一個延遲效應。這樣運算結果顯示會更自然些:

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延遲參數
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}

因為在Actor內部不能使用Thread.sleep,所以我們用了個scheduleOnce在延遲時間後向自己發送一個喚醒消息。註意,scheduleOnce是無阻塞non-blocking代碼,調用後程序不會停留等待計劃動作。在上面修改後的代碼裏增加了監管策略SupervisorStrategy的使用測試。Router的默認監管策略是Esculate,即把某個Routee發生的異常提交給Router的直屬父級處理。如果Router直屬父級對Routee異常的處理方式是重啟的話,那麽首先重啟Router,然後是作為直屬子級的所有Routees都會被重啟,結果並不是我們想要的。所以必須人為的設定Router監管策略。由於Router的SupervisorStrategy無法在設置文件中定義,所以這次我們只有用代碼方式來設置routing模式了:

object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

註意:我們在FibonacciRoutee的preRestart接口中增加了向自己補發產生異常消息的過程。運算結果顯示:雖然出現了多次異常,router重啟了f發生異常的Routee,所有消息都得到了處理。

Akka中有些routing模式支持Router-Pool Routee的自動增減。由於BalancingPool不支持此項功能,下面我們就用RoundRobinPool來做個示範。由於需要定義監管策略,只有在代碼中設置Resizer了:

 val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

以上resizer設置為:Routee最少2個,可以自動增加到5個。運行後routingSystem自動增加了兩個Routee: c,d。

下面是本次示範的完整源代碼:

import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Random

object FibonacciRoutee {
  case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延遲參數
  case class GetAnswer(nbr: Int)

  class RouteeException extends Exception

  def props = Props[FibonacciRoutee]
}
class FibonacciRoutee extends Actor with ActorLogging {
 import FibonacciRoutee._
 import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr,ms) =>
      context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer")
      }
  }
  private def fibonacci(n: Int): Int = {
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ =>
        fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
    super.postStop()
  }

}
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props)
    ,"balance-pool-router")
    */
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  /* does not support resizing routees
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher")
      .props(FibonacciRoutee.props)
    ,"balance-pool-router"
  ) */

  val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
    ,"roundrobin-pool-router"
  )

  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
  router ! FibonacciNumber(27,1)
  router ! FibonacciNumber(37,1)
  router ! FibonacciNumber(47,1)

  scala.io.StdIn.readLine()

  routingSystem.terminate()

}

Akka(4): Routers - 智能任務分配