基於Kubernetes的機器學習微服務系統設計系列——(九)應用服務
阿新 • • 發佈:2018-11-10
內容提要
本文的應用服務是指機器學習任務的應用,主要涉及任務的排程、狀態機、和微服務發現,微服務訪問以及微服務資源的監控。
1 分類任務
1.1 分類任務類圖
分類任務類圖如圖所示:
1.2 資料結構
分類任務類圖如圖所示:
頂層採用任務池管理分類任務,每個任務下面掛多個分類子任務,每個子任務下又有多個任務例項。
1.3 排程狀態轉移
分類任務排程狀態轉移圖如圖所示:
任務有建立、執行、等待、完成、超時、消亡六種狀態,不同的條件進入不同的狀態如圖所示。
2 資源監控
2.1 資源監控類圖
資源監控類圖如圖所示:
2.2 部分程式碼實現
服務控制中心類:
package com.robin.monitor;
import com. robin.config.ConfigUtil;
import com.robin.log.RobinLogger;
import com.robin.task.ClassifyTaskPool;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* <DT><B>描述:</B></DT>
* <DD>服務控制中心</DD>
*
* @version Version1.0
* @author Robin
* @version <I> V1.0 Date:2018-08-08</I>
* @author <I> E-mail:[email protected]</I>
*/
public class MonitorCenter {
private static final Logger LOGGER = RobinLogger.getLogger();
private static final Map<String, ReplicationController> RC_MAP;
private static final KubeApiServer KUBE_APISERVER;
private static MonitorThread monitor = null;
private static final ExecutorService TASK_THREAD_POOL;
static {
RC_MAP = new HashMap<>();
String kubeApiAddress = ConfigUtil.getConfig("kubernetes.api.address");
int kubeApiPort = Integer.valueOf(ConfigUtil.getConfig("kubernetes.api.port"));
KUBE_APISERVER = new KubeApiServer(kubeApiAddress, kubeApiPort);
int threadPoolSize = Integer.valueOf(ConfigUtil.getConfig("macro.request.thread.pool.size"));
TASK_THREAD_POOL = Executors.newFixedThreadPool(threadPoolSize);
}
public static KubeApiServer getKUBE_APISERVER() {
return KUBE_APISERVER;
}
public static ExecutorService getTASK_THREAD_POOL() {
return TASK_THREAD_POOL;
}
public static void addRC(String rcName) {
if (RC_MAP.containsKey(rcName)) {
return;
}
ReplicationController rc = KUBE_APISERVER.getReplicationController(rcName);
if (rc == null) {
return;
}
rc.startMonitor();
RC_MAP.put(rcName, rc);
}
public static void monitor() {
if (monitor == null) {
monitor = new MonitorThread();
Thread t = new Thread(monitor);
t.start();
}
}
public static ReplicationController getRC(String rcName) {
return RC_MAP.get(rcName);
}
public static void removeRC(String rcName) {
RC_MAP.remove(rcName);
}
static class MonitorThread implements Runnable {
private static final int INTERVAL_TIME = 1000 * Integer.valueOf(ConfigUtil.getConfig("kubernetes.rc.refresh.time"));
@Override
public void run() {
LOGGER.log(Level.INFO, "++++ RCMonitor thread start. ++++");
while (true) {
//清理殘留任務
ClassifyTaskPool.clearTimeoutTask();
if (!RC_MAP.isEmpty()) {
Set<String> rmSet = new HashSet<>();
Iterator<String> it = RC_MAP.keySet().iterator();
while (it.hasNext()) {
String rcName = it.next();
ReplicationController oldRc = RC_MAP.get(rcName);
ReplicationController newRc = KUBE_APISERVER.getReplicationController(rcName);
if (newRc == null) {
rmSet.add(rcName);
continue;
}
if (!oldRc.equals(newRc)) {
rmSet.add(rcName);
}
}
if (!rmSet.isEmpty()) {
it = rmSet.iterator();
while (it.hasNext()) {
String rcName = it.next();
ReplicationController oldRc = RC_MAP.remove(rcName);
oldRc.clean();
}
}
}
addRC("segment-rc");
addRC("pretreatment-rc");
addRC("feature-select-rc");
addRC("classifier-rc");
try {
Thread.sleep(INTERVAL_TIME);
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, ex.getMessage());
}
}
}
}
}
3 應用WEB部署
應用服務部署在虛擬機器的Glassfish server容器內,也可以部署在Docker Container的server的容器內。
WEB部署應用服務配置檔案:
#web config
web.axis.title.font.name=Arial\u0020Italic\u0020Bold
web.axis.title.font.size=9
web.axis.title.font.color=000000
#docker config
docker.monitor.port=5257
#kubernetes config
kubernetes.rc.refresh.time=15
kubernetes.api.address=192.168.1.10
kubernetes.api.port=8080
#macro server ip type Cluster/Host
macro.server.ip.type=Cluster
macro.request.thread.pool.size=10
#classify task config
classify.task.timeout=60
#log config
log.path=log/
log.prefix=robin-ml
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3