1. 程式人生 > >大資料之電話日誌分析callLog案例(四)

大資料之電話日誌分析callLog案例(四)

一、修改kafka資料在主題中的貯存時間,預設是7天
-------------------------------------------------
    [kafka/conf/server.properties]
    log.retention.hours=1

二、使用hive進行聚合查詢
----------------------------------------------------
    1.hive命令列查詢
        //查詢caller = xxx 的所有通話記錄,並按照calltime分組
        select count(*) , calltime from ext_calllogs_in_hbase where caller = '15338597777' group by calltime ;
        //查詢caller = xxx 的所有通話記錄,並按照月份進行分組
        select count(*) , substr(calltime,1,6) from ext_calllogs_in_hbase where caller = '15338597777' group by substr(calltime,1,6) ;
        //查詢caller = xxx ,年份是2017年的所有通話記錄,並按照月份進行分組
        select count(*) , substr(calltime,1,6) from ext_calllogs_in_hbase where caller = '15338597777' and substr(calltime,1,4) == '2018' group by substr(calltime,1,6) ;
        //查詢所有使用者的各個月份的通話次數
        select caller,substr(calltime,1,6) , count(*) from ext_calllogs_in_hbase group by caller , substr(calltime,1,6) ;


    2.程式設計實現:指定號碼指定年份中各月份的通話次數

        a. 在HiveService類中新增新的方法
          
/**
           * 查詢指定人員指定年份中各個月份的通話次數
           */
          public List<CalllogStat> statCalllogsCount(String caller, String year){
              List<CalllogStat> list = new ArrayList<CalllogStat>() ;
              try {
                  Connection conn = DriverManager.getConnection(url);
                  Statement st = conn.createStatement();
                  //拼串: select count(*) , substr(calltime,1,6) from ext_calllogs_in_hbase where caller = '15338597777'
                  //      and substr(calltime,1,4) == '2018' group by substr(calltime,1,6) ;
                  String sql = "select count(*) ,substr(calltime,1,6) from ext_calllogs_in_hbase " +
                          "where caller = '" + caller+"' and substr(calltime,1,4) == '" + year
                          + "' group by substr(calltime,1,6)";
                  ResultSet rs = st.executeQuery(sql);
                  Calllog log = null;
                  while (rs.next()) {
                      CalllogStat logSt = new CalllogStat();
                      logSt.setCount(rs.getInt(1));
                      logSt.setYearMonth(rs.getString(2));
                      list.add(logSt);
                  }
                  rs.close();
                  return list;
              } catch (Exception e) {
                  e.printStackTrace();
              }
              return null;
          }

        b.在Controller中新增新的控制器
            
/**
             * 統計指定人員,指定月份的通話次數
             */
            @RequestMapping("/calllog/toStatCalllog")
            public String toStatCalllog(){
                return "calllog/statCalllog" ;
            }

            /**
             * 統計指定人員,指定月份的通話次數
             */
            @RequestMapping("/calllog/statCalllog")
            public String statCalllog(Model m ,@RequestParam("caller") String caller ,@RequestParam("year") String year){
                List<CalllogStat> list = hcs.statCalllogsCount(caller, year);
                m.addAttribute("stat" , list) ;
                return "calllog/statCalllog" ;
            }

        c.編寫前端介面顯示
           
 <%@ page contentType="text/html;charset=UTF-8" language="java" %>
            <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
            <html>
            <head>
                <title>通話記錄統計結果</title>
                <link rel="stylesheet" type="text/css" href="../css/my.css">
                <script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
                <script type="text/javascript">

                    //定義函式
                    function refreshTable(){
                        $("#t1 tbody").empty();
                        $.getJSON("/calllog/json/findAll", function (data) {
                            $.each(data, function (i, obj) {
                                var str = "<tr><td>" + obj.caller + "</td>";
                                str = str + "<td> " + obj.callerName + "</td>";
                                str = str + "<td> " + obj.callee + "</td>";
                                str = str + "<td> " + obj.calleeName + "</td>";
                                str = str + "<td></td>";
                                str = str + "<td> " + obj.callTime + "</td>";
                                str = str + "<td> " + obj.callDuration + "</td>";
                                str = str + "</tr>";
                                $("#t1 tbody").append(str);
                            });
                        });
                    }

                    $(function(){
                        setInterval(refreshTable, 2000);
                    })
                </script>
            </head>
            <body>
            <form action='<c:url value="/calllog/statCalllog" />' method="post">
                電話號碼 : <input type="text" name="caller"><br>
                年 份:  <input type="text" name="year"><br>
                <input type="submit" name="查詢">
            </form>
            <br>
            <table id="t1" border="1px" class="t-1" style="width: 800px">
                <thead>
                <tr>
                    <td>月份</td>
                    <td>次數</td>
                </tr>
                </thead>
                <tbody>
                <c:forEach items="${stat}" var="s">
                    <tr>
                        <td><c:out value="${s.yearMonth}"/></td>
                        <td><c:out value="${s.count}"/></td>
                    </tr>
                </c:forEach>
                </tbody>

            </table>
            </body>
            </html>

        d.執行ssm app,輸入網址進行測試

三、Linux使用awk命令批量按照java程序名稱kill程序
------------------------------------------------------
    1.awk就是把檔案逐行的讀入,以空格為預設分隔符將每行切片,切開的部分再進行各種分析處理
        $> jps | awk '{print $1}';          //列印第一列
        $> jps | awk -F'.' {print $1}';          //指定分隔符'.'(預設為空格)

    2.shell程式設計關閉kafka
        $> kill -9  `jps | grep Kafka | awk '{print $1}'`;

    3.動態提取ip
        $> ifconfig | grep inet | head -1 | awk '{print $2}';


四、快捷bash指令碼改造
-------------------------------------------------------
    1.[xkill.sh]
        #!/bin/bash

        pids=`jps | grep $1 | awk '{print $1}'`

        for pid in $pids ; do
            kill -9 $pid
        done

    2.[xcall.sh]
        #!/bin/bash
        [email protected]
        i=201
        for (( i=201 ; i <= 206 ; i = $i + 1 )) ; do
            tput setaf 2
            echo ============= s$i =============
            tput setaf 7
            ssh -4 s$i "source /etc/profile ; $params"
        done


    3.開啟kafka叢集
    [/usr/local/bin/xkafka-cluster-start.sh]
        #!/bin/bash
        servsers="s200 s300 s400"
        for s in $servers ; do
            ssh $s "source /etc/profile ; kafka-server-start.sh -daemon /soft/kakfa/config/server.properties"
        done

    4.啟動zk叢集
    [/usr/local/bin/xzk-cluster-start.sh]
        #!/bin/bash
        servers="s100 s200 s300"
        for s in $servers ; do
            ssh $s "source /etc/profile ; zkServer.sh start"
        done

    5.xconsumer-start.sh
    [/usr/local/bin/xconsumer-start.sh]
        #!/bin/bash
        cd /home/centos/KafkaHbaseConsumer
        run.sh &

    6.s201:xflume-calllog-start.sh
    [/usr/local/bin/xconsumer-start.sh]
        #!/bin/bash
        cd /soft/flume/conf
        flume-ng agent -f calllog.conf -n a1 &


五、修改CalllogController,解決json亂碼問題
------------------------------------------------------------
    
