1. 程式人生 > >基於Kubernetes的機器學習微服務系統設計系列——(三)RESTful微服務框架

基於Kubernetes的機器學習微服務系統設計系列——(三)RESTful微服務框架

 內容提要

  基於Kubernetes的機器學習微服務系統設計——(1)概念與構想

  為了微服務的介面互動統一,本系統採用統一的框架模式。採用Jersey軟體框架,Jersey 是開源的RESTful框架, 實現了JAX-RS (JSR 311 & JSR 339) 規範。

微服務框架

  RESTful框架實現流程如圖所示:

RESTful框架實現流程圖

  框架在高層次上應用了適配器(Adapter)模式,將業務服務與RESTful通訊解耦,業務僅關注自己的實現。

框架實現

  配置檔案config.properties內容如下:

#restful API config
listen.ip=0.0.0.0
listen.port=8084

#thread pool config
thread.core.pool.size=4
thread.max.pool.size=4

#micro server config
mircoServer.name=business
jar.path=file:business-1.0.jar
jar.actionClass=com.robin.action.BusinessAction

#log config
log.path=log/
log.prefix=business
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3

  通用資源類:

/**
 * <DT><B>描述:</B></DT>
 * <DD>通用資源類</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> V1.0 Date:2018-05-21</I>
 * @author  <I> E-mail:
[email protected]
</I> */
@Path("robin") public class CommonResource { // 日誌 private static final Logger LOGGER = RobinLogger.getLogger(); // 微服務 private static MircoServiceAction mircoServer; // 配置的微服務名稱 private static final String CFG_MS_NAME; static { // 微服務名稱配置檔案檢查 CFG_MS_NAME = ConfigUtil.getConfig("mircoServer.name"); String jarPath = ConfigUtil.getConfig("jar.path"); URL url = null; try { url = new URL(jarPath); } catch (MalformedURLException ex) { LOGGER.log(Level.SEVERE, ex.getMessage()); } URLClassLoader classLoader = new URLClassLoader(new URL[]{url}, Thread.currentThread() .getContextClassLoader()); Class<?> actionClass = null; try { String actionClassName = ConfigUtil.getConfig("jar.actionClass"); actionClass = (Class<?>) classLoader.loadClass(actionClassName); } catch (ClassNotFoundException ex) { LOGGER.log(Level.SEVERE, ex.getMessage()); } if (null == actionClass) { LOGGER.log(Level.SEVERE, "actionClass is null"); System.exit(-1); } try { mircoServer = (MircoServiceAction) actionClass.newInstance(); } catch (InstantiationException | IllegalAccessException ex) { LOGGER.log(Level.SEVERE, ex.getMessage()); } } /** * Method handling HTTP GET requests. The returned object will be sent to * the client as "application/json" media type. * * @return String that will be returned as a application/json response. 
*/ @GET @Produces(MediaType.TEXT_PLAIN) public String getIt() { String cfgMsName = ConfigUtil.getConfig("mircoServer.name"); return "Micro server [" + cfgMsName + "] is running...\n"; } @POST @Path("{microService}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public JSONObject requestService( @PathParam("microService") String serverName, JSONObject reqJson) { JSONObject rspJson = null; if (!serverName.equals(CFG_MS_NAME)) { rspJson = new JSONObject(); try { rspJson.put("status", "ERROR"); rspJson.put("msg", "Mirco server name [" + serverName + "] error."); } catch (JSONException ex) { LOGGER.log(Level.SEVERE, ex.getMessage()); } return rspJson; } if (null != mircoServer) { rspJson = (JSONObject) mircoServer.action(reqJson); } return rspJson; } }

  Restful服務類:

/**
 * RESTful server: starts a Grizzly HTTP server that exposes the JAX-RS
 * resources found in the {@code com.robin.restful} package and optionally
 * resizes the worker thread pool from the configuration.
 *
 * @author Robin
 * @version V1.0 Date:2018-05-22
 */
public class RestfulServer {

    private static final Logger LOGGER = RobinLogger.getLogger();
    private static URI uri;
    private static HttpServer server;

    /**
     * @return the HTTP server created by {@link #startServer()}, or
     *         {@code null} if it has not been created yet
     */
    public static HttpServer getServer() {
        return server;
    }

    /**
     * Returns the base URI the server listens on, building it on first use
     * from the {@code listen.ip} and {@code listen.port} configuration values.
     *
     * @return base URI of the form {@code http://<ip>:<port>/}
     */
    public static URI getUri() {
        if (null == uri) {
            String listenAddr = ConfigUtil.getConfig("listen.ip");
            String listenPort = ConfigUtil.getConfig("listen.port");
            String baseUri = "http://" + listenAddr + ":" + listenPort + "/";
            uri = URI.create(baseUri);
        }
        return uri;
    }

    /**
     * Starts the Grizzly HTTP server exposing the JAX-RS resources defined in
     * this application, then applies the configured worker thread pool sizes.
     *
     * <p>The default thread pool configuration is kept when either size is
     * missing, not a positive integer, or when the core size exceeds the
     * maximum size.
     */
    public static void startServer() {
        // Resource config that scans com.robin.restful for JAX-RS resources and providers.
        final ResourceConfig rc = new ResourceConfig();
        rc.packages("com.robin.restful");
        rc.register(JettisonFeature.class);

        // Create the Grizzly HTTP server exposing the Jersey application at the configured URI.
        server = GrizzlyHttpServerFactory.createHttpServer(getUri(), rc);

        int corePoolSize = readPoolSize("thread.core.pool.size");
        int maxPoolSize = readPoolSize("thread.max.pool.size");

        // Zero means "not configured / unusable"; negative values are rejected as well.
        if ((corePoolSize <= 0) || (maxPoolSize <= 0)) {
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        if (corePoolSize > maxPoolSize) {
            LOGGER.log(Level.SEVERE, "Core pool size greater than max pool sixe in configuration.");
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        // See http://jersey.576304.n2.nabble.com/It-s-very-hard-to-increase-the-number-of-worker-threads-in-Jersey-Grizzly-module-td7579570.html
        NetworkListener nl = server.getListener("grizzly");
        if (null == nl) {
            // Without the listener there is no transport to reconfigure.
            LOGGER.log(Level.WARNING, "Network listener [grizzly] not found.");
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }
        LOGGER.log(Level.INFO, nl.toString());

        TCPNIOTransport transport = nl.getTransport();
        ThreadPoolConfig config = transport.getWorkerThreadPoolConfig();
        config.setCorePoolSize(corePoolSize);
        String info = "Set thread core pool size [" + corePoolSize + "].";
        LOGGER.log(Level.INFO, info);
        config.setMaxPoolSize(maxPoolSize);
        info = "Set thread max pool size [" + maxPoolSize + "].";
        LOGGER.log(Level.INFO, info);
        GrizzlyExecutorService threadPool = (GrizzlyExecutorService) transport.getWorkerThreadPool();
        threadPool.reconfigure(config);
    }

    /**
     * Reads an integer pool size from the configuration.
     *
     * @param key configuration key to read
     * @return the parsed value, or 0 when the entry is missing, blank or not
     *         a valid integer
     */
    private static int readPoolSize(String key) {
        String value = ConfigUtil.getConfig(key);
        if ((null == value) || value.trim().isEmpty()) {
            return 0;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException ex) {
            // A malformed entry must not abort start-up; fall back to the defaults.
            LOGGER.log(Level.WARNING, "Invalid value [" + value + "] for [" + key + "].");
            return 0;
        }
    }

    /**
     * Program entry point: starts the server and logs whether it is running.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        startServer();
        if (server.isStarted()) {
            LOGGER.log(Level.INFO, "Start http server sucessfully.");
        } else {
            LOGGER.log(Level.SEVERE, "Start http server failed.");
        }
    }
}

  微服務入口Action介面

package com.robin.loader;
/**
 * Entry-point contract for a micro service action.
 *
 * <p>An implementation receives one request object and hands back one
 * result object; the concrete types are left to the implementation.
 *
 * @author Robin
 * @version V1.0 Date:2018-05-04
 */
public interface MircoServiceAction {

    /**
     * Executes the action for a single request.
     *
     * @param obj request object supplied by the caller
     * @return result object produced by the implementation
     */
    Object action(Object obj);
}