1. 程式人生 > >Akka-Cluster(3)- ClusterClient, 叢集客戶端

Akka-Cluster(3)- ClusterClient, 叢集客戶端

  上篇我們介紹了distributed pub/sub訊息傳遞機制。這是在同一個叢集內的訊息共享機制:釋出者(publisher)和訂閱者(subscriber)都在同一個叢集的節點上,所有節點上的DistributedPubSubMediator通過叢集內部的溝通機制在底層構建了訊息流通渠道。在actor pub/sub層面可以實現物件位置透明化。在現實裡很多前端都會作為某個叢集的客戶端但又與叢集分離,又或者兩個獨立的叢集之間可能會發生互動關係,這是也會出現客戶端與服務端不在同一叢集內的情況,ClusterClient就是為叢集外部actor與叢集內部actor進行溝通的解決方案。

實際上ClusterClient模式就代表一種依賴於訊息釋出訂閱機制的服務方式:客戶端通過訊息來請求服務,服務端接收請求服務訊息並提供相應運算服務。

我們可以把叢集客戶端模式分成叢集客戶端ClusterClient和叢集服務端ClusterClientReceptionist,從字面理解這就是個接待員這麼個角色,負責接待叢集外客戶端發起的服務請求。在叢集所有節點上(或者選定角色role)都部署ClusterClientReceptionist,它們都與本節點的DistributedPubSubMediator對接組成更上一層的訊息訂閱方,ClusterClient與ClusterClientReceptionist的對接又組成了一種統一叢集環境可以實現上集所討論的distributed pub/sub機制。

ClusterClient就是訊息釋出方,它是在目標叢集之外機器上的某個actor。這個機器上的actor如果需要向叢集內部actor傳送訊息可以通過這個機器上的ClusterClient actor與叢集內的ClusterClientReceptionist搭建的通道向叢集內某個ClusterClientReceptionist連線的DistributedPubSubMediator所登記的actor進行訊息傳送。所以使用叢集客戶端的機器必須在本機啟動ClusterClient服務(執行這個actor),這是通訊橋樑的一端。

ClusterClient在啟動時用預先配置的地址(contact points)與ClusterClientReceptionist連線,然後通過ClusterClientReceptionist釋出的聯絡點清單來維護內部的對接點清單,可以進行持久化,在發生系統重啟時用這個名單來與叢集連線。一旦連線,ClusterClient會監控對方執行情況,自動進行具體ClusterClientReceiptionist的替換。ClusterClient釋出訊息是包嵌在三種結構裡的:

1、ClusterClient.Send

2、ClusterClient.SendAll

3、ClusterClient.Publish

這幾種方法我們在上篇已經討論過,這裡就略去。

ClusterClientReceiptionist是叢集內的訊息接收介面。叢集內需要接收訊息的actor必須在本地的DistributedPubSubMediator上註冊自己的地址,ClusterClientReceptionist由此獲得叢集內所有服務專案actor的地址清單。通過ClusterClient釋出的訊息內指定接收方型別資訊來確定最終接收訊息並提供服務的actor。服務註冊示範如下:

//註冊服務A
  val serviceA = system.actorOf(Props[Service], "serviceA")
  ClusterClientReceptionist(system).registerService(serviceA)
//註冊服務B
  val serviceB = system.actorOf(Props[Service], "serviceB")
  ClusterClientReceptionist(system).registerService(serviceB)

ClusterClient呼叫服務示範:

  val client = system.actorOf(ClusterClient.props(
  ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
  client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
  client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

注意:ClusterClientReceptionist需要接收DoThis,DoThat訊息並實現相關的運算。

在具體應用中要注意sender()的具體意義:從提供服務的actor方面看,sender()代表ClusterClientReceptionist。從釋出訊息的actor角度看,sender()代表的是DeadLetter。如果服務actor需要知道請求者具體地址,釋出方可以把自己的地址嵌在釋出的訊息結構裡。

下面我們就通過一個簡單的例子來進行示範。先設計兩個服務actor:Cat,Dog 。假設它們會提供不同的叫聲作為服務吧:

複製程式碼

class Cat extends Actor with ActorLogging {
  //使用pub/sub方式設定
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }

  override def receive: Receive = {
    case "Shout" =>
      log.info("*******I am a cat, MIAOM ...******")
  }
}

class Dog extends Actor with ActorLogging {
  //使用pub/sub方式設定
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }
  override def receive: Receive = {
    case "Shout" =>
      log.info("*****I am a dog, WANG WANG...*****")
  }
}

