1. 程式人生 > >遠程調用Spark平臺中的程序

遠程調用Spark平臺中的程序

getpara clas comm xxx 端口 write 處理 appname null

用scala語言,開發好了在spark平臺上可以一直運行的機器學習模型
現在有個需求:
要遠程調用該模型的一些方法並獲取結果
那麽可以使用jetty在服務器端主節點占用一個端口然後對外提供http服務

import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.xxx.rec.basic.ccam.CanonicalCorrelationAnalysisModel
import org.mortbay.jetty.{HttpStatus, Request, Server}
import org.mortbay.jetty.handler._


object CannonicalCorrelationAnalysisModelJerseyServer extends AbstractHandler{ var model: CanonicalCorrelationAnalysisModel = null /** * 處理請求 返回響應 * @param target * @param request * @param response * @param dispatch */ override def handle(target: String, request: HttpServletRequest, response: HttpServletResponse, dispatch: Int): Unit
= { val url=request.getRequestURI url.substring(url.lastIndexOf("/")+1,url.length) match { case "recommend" => { //request中的target 用,號分割 val target: Seq[String] = request.getParameter("target").split(",").toSeq val topNum: Int = request.getParameter("topNum").toInt val result
= model.recommend(target, topNum) response.setStatus(HttpStatus.ORDINAL_200_OK); response.getWriter().println(result.mkString(",")) request.asInstanceOf[Request].setHandled(true) response.getWriter.close() } case _ => { response.setStatus(HttpStatus.ORDINAL_404_Not_Found); request.asInstanceOf[Request].setHandled(true) } } } def main(args: Array[String]): Unit = { import org.apache.spark.{SparkConf, SparkContext} val sparkConf = new SparkConf().setAppName("CanonicalCorrelationAnalysisModelDemo") val textFilePath = "file:///home/xxx/xxx.txt" val sc = new SparkContext(sparkConf) val data = sc.textFile(textFilePath).map { line => line.split( ) }.cache() model = CanonicalCorrelationAnalysisModel.createModel(data, 0.3, 5) val server=new Server(9998) server.setHandler(this) server.start() } }

該程序運行後占用了服務器端主節點的9998端口,通過http訪問即可

遠程調用Spark平臺中的程序