1. 程式人生 > >search(2)- elasticsearch scala終端:elastic4s

search(2)- elasticsearch scala終端:elastic4s

   上篇談到:elasticsearch本身是一個完整的後臺系統,對其的操作使用是通過終端api進行的。elasticsearch本身提供了多種程式語言的api,包括java的esjava。而elastic4s是一套基於esjava之上的scala api。

先看看scala 終端 ElasticClient的構建過程:

  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava =  JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)

先構建JavaClient,JavaClient包嵌了個esjava的RestClient進行具體的操作:

class JavaClient(client: RestClient) extends HttpClient {
...
//send request to elasticsearch
override def send(req: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = {
    if (logger.isDebugEnabled) {
      logger.debug("Executing elastic request {}", Show[ElasticRequest].show(req))
    }

    val l = new ResponseListener {
      override def onSuccess(r: org.elasticsearch.client.Response): Unit = callback(Right(fromResponse(r)))
      override def onFailure(e: Exception): Unit = e match {
        case re: ResponseException => callback(Right(fromResponse(re.getResponse)))
        case t => callback(Left(JavaClientExceptionWrapper(t)))
      }
    }

    val request = new Request(req.method, req.endpoint)
    req.params.foreach { case (key, value) => request.addParameter(key, value) }
    req.entity.map(apacheEntity).foreach(request.setEntity)
//perform actual request sending
    client.performRequestAsync(request, l)
  }
...
}

上面這個RestClient即是elasticsearch提供的javaClient。而elastic4s的具體操作是通過RestClient.performRequestAsync進行的,如下:

 

public class RestClient implements Closeable {
...
    /**
     * Sends a request to the Elasticsearch cluster that the client points to.
     * The request is executed asynchronously and the provided
     * {@link ResponseListener} gets notified upon request completion or
     * failure. Selects a host out of the provided ones in a round-robin
     * fashion. Failing hosts are marked dead and retried after a certain
     * amount of time (minimum 1 minute, maximum 30 minutes), depending on how
     * many times they previously failed (the more failures, the later they
     * will be retried). In case of failures all of the alive nodes (or dead
     * nodes that deserve a retry) are retried until one responds or none of
     * them does, in which case an {@link IOException} will be thrown.
     *
     * @param request the request to perform
     * @param responseListener the {@link ResponseListener} to notify when the
     *      request is completed or fails
     */
    public void performRequestAsync(Request request, ResponseListener responseListener) {
        try {
            FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
            InternalRequest internalRequest = new InternalRequest(request);
            performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
        } catch (Exception e) {
            responseListener.onFailure(e);
        }
    }
...

}

另外,ElasticProperties是一個javaClient與ES連線的引數結構,包括IP地址:

/**
  * Contains the endpoints of the nodes to connect to, as well as connection properties.
  */
case class ElasticProperties(endpoints: Seq[ElasticNodeEndpoint], options: Map[String, String] = Map.empty)

ElasticProperties包含了ES地址ElasticNodeEndPoint及其它連線引數(如果需要的話),如下:

 it should "support prefix path with trailing slash" in {
    ElasticProperties("https://host1:1234,host2:2345/prefix/path/") shouldBe
      ElasticProperties(Seq(ElasticNodeEndpoint("https", "host1", 1234, Some("/prefix/path")), ElasticNodeEndpoint("https", "host2", 2345, Some("/prefix/path"))))
  }

當elastic4s完成了與elasticsearch的連線之後,就可以把按ES要求組合的Json指令傳送到後臺ES去執行了。elastic4s提供了一套DSL, 一種嵌入式語言,可以幫助使用者更方便的用程式設計模式來組合ES的指令Json。當然,使用者也可以直接把字元類的Json直接通過ElasticClient傳送到後臺ES。下面是一個簡單可以執行的elastic4s示範:

import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}

object HttpClientExampleApp extends App {

  // you must import the DSL to use the syntax helpers
  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava =  JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)


    client.execute {
      bulk(
        indexInto("books" ).fields("title" -> "重慶火鍋的十種吃法", "content" -> "在這部書裡描述了火鍋的各種烹飪方式"),
        indexInto("books" ).fields("title" -> "中國火鍋大全", "content" -> "本書是全國中式烹飪中有關火鍋的各種介紹")
      ).refresh(RefreshPolicy.WaitFor)
    }.await

  val json =
    """
      |{
      |  "query" : {
      |    "match" : {"title" : "火鍋"}
      |  }
      |}
      |""".stripMargin
  val response = client.execute {
    search("books").source(json)   //      .matchQuery("title", "火鍋")
  }.await

  // prints out the original json
  println(response.result.hits.hits.head.sourceAsString)

  client.close()

}

&n