1 概述

  本篇主要介紹基於Kubernetes、容器(Docker)、微服務技術等在機器學習中的實踐應用。詳細介紹了機器學習文字分類系統的設計與實現過程,以及雲端計算分散式系統的部署。

2 系統介紹

2.1 功能全覽

  系統需要完成的功能點如下思維導圖1所示:

圖 1 雲化微服務機器學習系統功能全覽圖

2.2 核心功能

  主要完成功能:

  1. 支援Docker映象化釋出,支援Kuberneetes雲化部署;
  2. 微服務化設計支援服務自治,支援服務擴縮容;
  3. 支援負載均衡、系統資源監控、資源編排;
  4. 統一設計輕量級通訊RESTful API 介面框架,支援JSON格式請求;
  5. 支援多種機器學習演算法,支援JSON格式引數配置;
  6. 支援中文分詞:RobinSeg(RS)、IKAnalyzer(IK)、JEAnalysis(JE)、MmSeg4j(MS)、PaoDing(PD)、SmallSeg4j(SS)等;
  7. 支援特徵選擇演算法:Document Frequency(DF)、Information Gain(IG)、(χ2)Chi-Square Test(CHI)、Mutual Information(MI)、Matrix Projection(MP)等;
  8. 支援分類演算法:k-Nearest Neighbor(kNN)、Naïve Bayes(NB)、Support Vector Machine(SVM)、Normalized Vector(NLV)等;
  9. 支援Web圖形化UI機器學習效能評估、資料視覺化;

3 系統架構

3.1 雲化架構圖

  雲化微服務機器學習系統架構架構如圖2所示:

圖 2 雲化微服務機器學習系統架構圖

3.2 架構說明

  整個系統採用雲端計算的架構設計。系統支援部署在傳統的虛擬化技術(如KVM)或雲端計算IaaS層服務上(如Openstack等)。PaaS層採用Kubernetes+Docker的應用方式。   整個系統的重點是SaaS層的設計開發,即微服務化的機器學習系統。圖 2 所示紅框藍底部分為系統的核心部分。   系統主要功能模組包括:公共庫Comm-lib、微服務核(中文分詞、預處理、特徵選擇、分類器)、RESTful微服務框架(微服務核載入 、HTTP API)、應用服務+WEB、管理維護等。   公共庫Comm-lib:包括基礎功能,例如日誌、配置、數學計算等;   RESTful微服務框架:主要統一微服務介面,解耦與業務的關係,統一RESTful API。   微服務核:按照微服務介面定義,關注自身的業務實現。實現中文分詞、預處理、特徵選擇、分類器的獨立功能。   管理維護:主要包括Docker映象化製作、釋出,Kubernetes、Docker、微服務資源監控,資源的編排功能。   應用WEB:如果把微服務看出深服務端,那麼這裡包含淺服務端應用和WEB客戶端。服務端處理WEB分類任務的請求、排程和生命週期管理。WEB端顯示任務執行的狀態和機器學習的結果UI顯示,還包括資源的監控顯示。

4 雲化部署

4.1 部署圖

  雲化微服務機器學習系統架構架構如圖3所示:

圖 3 雲化微服務機器學習系統部署圖

4.2 部署說明

  系統部署服務元件主要包括:ETCD、Docker 、Kubernetes Master、Kubernetes Node、Docker Private Registry、Glassfish Server、Flannel。   ETCD:一個開源的、分散式的鍵值對資料儲存系統,提供共享配置、服務的註冊和發現。ETCD為Kubernetes提供預設的儲存系統,儲存所有叢集資料,使用時需要為etcd資料提供備份計劃。ETCD為Flannel 儲存網路配置、分配的子網以及任何輔助資料(如主機的公網 IP)。   Docker: 應用容器引擎,讓開發者可以打包他們的應用以及依賴包到一個可移植的容器中,然後釋出到任何流行的 Linux 機器上,也可以實現虛擬化。   Kubernetes Master:叢集的管理控制中心。   Kubernetes Node:提供Kubernetes執行時環境,以及維護Pod。   Docker Private Registry:Docker 映象私倉,存放開發的微服務映象。   Glassfish Server:Web 應用伺服器,提供web應用的部署。也可以採用映象化的方式部署。   Flannel:常用的網路配置工具,用於配置第三層(網路層)網路結構。