複製程式碼

我們看到,這就是兩個很普通的actor。但我們還是可以和上一篇分散式pub/sub結合起來驗證cluster-client是基於distributed-pub/sub的。然後我們分別把這兩個actor(服務)放到不同的叢集節點上:

複製程式碼

object Cat {
  def props = Props[Cat]
  def create(port: Int): ActorSystem  = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem",config)
    val catSound = system.actorOf(props,"CatSound")

    ClusterClientReceptionist(system).registerService(catSound)
    system
  }
}

object Dog {
  def props = Props(new Dog)
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem",config)
    val dogSound = system.actorOf(props,"DogSound")
    ClusterClientReceptionist(system).registerService(dogSound)
    system
  }
}

複製程式碼

注意:叢集名稱是ClusterSystem。我們分別在actor所在節點用ClusterClientReceptionist.registerService登記了服務。這個叢集所使用的conf如下:

複製程式碼

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off
  }
}

複製程式碼

這是一個比較完整的叢集配置文件,只有port需要再配置。然後執行這兩個節點:

複製程式碼

object PetHouse extends App {

  val sysCat = Cat.create(2551)
  val sysDog = Dog.create(2552)

  scala.io.StdIn.readLine()

  sysCat.terminate()
  sysDog.terminate()

}

複製程式碼

完成了在2551,2552節點上的Cat,Dog actor構建及ClusterClientReceptionist.registerService服務登記。現在看看客戶端:

複製程式碼

object PetClient extends App {

  val conf = ConfigFactory.load("client")
  val clientSystem = ActorSystem("ClientSystem",conf)
/* 從 conf 檔案裡讀取 contact-points 地址
  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
    case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
  }.toSet
*/

  //先放一個contact-point, 系統會自動增加其它的點
  val initialContacts = Set(
    ActorPaths.fromString("akka.tcp://[email protected]:2551/system/receptionist")
  )

  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")

  clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
  clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)

  println(s"sent shout messages ...")
  scala.io.StdIn.readLine()

  clusterClient ! Publish("Shout","Shout")
  println(s"publish shout messages ...")

  scala.io.StdIn.readLine()
  clientSystem.terminate();
}

複製程式碼

客戶端的ActorSystem名稱為ClientSystem,是在ClusterSystem叢集之外的。conf檔案如下:

複製程式碼

akka {

  actor.provider = remote

  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://[email protected]:2551",
  "akka.tcp://[email protected]:2552"]

複製程式碼

把它設成actor.provider=remote可以免去提供seednodes。運算結果:

複製程式碼

 [12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://[email protected]:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://[email protected]:2552/user/DogSound] *****I am a dog, WANG WANG...*****
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://[email protected]:2552/user/DogSound] *****I am a dog, WANG WANG...*****

複製程式碼

無論ClusterClient或Receptionist都會針對自己的狀態傳送訊息。我們可以擷取這些訊息來做些相應的工作。參考下面的截聽器示範程式碼: 

複製程式碼

package petsound
import akka.actor._
import akka.cluster.client._
class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    clusterClient ! SubscribeContactPoints
    super.preStart()
  }

  override def receive: Receive = {
    case ContactPoints(cps) =>
      cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
    case ContactPointAdded(cp) =>
      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
    case ContactPointRemoved(cp) =>
      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")

  }
}

class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    receptionist ! SubscribeClusterClients
    super.preStart()
  }

  override def receive: Receive = {
    case ClusterClients(cs) =>
      cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
    case ClusterClientUp(cc) =>
      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
    case ClusterClientUnreachable(cc) =>
      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")

  }
}

