Akka-Cluster(5)- load-balancing with backoff-supervised stateless computation - 無狀態任務叢集節點均衡分配
分散式程式運算是一種水平擴充套件(scale-out)運算模式,其核心思想是能夠充分利用伺服器叢集中每個伺服器節點的計算資源,包括:CPU、記憶體、硬碟、IO匯流排等。首先對計算任務進行分割,然後把細分的任務分派給各節點去運算。細分的任務相互之間可以有關聯或者各自為獨立運算,使用akka-cluster可以把任務按照各節點運算資源的負載情況進行均勻的分配,從而達到資源的合理充分利用以實現運算效率最大化的目的。如果一項工作可以被分割成多個獨立的運算任務,那麼我們只需要關注如何合理地對細分任務進行分配以實現叢集節點的負載均衡,這實際上是一種對無需維護內部狀態的運算任務的分配方式:fire and forget。由於承擔運算任務的目標actor具體的部署位置是由演算法決定的,所以我們一般不需要控制指定的actor或者讀取它的內部狀態。當然,如果需要的話我們還是可以通過嵌入訊息的方式來實現這樣的功能。
叢集節點負載均衡是一種任務中央分配方式,其實是在叢集環境下的router/routees運算模式,只是現在的router可以把任務傳送給跨伺服器上的actor。當然,任務分派是通過演算法實現的,包括所有普通router的routing演算法如:round-robin, random等等。 akka提供了一種基於節點運算資源負載的演算法,在配置檔案中定義:
akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
下面的例子可以提供metrics基本作用的解釋:
akka.actor.deployment { /frontend/dispatcher = { # Router type provided by metrics extension. router = cluster-metrics-adaptive-group # Router parameter specific for metrics extension. # metrics-selector = heap # metrics-selector = load # metrics-selector = cpu metrics-selector = mix # routees.paths = ["/user/backend"] cluster { enabled = on use-role = backend allow-local-routees = off } } }
dispatcher代表router, backend/目錄下的actor代表routees。
假如我們把一個大型的資料處理程式分割成多個獨立的資料庫操作。為了保證每項操作都能在任何情況下安全進行,包括出現異常,我們可以用BackoffSupervisor來支援負責操作的actor,如下:
val supervisor = BackoffSupervisor.props( Backoff.onFailure( // Backoff.OnStop childProps = workerProps(client), childName = "worker", minBackoff = 1 second, maxBackoff = 10 seconds, randomFactor = 0.20 ).withAutoReset(resetBackoff = 5 seconds) .withSupervisorStrategy( OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)( decider.orElse(SupervisorStrategy.defaultDecider) ) ) )
在這裡要特別註明一下Backoff.OnFailure和Backoff.OnStop的使用場景和作用,這部分與官方文件有些出入。首先,這兩種方法都不會造成childActor的重啟動作(restart),而是重新建立並啟動一個新的例項。具體情況請參考下面測試程式的輸出:
package my.akka
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.pattern.{Backoff, BackoffSupervisor, ask}
import scala.concurrent.Await
import scala.concurrent.duration._
class Child extends Actor {
println(s"[Child]: created. (path = ${this.self.path}, instance = ${this})")
override def preStart(): Unit = {
println(s"[Child]: preStart called. (path = ${this.self.path}, instance = ${this})")
super.preStart()
}
override def postStop(): Unit = {
println(s"[Child]: postStop called. (path = ${this.self.path}, instance = ${this})")
super.postStop()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
println(s"[Child]: preRestart called with ($reason, $message). (path = ${this.self.path}, instance = ${this})")
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
println(s"[Child]: postRestart called with ($reason). (path = ${this.self.path}, instance = ${this})")
super.postRestart(reason)
}
def receive = {
case "boom" =>
throw new Exception("kaboom")
case "get ref" =>
sender() ! self
case a: Any =>
println(s"[Child]: received ${a}")
}
}
object Child {
def props: Props
= Props(new Child)
def backOffOnFailureProps: Props
= BackoffSupervisor.props(
Backoff.onFailure(
Child.props,
childName = "myEcho",
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
))
def backOffOnStopProps: Props
= BackoffSupervisor.props(
Backoff.onStop(
Child.props,
childName = "myEcho",
minBackoff = 1.seconds,
maxBackoff = 10.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
))
}
object BackoffSuperVisorApp {
def defaultSuperVisorCase(): Unit = {
println(
"""
|default ---------------------------
""".stripMargin)
val system = ActorSystem("app")
try{
/**
* Let's see if "hello" message is received by the child
*/
val child = system.actorOf(Child.props, "child")
Thread.sleep(100)
child ! "hello"
//[Child]: received hello
/**
* Now restart the child with an exception within its receive method
* and see if the `child` ActorRef is still valid (i.e. ActorRef incarnation remains same)
*/
child ! "boom"
Thread.sleep(200)
child ! "hello after normal exception"
//[Child]: received hello after normal exception
/**
* PoisonPill causes the child actor to `Stop`, different from restart.
* The ActorRef incarnation gets updated.
*/
child ! PoisonPill
Thread.sleep(200)
/**
* This causes delivery to deadLetter, since the "incarnation" of ActorRef `child` became obsolete
* after child is "Stopped"
*
* An incarnation is tied to an ActorRef (NOT to its internal actor instance)
* and the same incarnation means "you can keep using the same ActorRef"
*/
child ! "hello after PoisonPill"
// [akka://app/user/parent/child-1] Message [java.lang.String] without sender to Actor[akka://app/user/child#-767539042]
// was not delivered. [1] dead letters encountered.
Thread.sleep(200)
}
finally{
system.terminate()
Thread.sleep(500)
}
}
def backOffOnStopCase(): Unit ={
println(
"""
|backoff onStop ---------------------------
""".stripMargin)
val system = ActorSystem("app")
try{
/**
* Let's see if "hello" message is forwarded to the child
* by the backoff supervisor onStop
*/
implicit val futureTimeout: akka.util.Timeout = 1.second
val backoffSupervisorActor = system.actorOf(Child.backOffOnStopProps, "child")
Thread.sleep(100)
backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
//[Child]: received hello to backoff supervisor
/**
* Now "Restart" the child with an exception from its receive method.
* As with the default supervisory strategy, the `child` ActorRef remains valid. (i.e. incarnation kept same)
*/
val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
child ! "boom"
Thread.sleep(2000)
child ! "hello to child after normal exception"
//[Child]: received hello to child after normal exception
/**
* Backoff Supervisor can still forward the message
*/
backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
//[Child]: received hello to backoffSupervisorActor after normal exception
Thread.sleep(200)
/**
* PoisonPill causes the child actor to `Stop`, different from restart.
* The `child` ActorRef incarnation gets updated.
*/
child ! PoisonPill
Thread.sleep(2000)
child ! "hello to child ref after PoisonPill"
//delivered to deadLetters
/**
* Backoff Supervisor can forward the message to its child with the new incarnation
*/
backoffSupervisorActor ! "hello to backoffSupervisorActor after PoisonPill"
//[Child]: received hello to backoffSupervisorActor after PoisonPill
Thread.sleep(200)
}
finally{
system.terminate()
Thread.sleep(500)
}
}
def backOffOnFailureCase(): Unit ={
println(
"""
|backoff onFailure ---------------------------
""".stripMargin)
val system = ActorSystem("app")
try{
/**
* Let's see if "hello" message is forwarded to the child
* by the backoff supervisor onFailure
*/
implicit val futureTimeout: akka.util.Timeout = 1.second
val backoffSupervisorActor = system.actorOf(Child.backOffOnFailureProps, "child")
Thread.sleep(100)
backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
//[Child]: received hello to backoff supervisor
/**
* Now "Stop" the child with an exception from its receive method.
* You'll see the difference between "Restart" and "Stop" from here:
*/
val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
child ! "boom"
Thread.sleep(2000)
/**
* Note that this is after normal exception, not after PoisonPill,
* but child is completely "Stopped" and its ActorRef "incarnation" became obsolete
*
* So, the message to the `child` ActorRef is delivered to deadLetters
*/
child ! "hello to child after normal exception"
//causes delivery to deadLetter
/**
* Backoff Supervisor can still forward the message to the new child ActorRef incarnation
*/
backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
//[Child]: received hello to backoffSupervisorActor after normal exception
/**
* You can get a new ActorRef which represents the new incarnation
*/
val newChildRef = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
newChildRef ! "hello to new child ref after normal exception"
//[Child]: received hello to new child ref after normal exception
Thread.sleep(200)
/**
* No matter whether the supervisory strategy is default or backoff,
* PoisonPill causes the actor to "Stop", not "Restart"
*/
newChildRef ! PoisonPill
Thread.sleep(3000)
newChildRef ! "hello to new child ref after PoisonPill"
//delivered to deadLetters
Thread.sleep(200)
}
finally{
system.terminate()
Thread.sleep(500)
}
}
def main(args: Array[String]): Unit ={
defaultSuperVisorCase()
backOffOnStopCase()
backOffOnFailureCase()
}
}
OnStop:不響應child-actor發生的異常,採用SupervisorStrategy異常處理方式。對正常停止動作,如PoisonPill, context.stop作用:重新構建新的例項並啟動。
OnFailure:不響應child-actor正常停止,任其終止。發生異常時重新構建新的例項並啟動。
很明顯,通常我們需要在運算髮生異常時重新啟動運算,所以用OnFailure才是正確的選擇。
下面是我之前介紹關於BackoffSupervisor時用的一個例子的程式碼示範:
package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import backoffSupervisorDemo.InnerChild.TestMessage
import scala.concurrent.duration._
object InnerChild {
case class TestMessage(msg: String)
class ChildException extends Exception
def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
import InnerChild._
override def receive: Receive = {
case TestMessage(msg) => //模擬子級功能
log.info(s"Child received message: ${msg}")
}
}
object Supervisor {
def props: Props = { //在這裡定義了監管策略和child Actor構建
def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
case _: InnerChild.ChildException => SupervisorStrategy.Restart
}
val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
.withManualReset
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
decider.orElse(SupervisorStrategy.defaultDecider)
)
)
BackoffSupervisor.props(options)
}
}
//注意:下面是Supervisor的父級,不是InnerChild的父級
object ParentalActor {
case class SendToSupervisor(msg: InnerChild.TestMessage)
case class SendToInnerChild(msg: InnerChild.TestMessage)
case class SendToChildSelection(msg: InnerChild.TestMessage)
def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
import ParentalActor._
//在這裡構建子級Actor supervisor
val supervisor = context.actorOf(Supervisor.props,"supervisor")
supervisor ! BackoffSupervisor.getCurrentChild //要求supervisor返回當前子級Actor
var innerChild: Option[ActorRef] = None //返回的當前子級ActorRef
val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
override def receive: Receive = {
case BackoffSupervisor.CurrentChild(ref) => //收到子級Actor資訊
innerChild = ref
case SendToSupervisor(msg) => supervisor ! msg
case SendToChildSelection(msg) => selectedChild ! msg
case SendToInnerChild(msg) => innerChild foreach(child => child ! msg)
}
}
object BackoffSupervisorDemo extends App {
import ParentalActor._
val testSystem = ActorSystem("testSystem")
val parent = testSystem.actorOf(ParentalActor.props,"parent")
Thread.sleep(1000) //wait for BackoffSupervisor.CurrentChild(ref) received
parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
scala.io.StdIn.readLine()
testSystem.terminate()
}
好了,現在我們就開始實現一個在叢集中進行資料庫操作的例子,看看akka-cluster是如何把一串操作分派給各節點上去操作的。首先是這個Worker:
import akka.actor._
import scala.concurrent.duration._
object Backend {
case class SaveFormula(op1: Int, op2: Int)
def workerProps = Props(new Worker)
}
class Worker extends Actor with ActorLogging {
import Backend._
context.setReceiveTimeout(500 milliseconds)
override def receive: Receive = {
case SaveFormula(op1,op2) => {
val res = op1 * op2
// saveToDB(op1,op2,res)
log.info(s"******* $op1 X $op2 = $res save to DB by $self *******")
}
case ReceiveTimeout =>
log.info(s"******* $self receive timout! *******")
throw new RuntimeException("Worker idle timeout!")
}
}
這應該是一個最普通的actor了。我們把它放在一個BackoffSupervisor下面:
def superProps: Props = {
def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
case _: DBException => SupervisorStrategy.Restart
}
val options = Backoff.onFailure(
childProps = workerProps,
childName = "worker",
minBackoff = 1 second,
maxBackoff = 5 seconds,
randomFactor = 0.20
).withAutoReset(resetBackoff = 10 seconds)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
decider.orElse(SupervisorStrategy.defaultDecider)
)
)
BackoffSupervisor.props(options)
}
def create(port: Int): Unit = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[backend]"))
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)
val Backend = system.actorOf(superProps,"backend")
}
下面是負責分配任務的router,或者前端frontend的定義:
import akka.actor._
import akka.routing._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.util._
import akka.cluster._
object Frontend {
private var _frontend: ActorRef = _
case class Multiply(op1: Int, op2: Int)
def create(port: Int) = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.posrt=$port")
.withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[frontend]"))
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
Cluster(system).registerOnMemberUp{
_frontend = system.actorOf(Props[Frontend],"frontend")
}
system.actorOf(Props[Frontend],"frontend")
}
def getFrontend = _frontend
}
class Frontend extends Actor with ActorLogging {
import Frontend._
import Backend._
import context.dispatcher
//just lookup routees, routing strategy is responsible for deployment
val backend = context.actorOf(FromConfig.props(/* Props.empty */),"dispatcher")
context.system.scheduler.schedule(3.seconds, 3.seconds, self,
Multiply(Random.nextInt(100), Random.nextInt(100)))
override def receive: Receive = {
case Multiply(op1,op2) =>
backend ! SaveFormula(op1,op2)
case msg @ _ =>
log.info(s"******* unrecognized message: $msg! ******")
}
}
我們需要在Frontend裡構建Backend。但是,Backend actor 即routees ,我們已經在Backend構建時進行了部署,所以在這裡只需要用FromConfig.props(Props.empty)能lookup routees就可以了,不需要重複部署。
下面是具體的資料庫儲存操作示範:
def superProps: Props = {
def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
case _: DBException => SupervisorStrategy.Restart
}
val clientSettings: MongoClientSettings = MongoClientSettings.builder()
.applyToClusterSettings {b =>
b.hosts(List(new ServerAddress("localhost:27017")).asJava)
}.build()
val client: MongoClient = MongoClient(clientSettings)
val options = Backoff.onFailure(
childProps = workerProps(client),
childName = "worker",
minBackoff = 1 second,
maxBackoff = 10 seconds,
randomFactor = 0.20
).withAutoReset(resetBackoff = 5 seconds)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
decider.orElse(SupervisorStrategy.defaultDecider)
)
)
BackoffSupervisor.props(options)
}
注意,我們是在superProps裡做資料庫的連線的。這樣Backend在例項化或者因為某種原因重啟的話,特別是換了另一個JVM時可以正確的構建MongoClient。資料庫操作是標準的MongoEngine方式:
import monix.execution.Scheduler.Implicits.global
implicit val mongoClient = client;
val ctx = MGOContext("testdb","mulrecs")
def saveToDB(op1: Int, op2: Int, by: String) = {
val doc = Document("by" -> by, "op1" -> op1, "op2" -> op2, "res" -> op1 * op2)
val cmd = ctx.setCommand(MGOCommands.Insert(Seq(doc)))
val task = mgoUpdate[Completed](cmd).toTask
task.runOnComplete {
case Success(s) => log.info("operations completed successfully.")
case Failure(exception) => log.error(s"error: ${exception.getMessage}")
}
}
資料庫操作是在另一個ExecutionContext裡進行的。
下面是本次示範的完整原始碼:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)
build.sbt
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion
name := "cluster-load-balance"
version := "0.1"
scalaVersion := "2.12.8"
scalacOptions += "-Ypartial-unification"
libraryDependencies ++= {
val akkaVersion = "2.5.19"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
//for mongodb 4.0
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
//other dependencies
"co.fs2" %% "fs2-core" % "0.9.7",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.typelevel" %% "cats-core" % "0.9.0",
"io.monix" %% "monix-execution" % "3.0.0-RC1",
"io.monix" %% "monix-eval" % "3.0.0-RC1"
)
}
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
resources/application.conf
akka {
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
# auto-down-unreachable-after = 10s
}
}
akka.cluster.min-nr-of-members = 3
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
akka.actor.deployment {
/frontend/dispatcher = {
# Router type provided by metrics extension.
router = cluster-metrics-adaptive-group
# Router parameter specific for metrics extension.
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
#
routees.paths = ["/user/backend"]
cluster {
enabled = on
use-role = backend
allow-local-routees = off
}
}
}
protobuf/sdp.proto
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
package sdp.grpc.services;
message ProtoDate {
int32 yyyy = 1;
int32 mm = 2;
int32 dd = 3;
}
message ProtoTime {
int32 hh = 1;
int32 mm = 2;
int32 ss = 3;
int32 nnn = 4;
}
message ProtoDateTime {
ProtoDate date = 1;
ProtoTime time = 2;
}
message ProtoAny {
bytes value = 1;
}
protobuf/mgo.proto
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
/*
* Demoes various customization options provided by ScalaPBs.
*/
package sdp.grpc.services;
import "misc/sdp.proto";
message ProtoMGOBson {
bytes bson = 1;
}
message ProtoMGODocument {
bytes document = 1;
}
message ProtoMGOResultOption { //FindObservable
int32 optType = 1;
ProtoMGOBson bsonParam = 2;
int32 valueParam = 3;
}
message ProtoMGOAdmin{
string tarName = 1;
repeated ProtoMGOBson bsonParam = 2;
ProtoAny options = 3;
string objName = 4;
}
message ProtoMGOContext { //MGOContext
string dbName = 1;
string collName = 2;
int32 commandType = 3;
repeated ProtoMGOBson bsonParam = 4;
repeated ProtoMGOResultOption resultOptions = 5;
repeated string targets = 6;
ProtoAny options = 7;
repeated ProtoMGODocument documents = 8;
google.protobuf.BoolValue only = 9;
ProtoMGOAdmin adminOptions = 10;
}
message ProtoMultiply {
int32 op1 = 1;
int32 op2 = 2;
}
Backend.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.pattern._
import scala.concurrent.duration._
import sdp.grpc.services._
import org.mongodb.scala._
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import scala.collection.JavaConverters._
import scala.util._
object Backend {
case class SaveFormula(op1: Int, op2: Int)
case class SavedToDB(res: Int)
class DBException(errmsg: String) extends Exception(errmsg)
def workerProps(client: MongoClient) = Props(new Worker(client))
def superProps: Props = {
def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
case _: DBException => SupervisorStrategy.Restart
}
val clientSettings: MongoClientSettings = MongoClientSettings.builder()
.applyToClusterSettings {b =>
b.hosts(List(new ServerAddress("localhost:27017")).asJava)
}.build()
val client: MongoClient = MongoClient(clientSettings)
val options = Backoff.onFailure(
childProps = workerProps(client),
childName = "worker",
minBackoff = 1 second,
maxBackoff = 10 seconds,
randomFactor = 0.20
).withAutoReset(resetBackoff = 5 seconds)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
decider.orElse(SupervisorStrategy.defaultDecider)
)
)
BackoffSupervisor.props(options)
}
def create(port: Int): Unit = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[backend]"))
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)
val Backend = system.actorOf(superProps,"backend")
}
}
class Worker(client: MongoClient) extends Actor with ActorLogging {
import Backend._
//use allocated threads for io
// implicit val executionContext = context.system.dispatchers.lookup("dbwork-dispatcher")
import monix.execution.Scheduler.Implicits.global
implicit val mongoClient = client;
val ctx = MGOContext("testdb","mulrecs")
def saveToDB(op1: Int, op2: Int, by: String) = {
val doc = Document("by" -> by, "op1" -> op1, "op2" -> op2, "res" -> op1 * op2)
val cmd = ctx.setCommand(MGOCommands.Insert(Seq(doc)))
val task = mgoUpdate[Completed](cmd).toTask
task.runOnComplete {
case Success(s) => log.info("operations completed successfully.")
case Failure(exception) => log.error(s"error: ${exception.getMessage}")
}
}
context.setReceiveTimeout(20 seconds)
override def receive: Receive = {
case ProtoMultiply(op1,op2) => {
val res = op1 * op2
saveToDB(op1, op2, s"$self")
log.info(s"******* $op1 X $op2 = $res save to DB by $self *******")
}
case SavedToDB(res) =>
log.info(s"******* result of ${res} saved to database. *******")
case ReceiveTimeout =>
log.info(s"******* $self receive timout! *******")
throw new DBException("worker idle timeout!")
}
}
Frontend.scala
import akka.actor._
import akka.routing._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.util._
import akka.cluster._
import sdp.grpc.services._
object Frontend {
private var _frontend: ActorRef = _
case class Multiply(op1: Int, op2: Int)
def create(port: Int) = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.posrt=$port")
.withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[frontend]"))
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
Cluster(system).registerOnMemberUp{
_frontend = system.actorOf(Props[Frontend],"frontend")
}
}
def getFrontend = _frontend
}
class Frontend extends Actor with ActorLogging {
import Frontend._
import Backend._
import context.dispatcher
//just lookup routees, routing strategy is responsible for deployment
val backend = context.actorOf(FromConfig.props(/* Props.empty */),"dispatcher")
context.system.scheduler.schedule(3.seconds, 3.seconds, self,
Multiply(Random.nextInt(100), Random.nextInt(100)))
override def receive: Receive = {
case Multiply(op1,op2) =>
backend ! ProtoMultiply(op1,op2)
case msg @ _ =>
log.info(s"******* unrecognized message: $msg! ******")
}
}
LoadBalanceDemo.scala
object LoadBalancingApp extends App {
//
//
//initiate three nodes from backend
Backend.create(2551)
//
Backend.create(2552)
//
Backend.create(2561)
//
//initiate frontend node
Frontend.create(2571)
//
}
converters/BytesConverter.scala
package protobuf.bytes
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {
def marshal(value: Any): ByteString = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(value)
oos.close()
ByteString.copyFrom(stream.toByteArray())
}
def unmarshal[A](bytes: ByteString): A = {
val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val value = ois.readObject()
ois.close()
value.asInstanceOf[A]
}
}
converters/DBOResultType.scala
package sdp.result
import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._
import scala.concurrent._
import scala.collection.TraversableOnce
object DBOResult {
type DBOError[A] = EitherT[Task,Throwable,A]
type DBOResult[A] = OptionT[DBOError,A]
implicit def valueToDBOResult[A](a: A): DBOResult[A] =
Applicative[DBOResult].pure(a)
implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
OptionT((o: Option[A]).pure[DBOError])
implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
// val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
OptionT.liftF(EitherT.fromEither[Task](e))
}
implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
val task = Task.fromFuture[A](fut)
val et = EitherT.liftF[Task,Throwable,A](task)
OptionT.liftF(et)
}
implicit class DBOResultToTask[A](r: DBOResult[A]) {
def toTask = r.value.value
}
implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
def someValue: Option[A] = r match {
case Left(err) => (None: Option[A])
case Right(oa) => oa
}
}
def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
if (coll.isEmpty)
optionToDBOResult(None: Option[C[A]])
else
optionToDBOResult(Some(coll): Option[C[A]])
}
filestream/FileStreaming.scala
package sdp.file
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._
import scala.concurrent.Await
import scala.concurrent.duration._
object Streaming {
def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer):ByteBuffer = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toByteBuffer
}
def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer): Array[Byte] = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toArray
}
def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer): InputStream = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
val buf = (Await.result(fut, timeOut)).toArray
new ByteArrayInputStream(buf)
}
def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
implicit mat: Materializer) = {
val ba = new Array[Byte](byteBuf.remaining())
byteBuf.get(ba,0,ba.length)
val baInput = new ByteArrayInputStream(ba)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
implicit mat: Materializer) = {
val bb = ByteBuffer.wrap(bytes)
val baInput = new ByteArrayInputStream(bytes)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
def InputStreamToFile(is: InputStream, fileName: String)(
implicit mat: Materializer) = {
val source = StreamConverters.fromInputStream(() => is)
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
}
logging/Log.scala
package sdp.logging
import org.slf4j.Logger
/**
* Logger which just wraps org.slf4j.Logger internally.
*
* @param logger logger
*/
class Log(logger: Logger) {
// use var consciously to enable squeezing later
var isDebugEnabled: Boolean = logger.isDebugEnabled
var isInfoEnabled: Boolean = logger.isInfoEnabled
var isWarnEnabled: Boolean = logger.isWarnEnabled
var isErrorEnabled: Boolean = logger.isErrorEnabled
def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
level match {
case 'debug | 'DEBUG => debug(msg)
case 'info | 'INFO => info(msg)
case 'warn | 'WARN => warn(msg)
case 'error | 'ERROR => error(msg)
case _ => // nothing to do
}
}
def debug(msg: => String): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg)
}
}
def debug(msg: => String, e: Throwable): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg, e)
}
}
def info(msg: => String): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg)
}
}
def info(msg: => String, e: Throwable): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg, e)
}
}
def warn(msg: => String): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg)
}
}
def warn(msg: => String, e: Throwable): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg, e)
}
}
def error(msg: => String): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg)
}
}
def error(msg: => String, e: Throwable): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg, e)
}
}
}
logging/LogSupport.scala
package sdp.logging
import org.slf4j.LoggerFactory
trait LogSupport {
/**
* Logger
*/
protected val log = new Log(LoggerFactory.getLogger(this.getClass))
}
mgo.engine/MGOProtoConversions.scala
package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable
object MGOProtoConversion {
type MGO_COMMAND_TYPE = Int
val MGO_COMMAND_FIND = 0
val MGO_COMMAND_COUNT = 20
val MGO_COMMAND_DISTICT = 21
val MGO_COMMAND_DOCUMENTSTREAM = 1
val MGO_COMMAND_AGGREGATE = 2
val MGO_COMMAND_INSERT = 3
val MGO_COMMAND_DELETE = 4
val MGO_COMMAND_REPLACE = 5
val MGO_COMMAND_UPDATE = 6
val MGO_ADMIN_DROPCOLLECTION = 8
val MGO_ADMIN_CREATECOLLECTION = 9
val MGO_ADMIN_LISTCOLLECTION = 10
val MGO_ADMIN_CREATEVIEW = 11
val MGO_ADMIN_CREATEINDEX = 12
val MGO_ADMIN_DROPINDEXBYNAME = 13
val MGO_ADMIN_DROPINDEXBYKEY = 14
val MGO_ADMIN_DROPALLINDEXES = 15
case class AdminContext(
tarName: String = "",
bsonParam: Seq[Bson] = Nil,
options: Option[Any] = None,
objName: String = ""
){
def toProto = sdp.grpc.services.ProtoMGOAdmin(
tarName = this.tarName,
bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
objName = this.objName,
options = this.options.map(b => ProtoAny(marshal(b)))
)
}
object AdminContext {
def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
tarName = msg.tarName,
bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
objName = msg.objName,
options = msg.options.map(b => unmarshal[Any](b.value))
)
}
case class Context(
dbName: String = "",
collName: String = "",
commandType: MGO_COMMAND_TYPE,
bsonParam: Seq[Bson] = Nil,
resultOptions: Seq[ResultOptions] = Nil,
options: Option[Any] = None,
documents: Seq[Document] = Nil,
targets: Seq[String] = Nil,
only: Boolean = false,
adminOptions: Option[AdminContext] = None
){
def toProto = new sdp.grpc.services.ProtoMGOContext(
dbName = this.dbName,
collName = this.collName,
commandType = this.commandType,
bsonParam = this.bsonParam.map(bsonToProto),
resultOptions = this.resultOptions.map(_.toProto),
options = { if(this.options == None)
None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else
Some(ProtoAny(marshal(this.options.get))) },
documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
targets = this.targets,
only = Some(this.only),
adminOptions = this.adminOptions.map(_.toProto)
)
}
object MGODocument {
def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
unmarshal[Document](msg.document)
def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
new ProtoMGODocument(marshal(doc))
}
object MGOProtoMsg {
def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
dbName = msg.dbName,
collName = msg.collName,
commandType = msg.commandType,
bsonParam = msg.bsonParam.map(protoToBson),
resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
options = msg.options.map(a => unmarshal[Any](a.value)),
documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
targets = msg.targets,
adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
)
}
def bsonToProto(bson: Bson) =
ProtoMGOBson(marshal(bson.toBsonDocument(
classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
val bsdoc = unmarshal[BsonDocument](proto.bson)
override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
}
def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
case MGO_COMMAND_FIND => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Find())
)
def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
(proto.bsonParam, proto.resultOptions, proto.only) match {
case (Nil, Nil, None) => ctx
case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
case (bp,Nil,None) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head))))
case (bp,Nil,Some(b)) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
case (bp,fo,None) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto)
))
}
case (bp,fo,Some(b)) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto),
firstOnly = b))
}
case _ => ctx
}
}
case MGO_COMMAND_COUNT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Count())
)
(proto.bsonParam, proto.options) match {
case (Nil, None) => ctx
case (bp, None) => ctx.setCommand(
Count(filter = Some(protoToBson(bp.head)))
)
case (Nil,Some(o)) => ctx.setCommand(
Count(options = Some(unmarshal[Any](o.value)))
)
case _ => ctx
}
}
case MGO_COMMAND_DISTICT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Distict(fieldName = proto.targets.head))
)
(proto.bsonParam) match {
case Nil => ctx
case bp: Seq[ProtoMGOBson] => ctx.setCommand(
Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
)
case _ => ctx
}
}
case MGO_COMMAND_AGGREGATE => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
)
}
case MGO_ADMIN_LISTCOLLECTION => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(ListCollection(proto.dbName)))
}
case MGO_COMMAND_INSERT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_DELETE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Delete(
filter = protoToBson(proto.bsonParam.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_COMMAND_REPLACE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_UPDATE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_ADMIN_DROPCOLLECTION =>
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropCollection(proto.collName))
)
case MGO_ADMIN_CREATECOLLECTION => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateCollection(proto.collName))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEVIEW => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEINDEX=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYNAME=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByName(indexName = proto.targets.head))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYKEY=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPALLINDEXES=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropAllIndexes())
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropAllIndexes(
options = Some(unmarshal[Any](o.value)))
)
}
}
}
def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
case None => None
case Some(act) => act match {
case Count(filter, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_COUNT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Distict(fieldName, filter) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DISTICT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
targets = Seq(fieldName)
))
case Find(filter, andThen, firstOnly) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_FIND,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
resultOptions = andThen.map(_.toProto)
))
case Aggregate(pipeLine) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_AGGREGATE,
bsonParam = pipeLine.map(bsonToProto)
))
case Insert(newdocs, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_INSERT,
documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Delete(filter, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DELETE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case Replace(filter, replacement, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_REPLACE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
documents = Seq(ProtoMGODocument(marshal(replacement)))
))
case Update(filter, update, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_UPDATE,
bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case DropCollection(coll) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_DROPCOLLECTION
))
case CreateCollection(coll, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_CREATECOLLECTION,
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case ListCollection(dbName) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
commandType = MGO_ADMIN_LISTCOLLECTION
))
case CreateView(viewName, viewOn, pipeline, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEVIEW,
bsonParam = pipeline.map(bsonToProto),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
targets = Seq(viewName,viewOn)
))
case CreateIndex(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEINDEX,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByName(indexName, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYNAME,
targets = Seq(indexName),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByKey(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYKEY,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropAllIndexes(options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPALLINDEXES,
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
}
}
}
mgo.engine/MongoEngine.scala
package sdp.mongo.engine
import java.text.SimpleDateFormat
import java.util.Calendar
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import org.bson.conversions.Bson
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import org.mongodb.scala.model._
import org.mongodb.scala.{MongoClient, _}
import protobuf.bytes.Converter._
import sdp.file.Streaming._
import sdp.logging.LogSupport
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
object MGOClasses {
type MGO_ACTION_TYPE = Int
val MGO_QUERY = 0
val MGO_UPDATE = 1
val MGO_ADMIN = 2
/* org.mongodb.scala.FindObservable
import com.mongodb.async.client.FindIterable
val resultDocType = FindIterable[Document]
val resultOption = FindObservable(resultDocType)
.maxScan(...)
.limit(...)
.sort(...)
.project(...) */
type FOD_TYPE = Int
val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
//Sets a document describing the fields to return for all matching documents
val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
//Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
//Sets the cursor type
val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
//Sets the hint for which index to use. A null value means no hint is set
val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
//Sets the exclusive upper bound for a specific index. A null value means no max is set
val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
//Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
//Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
//Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
case class ResultOptions(
optType: FOD_TYPE,
bson: Option[Bson] = None,
value: Int = 0 ){
def toProto = new sdp.grpc.services.ProtoMGOResultOption(
optType = this.optType,
bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
valueParam = this.value
)
def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
optType match {
case FOD_FIRST => find
case FOD_FILTER => find.filter(bson.get)
case FOD_LIMIT => find.limit(value)
case FOD_SKIP => find.skip(value)
case FOD_PROJECTION => find.projection(bson.get)
case FOD_SORT => find.sort(bson.get)
case FOD_PARTIAL => find.partial(value != 0)
case FOD_CURSORTYPE => find
case FOD_HINT => find.hint(bson.get)
case FOD_MAX => find.max(bson.get)
case FOD_MIN => find.min(bson.get)
case FOD_RETURNKEY => find.returnKey(value != 0)
case FOD_SHOWRECORDID => find.showRecordId(value != 0)
}
}
}
object ResultOptions {
def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
optType = msg.optType,
bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
value = msg.valueParam
)
}
trait MGOCommands
object MGOCommands {
case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
/* org.mongodb.scala.FindObservable
import com.mongodb.async.client.FindIterable
val resultDocType = FindIterable[Document]
val resultOption = FindObservable(resultDocType)
.maxScan(...)
.limit(...)
.sort(...)
.project(...) */
case class Find(filter: Option[Bson] = None,
andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
firstOnly: Boolean = false) extends MGOCommands
case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
}
object MGOAdmins {
case class DropCollection(collName: String) extends MGOCommands
case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
case class ListCollection(dbName: String) extends MGOCommands
case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
}
case class MGOContext(
dbName: String,
collName: String,
actionType: MGO_ACTION_TYPE = MGO_QUERY,
action: Option[MGOCommands] = None,
actionOptions: Option[Any] = None,
actionTargets: Seq[String] = Nil
) {
ctx =>
def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
def setCollName(name: String): MGOContext = ctx.copy(collName = name)
def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
def toSomeProto = MGOProtoConversion.ctxToProto(this)
}
object MGOContext {
def apply(db: String, coll: String) = new MGOContext(db, coll)
def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
MGOProtoConversion.ctxFromProto(proto)
}
case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
ctxs =>
def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
def appendContext(ctx: MGOContext): MGOBatContext =
ctxs.copy(contexts = contexts :+ ctx)
}
object MGOBatContext {
def