4.3 部署例項

  如上圖3所示,在一臺伺服器上部署例項。

  伺服器配置如下:     處理器:2顆Intel Xeon E5-2670 8核16執行緒      記憶體:32G = 8X4G PC3-106000R 1333      硬碟:240GB SSD + 4TB HDD    作業系統:Ubuntu 14.04.5 LTS   使用KVM啟動6臺虛擬機器。

  虛擬機器配置:     處理器:4核      記憶體:4G      硬碟:80GB    作業系統:CentOS 7.5

  虛擬機器網路採用linux系統網橋管理工具配置網橋進行連結。Kubernetes Master和ETCD部署在一臺虛擬機器上。4臺虛擬機器作為Kubernetes Node節點,其上部署Docker、Flannel服務。一臺作為Docker 私倉,部署Docker服務。

  軟體版本:    ETCD版本:3.0.0    Docker版本:1.12.6    Kubernetes 版本:1.6.7    Flannel版本:0.9.1    Docker映象倉庫版本: 2.5

5 設計實現

5.1 RESTful微服務框架

  為了微服務的介面互動統一,本系統採用統一的框架模式。採用Jersey軟體框架,Jersey 是開源的RESTful框架, 實現了JAX-RS (JSR 311 & JSR 339) 規範。

5.1.1 微服務框架圖

  RESTful框架實現流程如圖4所示:

圖 4 RESTful框架實現流程圖

5.1.2 微服務框架實現

  配置檔案config.properties內容如下:

#restful API config
listen.ip=0.0.0.0
listen.port=8084

#thread pool config
thread.core.pool.size=4
thread.max.pool.size=4

#mirco server config
mircoServer.name=business
jar.path=file:business-1.0.jar
jar.actionClass=com.robin.action.BusinessAction

#log config
log.path=log/
log.prefix=business
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG 
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3

  通用資源類:

/**
 * <DT><B>描述:</B></DT>
 * <DD>通用資源類</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> V1.0 Date:2018-05-21</I>
 * @author  <I> E-mail:[email protected]</I>
 */
@Path("robin")
public class CommonResource {
    // 日誌
    private static final Logger LOGGER = RobinLogger.getLogger();
    // 微服務
    private static MircoServiceAction mircoServer;
    // 配置的微服務名稱
    private static final String CFG_MS_NAME;

