1. 程式人生 > >基於Kubernetes的機器學習微服務系統設計系列——(九)應用服務

基於Kubernetes的機器學習微服務系統設計系列——(九)應用服務

 內容提要

  本文的應用服務是指機器學習任務的應用,主要涉及任務的排程、狀態機、和微服務發現,微服務訪問以及微服務資源的監控。

1 分類任務

1.1 分類任務類圖

  分類任務類圖如圖所示:

分類任務類圖

1.2 資料結構

  分類任務類圖如圖所示:

分類任務資料結構

  頂層採用任務池管理分類任務,每個任務下面掛多個分類子任務,每個子任務下又有多個任務例項。

1.3 排程狀態轉移

  分類任務排程狀態轉移圖如圖所示:

分類任務排程狀態轉移圖

  任務有建立、執行、等待、完成、超時、消亡六種狀態,不同的條件進入不同的狀態如圖所示。

2 資源監控

2.1 資源監控類圖

  資源監控類圖如圖所示:

資源監控類圖

2.2 部分程式碼實現

服務控制中心類:

package com.robin.monitor;

import com.
robin.config.ConfigUtil; import com.robin.log.RobinLogger; import com.robin.task.ClassifyTaskPool; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Logger; /** * <DT><B>描述:</B></DT> * <DD>服務控制中心</DD> * * @version Version1.0 * @author Robin * @version <I> V1.0 Date:2018-08-08</I> * @author <I> E-mail:[email protected]</I> */ public class MonitorCenter { private static final Logger LOGGER = RobinLogger.getLogger(); private static final Map<String, ReplicationController> RC_MAP; private static final KubeApiServer KUBE_APISERVER; private static MonitorThread monitor = null; private static final ExecutorService TASK_THREAD_POOL; static { RC_MAP = new HashMap<>(); String kubeApiAddress = ConfigUtil.getConfig("kubernetes.api.address"); int kubeApiPort = Integer.valueOf(ConfigUtil.getConfig("kubernetes.api.port")); KUBE_APISERVER = new KubeApiServer(kubeApiAddress, kubeApiPort); int threadPoolSize = Integer.valueOf(ConfigUtil.getConfig("macro.request.thread.pool.size")); TASK_THREAD_POOL = Executors.newFixedThreadPool(threadPoolSize); } public static KubeApiServer getKUBE_APISERVER() { return KUBE_APISERVER; } public static ExecutorService getTASK_THREAD_POOL() { return TASK_THREAD_POOL; } public static void addRC(String rcName) { if (RC_MAP.containsKey(rcName)) { return; } ReplicationController rc = KUBE_APISERVER.getReplicationController(rcName); if (rc == null) { return; } rc.startMonitor(); RC_MAP.put(rcName, rc); } public static void monitor() { if (monitor == null) { monitor = new MonitorThread(); Thread t = new Thread(monitor); t.start(); } } public static ReplicationController getRC(String rcName) { return RC_MAP.get(rcName); } public static void removeRC(String rcName) { RC_MAP.remove(rcName); } static class MonitorThread implements Runnable { private static final int INTERVAL_TIME = 1000 * Integer.valueOf(ConfigUtil.getConfig("kubernetes.rc.refresh.time")); @Override public void run() { LOGGER.log(Level.INFO, "++++ RCMonitor thread start. ++++"); while (true) { //清理殘留任務 ClassifyTaskPool.clearTimeoutTask(); if (!RC_MAP.isEmpty()) { Set<String> rmSet = new HashSet<>(); Iterator<String> it = RC_MAP.keySet().iterator(); while (it.hasNext()) { String rcName = it.next(); ReplicationController oldRc = RC_MAP.get(rcName); ReplicationController newRc = KUBE_APISERVER.getReplicationController(rcName); if (newRc == null) { rmSet.add(rcName); continue; } if (!oldRc.equals(newRc)) { rmSet.add(rcName); } } if (!rmSet.isEmpty()) { it = rmSet.iterator(); while (it.hasNext()) { String rcName = it.next(); ReplicationController oldRc = RC_MAP.remove(rcName); oldRc.clean(); } } } addRC("segment-rc"); addRC("pretreatment-rc"); addRC("feature-select-rc"); addRC("classifier-rc"); try { Thread.sleep(INTERVAL_TIME); } catch (InterruptedException ex) { LOGGER.log(Level.SEVERE, ex.getMessage()); } } } } }

3 應用WEB部署

  應用服務部署在虛擬機器的Glassfish server容器內,也可以部署在Docker Container的server的容器內。
  WEB部署應用服務配置檔案:

#web config
web.axis.title.font.name=Arial\u0020Italic\u0020Bold
web.axis.title.font.size=9
web.axis.title.font.color=000000

#docker config
docker.monitor.port=5257

#kubernetes config
kubernetes.rc.refresh.time=15
kubernetes.api.address=192.168.1.10
kubernetes.api.port=8080

#macro server ip type  Cluster/Host
macro.server.ip.type=Cluster
macro.request.thread.pool.size=10

#classify task config
classify.task.timeout=60

#log config
log.path=log/
log.prefix=robin-ml
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG 
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3