複製程式碼

這兩個event-listener的安裝方法如下:

複製程式碼

    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")

    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")

  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")

  clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")

複製程式碼

看看運算結果:

複製程式碼

[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2553/user/client-event-listner] *******ContactPoints:akka.tcp://[email protected]:2551******
[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://[email protected]:2552*******
[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://[email protected]:2553/user/petClient] Connected to [akka.tcp://[email protected]:2551/system/receptionist]


[INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://[email protected]:2553*******

[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://[email protected]:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://[email protected]:2553*******

複製程式碼

下面我們再做個示範,還是與上篇討論一樣:由叢集客戶端傳送MongoDB指令至某個在叢集裡用ClusterClientReceptionist註冊的MongoDB操作服務actor。服務方接收指令後在MongoDB上進行運算。下面是MongoDB的服務actor: 

複製程式碼

package petsound
import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._

import scala.collection.JavaConverters._
import scala.util._

class MongoAdder extends Actor with ActorLogging {
  import monix.execution.Scheduler.Implicits.global
  implicit val mgosys = context.system
  implicit val ec = mgosys.dispatcher

  val clientSettings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings {b =>
      b.hosts(List(new ServerAddress("localhost:27017")).asJava)
    }.build()

  implicit val client: MongoClient = MongoClient(clientSettings)

  val ctx = MGOContext("testdb","friends")

  override def receive: Receive = {

    case someProto @ Some(proto:ProtoMGOContext) =>
      val ctx = MGOContext.fromProto(proto)
      log.info(s"****** received MGOContext: $someProto *********")

      val task = mgoUpdate[Completed](ctx).toTask
      task.runOnComplete {
        case Success(s) => println("operations completed successfully.")
        case Failure(exception) => println(s"error: ${exception.getMessage}")
      }

  }
}

object MongoAdder {

  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)

    val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
    ClusterClientReceptionist(system).registerService(mongoAdder)

    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")

    system

  }

}

複製程式碼

MongoAdder處於同一個叢集ClusterSystem中。程式碼裡已經包括了服務註冊部分。客戶端傳送MongoDB指令的示範如下:

複製程式碼

 //MongoDB 操作示範
  import org.mongodb.scala._
  import sdp.mongo.engine.MGOClasses._

  val ctx = MGOContext("testdb","friends")

  val chen = Document("姓" -> "陳", "名" -> "大文","age" -> 28)
  val zhang = Document("姓" -> "張", "名" -> "小海","age" -> 7)
  val lee = Document("姓" -> "李", "名" -> "四","age" -> 45)
  val ouyang = Document("姓" -> "歐陽", "名" -> "鋒","age" -> 120)

  val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
  clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)

複製程式碼

由於MongoDB指令是通過protobuffer方式進行序列化的,所以需要修改client.conf通知akka使用protobuf格式的訊息:

複製程式碼

akka {

  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://[email protected]:2551",
  "akka.tcp://[email protected]:2552"]

複製程式碼

下面是本次討論完整示範原始碼:

build.sbt

複製程式碼

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "akka-cluster-client"

version := "0.1"

scalaVersion := "2.12.7"

scalacOptions += "-Ypartial-unification"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.17",
  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  // "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
  //other dependencies
  "co.fs2" %% "fs2-core" % "0.9.7",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  "io.monix" %% "monix-execution" % "3.0.0-RC1",
  "io.monix" %% "monix-eval" % "3.0.0-RC1"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

複製程式碼

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)

resouces/application.conf

複製程式碼

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off
  }
}

複製程式碼

resources/client.conf

複製程式碼

akka {

  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port= 2553
  remote.netty.tcp.hostname=127.0.0.1

}

contact-points = [
  "akka.tcp://[email protected]:2551",
  "akka.tcp://[email protected]:2552"]

複製程式碼

protobuf/spd.proto

複製程式碼

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

package sdp.grpc.services;


message ProtoDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message ProtoTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message ProtoDateTime {
   ProtoDate date = 1;
   ProtoTime time = 2;
}

message ProtoAny {
  bytes value = 1;
}

複製程式碼

protobuf/mgo.proto

複製程式碼

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";


option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
   int32 optType = 1;
   ProtoMGOBson bsonParam = 2;
   int32 valueParam = 3;
}

message ProtoMGOAdmin{
  string tarName = 1;
  repeated ProtoMGOBson bsonParam  = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext {  //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}

複製程式碼

converters/ByteConverter.scala

複製程式碼

package protobuf.bytes
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {

  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }


}

複製程式碼

converters/DBOResultType.scala

複製程式碼

package sdp.result

import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._

import scala.concurrent._

import scala.collection.TraversableOnce

object DBOResult {


  type DBOError[A] = EitherT[Task,Throwable,A]
  type DBOResult[A] = OptionT[DBOError,A]

  implicit def valueToDBOResult[A](a: A): DBOResult[A] =
         Applicative[DBOResult].pure(a)
  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
         OptionT((o: Option[A]).pure[DBOError])
  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
 //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
         OptionT.liftF(EitherT.fromEither[Task](e))
  }
  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
       val task = Task.fromFuture[A](fut)
       val et = EitherT.liftF[Task,Throwable,A](task)
       OptionT.liftF(et)
  }

  implicit class DBOResultToTask[A](r: DBOResult[A]) {
    def toTask = r.value.value
  }

  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(err) => (None: Option[A])
      case Right(oa) => oa
    }
  }

  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
    if (coll.isEmpty)
      optionToDBOResult(None: Option[C[A]])
    else
      optionToDBOResult(Some(coll): Option[C[A]])
}

複製程式碼

filestream/FileStreaming.scala

複製程式碼

package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._

import scala.concurrent.Await
import scala.concurrent.duration._

object Streaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer):ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toByteBuffer
  }


  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toArray
  }

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)
  }

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}

複製程式碼

logging/Log.scala

複製程式碼

package sdp.logging

import org.slf4j.Logger

/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * @param logger logger
  */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case _ => // nothing to do
    }
  }

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}

複製程式碼

logging/LogSupport.scala

複製程式碼

package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

  /**
    * Logger
    */
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))

}

複製程式碼

mgo/engine/MGOProtoConversion.scala

複製程式碼

package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable

object MGOProtoConversion {

  type MGO_COMMAND_TYPE = Int
  val MGO_COMMAND_FIND            = 0
  val MGO_COMMAND_COUNT           = 20
  val MGO_COMMAND_DISTICT         = 21
  val MGO_COMMAND_DOCUMENTSTREAM  = 1
  val MGO_COMMAND_AGGREGATE       = 2
  val MGO_COMMAND_INSERT          = 3
  val MGO_COMMAND_DELETE          = 4
  val MGO_COMMAND_REPLACE         = 5
  val MGO_COMMAND_UPDATE          = 6


  val MGO_ADMIN_DROPCOLLECTION    = 8
  val MGO_ADMIN_CREATECOLLECTION  = 9
  val MGO_ADMIN_LISTCOLLECTION    = 10
  val MGO_ADMIN_CREATEVIEW        = 11
  val MGO_ADMIN_CREATEINDEX       = 12
  val MGO_ADMIN_DROPINDEXBYNAME   = 13
  val MGO_ADMIN_DROPINDEXBYKEY    = 14
  val MGO_ADMIN_DROPALLINDEXES    = 15


  case class AdminContext(
                           tarName: String = "",
                           bsonParam: Seq[Bson] = Nil,
                           options: Option[Any] = None,
                           objName: String = ""
                         ){
    def toProto = sdp.grpc.services.ProtoMGOAdmin(
      tarName = this.tarName,
      bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      objName = this.objName,
      options = this.options.map(b => ProtoAny(marshal(b)))

    )
  }