    static {
        // 微服務名稱配置檔案檢查
        CFG_MS_NAME = ConfigUtil.getConfig("mircoServer.name");
        
        String jarPath = ConfigUtil.getConfig("jar.path");
        URL url = null;
        try {
            url = new URL(jarPath);
        } catch (MalformedURLException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        URLClassLoader classLoader = new URLClassLoader(new URL[]{url}, Thread.currentThread()
                .getContextClassLoader());
        Class<?> actionClass = null;
        try {
            String actionClassName = ConfigUtil.getConfig("jar.actionClass");
            actionClass = (Class<?>) classLoader.loadClass(actionClassName);
        } catch (ClassNotFoundException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        if (null == actionClass) {
            LOGGER.log(Level.SEVERE, "actionClass is null");
            System.exit(-1);
        }
        try {
            mircoServer = (MircoServiceAction) actionClass.newInstance();
        } catch (InstantiationException | IllegalAccessException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
    }

    /**
     * Method handling HTTP GET requests. The returned object will be sent to
     * the client as "application/json" media type.
     *
     * @return String that will be returned as a application/json response.
     */
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String getIt() {
        String cfgMsName = ConfigUtil.getConfig("mircoServer.name");
        return "Micro server [" + cfgMsName + "] is running...\n";
    }

    @POST
    @Path("{microService}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public JSONObject requestService(
            @PathParam("microService") String serverName,
            JSONObject reqJson) {
        JSONObject rspJson = null;
        if (!serverName.equals(CFG_MS_NAME)) {
            rspJson = new JSONObject();
            try {
                rspJson.put("status", "ERROR");
                rspJson.put("msg", "Mirco server name [" + serverName + "] error.");
            } catch (JSONException ex) {
                LOGGER.log(Level.SEVERE, ex.getMessage());
            }
            return rspJson;
        }

        if (null != mircoServer) {
            rspJson = (JSONObject) mircoServer.action(reqJson);
        }

        return rspJson;
    }
}

  Restful服務類:

/**
 * <DT><B>描述:</B></DT>
 * <DD>Restful服務類</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> V1.0 Date:2018-05-22</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public class RestfulServer {

    private static final Logger LOGGER = RobinLogger.getLogger();
    private static URI uri;
    private static HttpServer server;

    public static HttpServer getServer() {
        return server;
    }

    public static URI getUri() {
        if (null == uri) {
            String listenAddr = ConfigUtil.getConfig("listen.ip");
            String listenPort = ConfigUtil.getConfig("listen.port");
            String baseUri = "http://" + listenAddr + ":" + listenPort + "/";
            uri = URI.create(baseUri);
        }
        return uri;
    }

    /**
     * Starts Grizzly HTTP server exposing JAX-RS resources defined in this application.
     *
     */
    public static void startServer() {
        // create a resource config that scans for JAX-RS resources and providers
        // in com.robin.restful package
        final ResourceConfig rc = new ResourceConfig();
        rc.packages("com.robin.restful");
        rc.register(JettisonFeature.class);

        // create and start a new instance of grizzly http server
        // exposing the Jersey application at URI
        // return GrizzlyHttpServerFactory.createHttpServer(URI.create(BASE_URI), rc);
        server = GrizzlyHttpServerFactory.createHttpServer(getUri(), rc);

        String corePoolSizeStr = ConfigUtil.getConfig("thread.core.pool.size");
        String maxPoolSizeStr = ConfigUtil.getConfig("thread.max.pool.size");
        int corePoolSize = 0;
        int maxPoolSize = 0;
        if ((corePoolSizeStr != null) && (!corePoolSizeStr.equals(""))) {
            corePoolSize = Integer.valueOf(corePoolSizeStr);
        }

        if ((maxPoolSizeStr != null) && (!maxPoolSizeStr.equals(""))) {
            maxPoolSize = Integer.valueOf(maxPoolSizeStr);
        }

        if ((corePoolSize == 0) || (maxPoolSize == 0)) {
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        if ((corePoolSize > maxPoolSize)) {
            LOGGER.log(Level.SEVERE, "Core pool size greater than max pool sixe in configuration.");
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        //參考http://jersey.576304.n2.nabble.com/It-s-very-hard-to-increase-the-number-of-worker-threads-in-Jersey-Grizzly-module-td7579570.html
        NetworkListener nl = server.getListener("grizzly");
        System.out.println(nl.toString());
        TCPNIOTransport transport = nl.getTransport();
        ThreadPoolConfig config = transport.getWorkerThreadPoolConfig();
        config.setCorePoolSize(corePoolSize);
        String info = "Set thread core pool size [" + corePoolSize + "].";
        LOGGER.log(Level.INFO, info);
        config.setMaxPoolSize(maxPoolSize);
        info = "Set thread max pool size [" + maxPoolSize + "].";
        LOGGER.log(Level.INFO, info);
        GrizzlyExecutorService threadPool = (GrizzlyExecutorService) transport.getWorkerThreadPool();
        threadPool.reconfigure(config);
    }

    /**
     * RestfulServer method.
     *
     * @param args
     */
    public static void main(String[] args) {
        startServer();
        if (server.isStarted()) {
            LOGGER.log(Level.INFO, "Start http server sucessfully.");
        } else {
            LOGGER.log(Level.SEVERE, "Start http server failed.");
        }
    }
}

  微服務入口Action介面

package com.robin.loader;
/**
 * <DT><B>描述:</B></DT>
 * <DD>微服務入口Action介面</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-05-04</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public interface MircoServiceAction {
    public Object action(Object obj);
}

5.2 中文分詞微服務

  中文分詞微服務包括分詞方法有:RobinSeg(RS)、IKAnalyzer(IK)、JEAnalysis(JE)、MmSeg4j(MS)、PaoDing(PD)、SmallSeg4j(SS)。其中RS分詞實現見我的文章:知更鳥中文分詞RS設計實現 ,其他分詞方法都採用釋出的jar包進行封裝裝。

5.2.1 設計模式

  主要涉及外觀模式、介面卡模式、工廠模式和單例模式。分詞微服務類圖如圖5所示:

圖 5 分詞微服務類圖

  設計原則:(1)針對介面程式設計,不要針對實現;(2)只和最緊密的類互動;(3)封裝變化;(4)鬆耦合設計。   外觀模式:提供一個統一的介面,用來訪問子系統中的一群介面,外觀定義了一個高層介面,讓子系統更容易使用。我們採用統一的分詞外觀類封裝各種分詞介面,提供一個一致的高層介面。   介面卡模式:將一個類的介面,轉換成客戶期望的另一個介面。介面卡讓原本介面不相容的類可以合作無間。各種分詞的的私有實現介面需要一個提供一個統一的介面呼叫。   工廠模式:定義一個建立物件的介面,但有子類決定要例項化的類是哪一個。提供統一的分詞工廠,建立分類例項物件。   單例模式:確保一個類只有一個例項,並提供了一個全域性訪問點。由於各種分詞物件的建立、載入詞典等需要申請大量的記憶體,耗費大量的時間,所以所分詞器例項都通過介面卡進行控制只建立一個例項。

5.2.2 程式碼實現

中文分詞介面抽象類

package com.robin.segment;

import com.robin.log.RobinLogger;
import java.util.logging.Logger;

/**
 * <DT><B>描述:</B></DT>
 * <DD>中文分詞介面抽象類</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> Date:2018-04-18</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public abstract class AbstractSegmenter {

    /** 日誌 */
    protected static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * 分詞抽象方法
     *
     * @param text 文字
     * @param SEPARATOR 分隔符
     * @return 已分詞文字
     */
    public abstract String segment(String text, String SEPARATOR);
}

統一分詞器外觀類

package com.robin.segment;

import com.robin.log.RobinLogger;
import com.robin.segment.SegmentFactory.SegmentMethod;
import com.robin.segment.robinseg.RobinSeg;
import com.robin.segment.robinseg.SegmentArgs;
import java.util.logging.Logger;

/**
 * <DT><B>描述:</B></DT>
 * <DD>統一分詞器外觀類</DD>
 * <DD>外觀模式</DD>
 *
 * @version 1.0
 * @author Robin
 * @version <I> Date:2018-04-19</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public class SegmentFacade {

    // 日誌
    private static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * 獲取分詞器配置引數物件
     *
     * @param methodName 分詞方法
     * @return SegmentArgs
     */
    public static SegmentArgs getSegmentArgsObj(SegmentMethod methodName) {
        AbstractSegmenter segment = SegmentFactory.getSegInstance(methodName);
        if (methodName.equals(SegmentMethod.RS)) {
            return ((RobinSeg) segment).getSegmentConfInstance();
        }
        return null;
    }

    /**
     * <DD>根據不同分詞演算法進行分詞,</DD>
     * <DD>傳入演算法名錯誤或預設情況下用RobinSeg分詞。</DD>
     *
     * @param methodName 分詞方法名稱,“SegmentMethod.IK”,“.JE”,“.MS”,“.PD”,“.SS”,
     * “.RS”
     * @param text 待分詞文字
     * @param separator 分隔符
     * @return 使用分隔符分好詞文字
     */
    public static String split(SegmentMethod methodName, String text, String separator) {
        AbstractSegmenter segmenter = SegmentFactory.getSegInstance(methodName);
        return segmenter.segment(text, separator);
    }
}

分詞Action實現類

package com.robin.segment.action;

import com.robin.loader.MircoServiceAction;
import com.robin.log.RobinLogger;
import com.robin.segment.SegmentFacade;
import com.robin.segment.SegmentFactory.SegmentMethod;
import com.robin.segment.robinseg.SegmentArgs;
import com.robin.segment.robinseg.SegmentArgs.SegAlgorithm;
import java.util.HashSet;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>描述:</B></DT>
 * <DD>分詞Action實現類</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-06-05</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public class SegmentAction implements MircoServiceAction {

    private static final Logger LOGGER = RobinLogger.getLogger();

    public enum StatusCode {
        OK,
        JSON_ERR,
        KIND_ERR,
        VERSION_ERR,
        SEGMETHOD_ERR,
        SEPARATOR_ERR,
        SEGMENT_FAILED,
        TEXTS_NULL,
    }

    private class ActionStatus {

        StatusCode statusCode;
        String msg;

    }

    private JSONObject getErrorJson(ActionStatus actionStatus) {
        JSONObject errJson = new JSONObject();
        try {
            errJson.put("status", actionStatus.statusCode.toString());
            errJson.put("msg", actionStatus.msg);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return errJson;
    }

    private ActionStatus checkJSONObjectTerm(JSONObject jsonObj,
            String key,
            HashSet<String> valueSet,
            StatusCode errStatusCode) {
        ActionStatus actionStatus = new ActionStatus();

        try {
            if (!jsonObj.isNull(key)) {
                String value = jsonObj.getString(key);
                if (!valueSet.contains(value)) {
                    actionStatus.msg = "The value [" + value + "] of " + key + " is error.";
                    actionStatus.statusCode = errStatusCode;
                    return actionStatus;
                }
            } else {
                actionStatus.msg = "The input parameter is missing " + key + ".";
                actionStatus.statusCode = errStatusCode;
                return actionStatus;
            }

        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    private ActionStatus checkInputJSONObject(JSONObject jsonObj)