/**
     * 模擬最底層的request和response,實現直接返回json串到前端頁面
     */
    @RequestMapping("calllog/json/findAll")
    public String findAllJson(HttpServletResponse response)
    {
        try {
            List<Calllog> list = cs.findAll();
            String jsonStr = JSONArray.toJSONString(list);
            //設定迴應的資料型別是json串
            response.setContentType("application/json");
            response.setCharacterEncoding("utf-8");
            //得到傳送給客戶端的輸出流
            ServletOutputStream sos = response.getOutputStream();
            sos.write(jsonStr.getBytes("utf-8"));
            sos.flush();
            sos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

六、使用echarts實現資料視覺化 -- 柱狀圖
-----------------------------------------------------
    1.下載echarts指令碼,並放圖ssm專案的web/js目錄下
        http://echarts.baidu.com/dist/echarts.js

    2.echarts入門演示bar.html -- 柱狀圖
       
 <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <title>bar.html</title>
            <script src="../js/jquery-3.2.0.min.js"></script>
            <script src="../js/echarts.js"></script>
            <script>
                $(function () {
                    var myChart = echarts.init(document.getElementById('main'));
                    var option = {
                        title: {
                            text: 'xxxx2018年度各月份通話次數'
                        },
                        tooltip: {},
                        legend: {
                            data: ['通話次數']
                        },
                        xAxis: {
                            data: ["1月份", "2月份", "3月份"]
                        },
                        yAxis: {},
                        series: [{
                            name: '通話次數',
                            type: 'bar',
                            data: [100, 300, 280]
                        }]
                    };
                    myChart.setOption(option);
                })
            </script>
        </head>
        <body>
        <div id="main" style="border:1px solid blue;width:600px;height:400px;">
        </div>

        </body>
        </html>

    3.修改統計的jsp介面statCalllog.jsp
       
 <%@ page contentType="text/html;charset=UTF-8" language="java" %>
        <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
        <html>
        <head>
            <title>通話記錄統計結果</title>
            <link rel="stylesheet" type="text/css" href="../css/my.css">
            <script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
            <script src="../js/echarts.js"></script>
            <script>
                $(function () {
                    var myChart = echarts.init(document.getElementById('main'));
                    var option = {
                        title: {
                            text: '<c:out value="${title}" />'
                        },
                        tooltip: {},
                        legend: {
                            data: ['通話次數']
                        },
                        xAxis: {
                            data: [<c:forEach items="${list}" var="e">'<c:out value="${e.yearMonth}"/>',</c:forEach>]
                        },
                        yAxis: {},
                        series: [{
                            name: '通話次數',
                            type: 'bar',
                            data: [<c:forEach items="${list}" var="e"><c:out value="${e.count}"/>,</c:forEach>]
                        }]
                    };
                    myChart.setOption(option);
                })
            </script>
        </head>
        <body>
            <form action='<c:url value="/calllog/statCalllog" />' method="post">
                電話號碼 : <input type="text" name="caller"><br>
                年 份:  <input type="text" name="year"><br>
                <input type="submit" name="查詢">
            </form>
            <div id="main" style="border:1px solid blue;width:600px;height:400px;">
            </div>


        </body>
        </html>

    4.修改CallLogController.java
       
 /**
         * 統計指定人員,指定月份的通話次數
         */
        @RequestMapping("/calllog/statCalllog")
        public String statCalllog(Model m ,@RequestParam("caller") String caller ,@RequestParam("year") String year){
            List<CalllogStat> list = hcs.statCalllogsCount(caller, year);
            m.addAttribute("title", caller + "在" + year + "年各月份通話次數統計");
            m.addAttribute("list", list);
            return "calllog/statCalllog" ;
        }

七、ganglia
-----------------------------
    1.ganglia 介紹
        叢集監控.
        不僅能夠監控單個主機的資源情況,還可以對叢集整個資源進行統計。
        gmond        //在每個節點收集資源資料的。
        gmetad       //接受每個節點發送資源資料
        gweb         //webui,展示資料web程式,和gmetad通訊。

    2.Centos安裝ganglia
        a.ganglia-gmond
            所有節點。
            $>sudo yum install -y ganglia-gmond
            $>sudo apt-get install -y ganglia-gmond

        b.ganglia-gmetad
            s201
            $>sudo yum install -y ganglia-gmetad

        c.ganglia-gweb
            [s201]
            1)安裝依賴
                $>sudo yum install -y httpd php

            2)下載ganglia-web-3.5.12.tar.gz程式
                wget http://ncu.dl.sourceforge.net/project/ganglia/ganglia-web/3.5.12/ganglia-web-3.5.12.tar.gz

            3)tar開檔案

            4)修改Makefile檔案
                ...

            5)啟動服務
                [s201]
                $>sudo service httpd start
                $>sudo service gmetad start
                $>sudo service gmond start

                [s202]
                $>sudo service gmond start

    3.使用 yum方式安裝如果出現沒有可用源
        a.換源(aliyan-->)
            對於大資料生態圈的專案,cloudera的倉庫比較全,而且沒有bug.
            cloudera-cdh-5.repo --> /etc/yum.repo.d/下

        b.清除快取
            $>sudo yum cleanall

        c.重建快取
            $>sudo yum make cache

        d.繼續通過yum安裝

    4.Ubuntu安裝ganglia
    ---------------------------------
        1.在Master節點s100上安裝monitor和webfrontend[slave節點不需要安裝web]
            $s100> sudo apt-get update
            $s100> sudo apt-get install ganglia-monitor rrdtool gmetad ganglia-webfrontend

        2.配置ganglia    mycluster
            a.複製 Ganglia webfrontend Apache 配置:
                $s100> sudo cp /etc/ganglia-webfrontend/apache.conf /etc/apache2/sites-enabled/ganglia.conf

            b.編輯 Ganglia 元守護程式的配置檔案:
                $s100> sudo nano /etc/ganglia/gmetad.conf
                找到     "data_source "my cluster" localhost"
                更改為   data_source "mycluster" 192.168.43.131:8649
                //列出機器服務的資料來源[ha 叢集name],叢集數量,IP:埠,如果未指定埠號8649(預設gmond埠)。

            c.編輯主節點的配置檔案
                $s100> sudo nano /etc/ganglia/gmond.conf
                [------]
                    cluster {
                    name = "mycluster"          //unspecified修改為mycluster
                    owner = "unspecified"
                    latlong = "unspecified"
                    url = "unspecified"
                    }

                [-----]
                    /* Feel free to specify as many udp_send_channels as you like. Gmond
                    used to only support having a single channel */
                    udp_send_channel {
                    # mcast_join = 239.2.11.71
                    host = 192.168.43.131           //新增一行,master ip
                    port = 8649
                    ttl = 1
                    }

                [-----]
                    /* You can specify as many udp_recv_channels as you like as well. */
                    udp_recv_channel {
                    # mcast_join = 239.2.11.71       //註釋掉
                    port = 8649
                    #bind = 239.2.11.71             //註釋掉
                    }

            d.儲存退出

        3.配置slave節點配置[s200 s300 s400 s500]
          將master修改好的/etc/ganglia/gmond.conf 複製到slave個節點,替換原有檔案。
            $s100> scp gmond.conf  [email protected]:/etc/ganglia

        4.啟動hadoop、hbase叢集
          start-dfs.sh
          start-yarn.sh
          start-hbase.sh

        5.啟動ganglia
            [方式一]
                $> sudo service ganglia-monitor start(每臺機器都需要啟動)
                $> sudo service gmetad start(在安裝了ganglia-webfrontend的機器上啟動
                $> sudo /etc/init.d/apache2 restart(在主機上重啟apache2)

            [方式二]  或者:使用apt-get方式安裝的Ganglia,可以直接用service方式啟動
              $s100> sudo /etc/init.d/ganglia-monitor start
              $s100> sudo /etc/init.d/gmetad start
              $s100> sudo /etc/init.d/apache2 restart

        6.驗證是否安裝成功
            http://localhost/ganglia/


八、使用udp協議實現程序監控
----------------------------------------------------------
    1.在資料生成模組中新建類udp.HeartBeatThread
    ---------------------------------------------------
          
package udp;

          import calllog.gen.main.PropertiesUtil;

          import java.io.IOException;
          import java.net.*;

          /**
           * 工具類
           * 傳送心跳資訊,證明程式還活著
           * 監控使用
           */
          public class HeartBeatThread extends Thread{

              private DatagramSocket sock;

              public HeartBeatThread()
              {
                  try {
                      sock = new DatagramSocket(PropertiesUtil.getInt("heartbeat.udp.send.port"));
                      //守護執行緒
                      this.setDaemon(true);

                  } catch (SocketException e) {
                      e.printStackTrace();
                  }
              }

              @Override
              public void run() {
                  byte[] bs = new byte[1];
                  bs[0] = (byte)PropertiesUtil.getInt("heartbeat.udp.send.flag");
                  DatagramPacket packet = new DatagramPacket(bs,1);
                  String bcAddr = PropertiesUtil.getString("heartbeat.udp.send.bcAddr");
                  int bcPort = PropertiesUtil.getInt("heartbeat.udp.send.bcPort");
                  packet.setSocketAddress(new InetSocketAddress(bcAddr,bcPort));
                  while(true)
                  {
                      try {
                          sock.send(packet);
                          Thread.sleep(PropertiesUtil.getInt("heartbeat.udp.send.sleep.ms"));
                          System.out.println("資料生成模組,傳送1次心跳" + bs[0]);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
              }
          }

    2.修改資料生成App主程式的main函式,啟動app的時候,同時開啟監控執行緒
       
 public static void main(String [] args)
        {
            genCallLog();
            //開啟監控執行緒
            new HeartBeatThread().start();
        }

    3.在SSM模組中新增監控類com.ssm.monitor.MonitorService ,用於監控其他程序傳送的心跳資訊
      
  package com.ssm.monitor;

        import com.it18zhang.ssm.domain.HeartBeat;
        import com.it18zhang.ssm.util.PropertiesUtil;
        import org.springframework.stereotype.Service;

        import java.net.DatagramPacket;
        import java.net.DatagramSocket;
        import java.net.InetSocketAddress;
        import java.net.SocketException;
        import java.util.ArrayList;
        import java.util.List;

        /**
         * 監控其他程式心跳資訊的類
         */

        @Service("monitorService")
        public class MonitorService extends Thread{

            private ReceiveThread t;
            public MonitorService()
            {
                //開啟監控執行緒
                t = new ReceiveThread();
                t.start();
            }

            public List<HeartBeat> getHeartBeats()
            {
                return new ArrayList<HeartBeat>(t.map.values());
            }
        }


    4.在SSM模組中新增監控分執行緒類com.ssm.monitor.ReceviceThread
        
 package com.ssm.monitor;

         import com.it18zhang.ssm.domain.HeartBeat;
         import com.it18zhang.ssm.util.PropertiesUtil;

         import java.net.DatagramPacket;
         import java.net.DatagramSocket;
         import java.net.InetSocketAddress;
         import java.net.SocketException;
         import java.util.ArrayList;
         import java.util.HashMap;
         import java.util.List;
         import java.util.Map;

         public class ReceiveThread extends Thread{

             private DatagramSocket sock;
             //ip + 最後一次收到心跳的時間戳
             public Map<String, HeartBeat> map = new HashMap<String, HeartBeat>();

             public ReceiveThread() {

                 try{
                     //設定接收埠
                     sock = new DatagramSocket(PropertiesUtil.getInt("heartbeat.udp.receive.port"));
                     //守護執行緒
                     this.setDaemon(true);

                     System.out.println("開始接收心跳 ...");
                 }
                 catch(Exception e)
                 {
                     e.printStackTrace();
                 }
             }

             @Override
             public void run() {
                 byte[] bs = new byte[1];
                 DatagramPacket packet = new DatagramPacket(bs,1);
                 while(true)
                 {
                     try {
                         sock.receive(packet);
                         int flag = bs[0];
                         InetSocketAddress addr = (InetSocketAddress) packet.getSocketAddress();
                         String sendIp = addr.getAddress().getHostAddress();
                         map.put(sendIp,new HeartBeat(sendIp, flag, System.currentTimeMillis()));
                         System.out.println("接收心跳" + flag);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
         }




    5.在ssm的domain包中新增新的javabean類,封裝心跳資訊HeartBeat類
      
  package com.it18zhang.ssm.domain;

        public class HeartBeat {

            //udp傳送方的ip
            private String ip;
            //傳送的訊息內容
            private int flag;
            //最近一次接收到訊息時的事件
            private long ts;

            public HeartBeat() {

            }

            public HeartBeat(String ip, int flag, long ts) {
                this.ip = ip;
                this.flag = flag;
                this.ts = ts;
            }

            public String getIp() {
                return ip;
            }

            public void setIp(String ip) {
                this.ip = ip;
            }

            public int getFlag() {
                return flag;
            }

            public void setFlag(int flag) {
                this.flag = flag;
            }

            public long getTs() {
                return ts;
            }

            public void setTs(long ts) {
                this.ts = ts;
            }
        }

    6.在消費者模組中拷貝心跳傳送程式碼HeartBeatThread和工具類和修改kafka配置檔案,新增心跳屬性
        [kafka.propeties]
        zookeeper.connect=s100:2181,s200:2181,s300:2181
        group.id=calllog
        zookeeper.session.timeout.ms=500
        zookeeper.sync.time.ms=250
        auto.commit.interval.ms=1000
        #從頭消費
        auto.offset.reset=smallest
        #主題
        topic=calllog
        #表名
        table.name=call:calllogs
        #分割槽數
        partition.number=100
        #主叫標記
        caller.flag=0
        #hash區域的模式
        hashcode.pattern=00

        heartbeat.udp.send.port=6666
        heartbeat.udp.send.flag=3

        heartbeat.udp.send.bcAddr=192.168.43.255
        heartbeat.udp.send.bcPort=9999
        heartbeat.udp.send.sleep.ms=1000

    7.修改HbaseCustomer類,在main函式中啟動心跳傳送執行緒
     
   package calllog.kafka.hbase.customer;

        import kafka.consumer.Consumer;
        import kafka.consumer.ConsumerConfig;
        import kafka.message.MessageAndMetadata;

        import java.io.IOException;
        import java.io.InputStream;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;
        import kafka.consumer.ConsumerIterator;
        import kafka.consumer.KafkaStream;
        import kafka.javaapi.consumer.ConsumerConnector;

        import java.util.Properties;

        /**
         * hbase消費者,從kafka獲取日誌資訊,儲存到hbase中
         */
        public class HbaseCustomer {


            public static void main(String [] args)
            {
                //開啟心跳傳送
                new HeartBeatThread().start();

                //hbasedao
                HbaseDao dao = new HbaseDao();
                //建立消費者配置檔案
                ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);
                //建立消費者
                ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(PropertiesUtil.props));
                //繫結主題
                String topic = PropertiesUtil.getPorp("topic");

                Map<String, Integer> map = new HashMap<String, Integer>();
                map.put(topic, new Integer(1));

                //開始消費
                Map<String, List<KafkaStream<byte[], byte[]>>> kafkaMsg = consumer.createMessageStreams(map);
                List<KafkaStream<byte[], byte[]>> msgList = kafkaMsg.get(topic);

                String kafka_hbaseMsg = "";
                for(KafkaStream<byte[],byte[]> msg : msgList)
                {
                    ConsumerIterator<byte[],byte[]> mm = msg.iterator();
                    while (mm.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> next = mm.next();
                        byte [] m = next.message();
                        //獲取訊息
                        kafka_hbaseMsg = new String(m);
                        //寫入hbase
                        dao.put(kafka_hbaseMsg);
                    }
                }
            }
        }

    8.在ssm模組中編寫網頁MonitorController
    --------------------------------------
       
 package com.it18zhang.ssm.web.controller;


        import com.alibaba.fastjson.JSON;
        import com.it18zhang.ssm.domain.HeartBeat;
        import com.ssm.monitor.MonitorService;
        import org.springframework.stereotype.Controller;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RequestParam;

        import javax.annotation.Resource;
        import java.util.List;

        @Controller
        public class MonitorController {

            @Resource(name="monitorService")
            private MonitorService ms;

            @RequestMapping("/monitor/toMonitorPage")
            public String toMonitorPage()
            {
                return "monitor/monitorPage";
            }

            @RequestMapping("/json/monitor/getMonitorInfo")
            public String getMonitorInfo(@RequestParam("heartbeat")HeartBeat ht)
            {
                List<HeartBeat> list = ms.getHeartBeats();
                return JSON.toJSONString(list);
            }

        }

    9.編寫jsp介面monitor/monitorPage.jsp
        
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
        <%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
        <html>
        <head>
            <title>通話記錄</title>
            <link rel="stylesheet" type="text/css" href="../css/my.css">
            <script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
            <script type="text/javascript" >

                //定義函式
                function getHearbeatInfo(){
                    $("#div1").empty();
                    $.getJSON("/json/monitor/getMonitorInfo", function (data) {
                        $("#div1").append(data);
                    });
                }

                $(function(){
                    setInterval(getHearbeatInfo, 1000);
                })

            </script>
        </head>
        <body>

        <div id="div1" style="border:1px solid blue;width: 400px ; height: 300px">
        </div>


        </body>
        </html>


    10.將生成資料程式和消費者程式打包,扔到ubuntu上執行,開啟ssm網頁進行測試