  object AdminContext {
    def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
      tarName = msg.tarName,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      objName = msg.objName,
      options = msg.options.map(b => unmarshal[Any](b.value))
    )
  }

  case class Context(
                      dbName: String = "",
                      collName: String = "",
                      commandType: MGO_COMMAND_TYPE,
                      bsonParam: Seq[Bson] = Nil,
                      resultOptions: Seq[ResultOptions] = Nil,
                      options: Option[Any] = None,
                      documents: Seq[Document] = Nil,
                      targets: Seq[String] = Nil,
                      only: Boolean = false,
                      adminOptions: Option[AdminContext] = None
                    ){

    def toProto = new sdp.grpc.services.ProtoMGOContext(
      dbName = this.dbName,
      collName = this.collName,
      commandType = this.commandType,
      bsonParam = this.bsonParam.map(bsonToProto),
      resultOptions = this.resultOptions.map(_.toProto),
      options = { if(this.options == None)
        None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
      else
        Some(ProtoAny(marshal(this.options.get))) },
      documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
      targets = this.targets,
      only = Some(this.only),
      adminOptions = this.adminOptions.map(_.toProto)
    )

  }

  object MGODocument {
    def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
      unmarshal[Document](msg.document)
    def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
      new ProtoMGODocument(marshal(doc))
  }

  object MGOProtoMsg {
    def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
      dbName = msg.dbName,
      collName = msg.collName,
      commandType = msg.commandType,
      bsonParam = msg.bsonParam.map(protoToBson),
      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
      options = msg.options.map(a => unmarshal[Any](a.value)),
      documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
      targets = msg.targets,
      adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
    )
  }

  def bsonToProto(bson: Bson) =
    ProtoMGOBson(marshal(bson.toBsonDocument(
      classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))

  def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
    val bsdoc = unmarshal[BsonDocument](proto.bson)
    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  }

  def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
    case MGO_COMMAND_FIND => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Find())
      )
      def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
        rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))

      (proto.bsonParam, proto.resultOptions, proto.only) match {
        case (Nil, Nil, None) => ctx
        case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
        case (bp,Nil,None) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head))))
        case (bp,Nil,Some(b)) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
        case (bp,fo,None) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto)
            ))
        }
        case (bp,fo,Some(b)) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto),
              firstOnly = b))
        }
        case _ => ctx
      }
    }
    case MGO_COMMAND_COUNT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Count())
      )
      (proto.bsonParam, proto.options) match {
        case (Nil, None) => ctx
        case (bp, None) => ctx.setCommand(
          Count(filter = Some(protoToBson(bp.head)))
        )
        case (Nil,Some(o)) => ctx.setCommand(
          Count(options = Some(unmarshal[Any](o.value)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_DISTICT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Distict(fieldName = proto.targets.head))
      )
      (proto.bsonParam) match {
        case Nil => ctx
        case bp: Seq[ProtoMGOBson] => ctx.setCommand(
          Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_AGGREGATE => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
      )
    }
    case MGO_ADMIN_LISTCOLLECTION => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(ListCollection(proto.dbName)))
    }
    case MGO_COMMAND_INSERT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_DELETE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Delete(
          filter = protoToBson(proto.bsonParam.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_COMMAND_REPLACE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_UPDATE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_ADMIN_DROPCOLLECTION =>
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropCollection(proto.collName))
      )
    case MGO_ADMIN_CREATECOLLECTION => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateCollection(proto.collName))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEVIEW => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEINDEX=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYNAME=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByName(indexName = proto.targets.head))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYKEY=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPALLINDEXES=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropAllIndexes())
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropAllIndexes(
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }

  }

  def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
    case None => None
    case Some(act) => act match {
      case Count(filter, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_COUNT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
                      else Some(ProtoAny(marshal(options.get))) }
      ))
      case Distict(fieldName, filter) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DISTICT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          targets = Seq(fieldName)

        ))

      case Find(filter, andThen, firstOnly) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_FIND,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
          else Seq(bsonToProto(filter.get))},
          resultOptions = andThen.map(_.toProto)
        ))

      case Aggregate(pipeLine) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_AGGREGATE,
          bsonParam = pipeLine.map(bsonToProto)
        ))

      case Insert(newdocs, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_INSERT,
          documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
          options = { if(options == None) None      //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case Delete(filter, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DELETE,
          bsonParam = Seq(bsonToProto(filter)),
          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          only = Some(onlyOne)
        ))

      case Replace(filter, replacement, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_REPLACE,
          bsonParam = Seq(bsonToProto(filter)),
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          documents = Seq(ProtoMGODocument(marshal(replacement)))
        ))

      case Update(filter, update, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_UPDATE,
          bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          only = Some(onlyOne)
        ))


      case DropCollection(coll) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_DROPCOLLECTION
        ))

      case CreateCollection(coll, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_CREATECOLLECTION,
          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case ListCollection(dbName) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          commandType = MGO_ADMIN_LISTCOLLECTION
        ))

      case CreateView(viewName, viewOn, pipeline, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEVIEW,
          bsonParam = pipeline.map(bsonToProto),
          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          targets = Seq(viewName,viewOn)
        ))

      case CreateIndex(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEINDEX,
          bsonParam = Seq(bsonToProto(key)),
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))


      case DropIndexByName(indexName, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYNAME,
          targets = Seq(indexName),
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case DropIndexByKey(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYKEY,
          bsonParam = Seq(bsonToProto(key)),
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))


      case DropAllIndexes(options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPALLINDEXES,
          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

    }
  }

}

複製程式碼

mgo/engine/MongoDBEngine.scala

複製程式碼

package sdp.mongo.engine

import java.text.SimpleDateFormat
import java.util.Calendar

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import org.bson.conversions.Bson
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import org.mongodb.scala.model._
import org.mongodb.scala.{MongoClient, _}
import protobuf.bytes.Converter._
import sdp.file.Streaming._
import sdp.logging.LogSupport

import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._

object MGOClasses {
  type MGO_ACTION_TYPE = Int
  val MGO_QUERY        = 0
  val MGO_UPDATE       = 1
  val MGO_ADMIN        = 2

  /*  org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) */

  type FOD_TYPE       = Int
  val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
  val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
  val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
  val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
  val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
  //Sets a document describing the fields to return for all matching documents
  val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
  val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
  //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
  //Sets the cursor type
  val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
  //Sets the hint for which index to use. A null value means no hint is set
  val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
  //Sets the exclusive upper bound for a specific index. A null value means no max is set
  val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
  //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
  val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
  //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
  //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents

  case class ResultOptions(
                            optType: FOD_TYPE,
                            bson: Option[Bson] = None,
                            value: Int = 0 ){
    def toProto = new sdp.grpc.services.ProtoMGOResultOption(
      optType = this.optType,
      bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      valueParam = this.value
    )
    def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case  FOD_FIRST        => find
        case  FOD_FILTER       => find.filter(bson.get)
        case  FOD_LIMIT        => find.limit(value)
        case  FOD_SKIP         => find.skip(value)
        case  FOD_PROJECTION   => find.projection(bson.get)
        case  FOD_SORT         => find.sort(bson.get)
        case  FOD_PARTIAL      => find.partial(value != 0)
        case  FOD_CURSORTYPE   => find
        case  FOD_HINT         => find.hint(bson.get)
        case  FOD_MAX          => find.max(bson.get)
        case  FOD_MIN          => find.min(bson.get)
        case  FOD_RETURNKEY    => find.returnKey(value != 0)
        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)

      }
    }
  }
  object ResultOptions {
    def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
      optType = msg.optType,
      bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      value = msg.valueParam
    )

  }

  trait MGOCommands

  object MGOCommands {

    case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands

    /*  org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) */
    case class Find(filter: Option[Bson] = None,
                       andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                       firstOnly: Boolean = false) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands


    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

  }

  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

  }

  case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None,
                         actionOptions: Option[Any] = None,
                         actionTargets: Seq[String] = Nil
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))

    def toSomeProto = MGOProtoConversion.ctxToProto(this)

  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOConte