1. 程式人生 > >java 執行緒池管理多執行緒操作Hbase資料庫完整專案

java 執行緒池管理多執行緒操作Hbase資料庫完整專案

在這裡插入圖片描述
Hbase-site.xml配置檔案:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- HBase site configuration used by the example project. -->
<!-- NOTE(review): host values such as 叢集1(zhz100) look like placeholders; replace them with real host names before use. -->
<configuration>
    <!-- Root directory of HBase data, given here as an HDFS URI. -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://叢集1(zhz100):9000/hbase</value>
    </property>
    <!-- Distributed mode is switched on. -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- Master address as host:port. -->
    <!-- NOTE(review): this host is written 叢集(zhz100) while the other entries use 叢集1(zhz100); confirm which is intended. -->
    <property>
        <name>hbase.master</name>
        <value>叢集(zhz100):60000</value>
    </property>
    <!-- ZooKeeper data directory. -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/usr/java/zookeeper3.4.10/temp</value>
    </property>
    <!-- Comma-separated list of ZooKeeper quorum hosts. -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>叢集1(zhz100),叢集2,叢集3</value>
    </property>
    <!-- Port on which clients connect to ZooKeeper. -->
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>

配置application.yml:

	# HTTP port of the embedded server; "port" must be nested under "server".
	server:
	    port: 8080
	
	# HBase client settings; the quorum entry must be nested under "confMaps"
	# so that it becomes an entry of that map.
	# NOTE(review): 'IP地址1' / 'IP地址2' are placeholders for the real ZooKeeper hosts.
	hbase:
	    conf:
	        confMaps:
	            'hbase.zookeeper.quorum' : 'IP地址1:2181,IP地址2:2181'

pom.xml 引入的依賴:

   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- Declared only once: the original listed this starter twice (with and
             without this exclusion), which Maven reports as a duplicate declaration. -->
        <!-- NOTE(review): spring-boot-starter-logging is excluded here but is also
             declared directly below, so it still ends up on the classpath; confirm
             which of the two is intended. -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- NOTE(review): hbase-client is 1.2.0 while the hbase pom below is 1.2.1, and
         spring-data-hadoop is 2.5.0.RELEASE while spring-data-hadoop-core is
         2.4.0.RELEASE; confirm the versions are meant to differ. -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop</artifactId>
        <version>2.5.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop-core</artifactId>
        <version>2.4.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.2.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>ch.ethz.ganymed</groupId>
        <artifactId>ganymed-ssh2</artifactId>
        <version>build210</version>
    </dependency>

Test含主函式main:

package com.example.demo;
import org.apache.hadoop.hbase.client.Connection;
import sun.net.ftp.FtpClient;
import sun.net.ftp.FtpProtocolException;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class Test {
    FtpClient ftpClient;

/**
 * 連線FTP服務
 * @param url //IP地址
 * @param port//埠號
 * @param username//使用者名稱
 * @param password//密碼
 * @return
 */
public static FtpClient connectFTP(String url, int port, String username, String password) {

         //建立ftp
     FtpClient ftp = null;
    try {
        //建立地址
        SocketAddress addr = new InetSocketAddress(url, port);
        //連線
        ftp = FtpClient.create();
        ftp.connect(addr);
        //登陸
        ftp.login(username, password.toCharArray());
        ftp.setBinaryType();

    } catch (FtpProtocolException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return ftp;
}

public static List<String> download(String ftpFile, FtpClient ftp) {
    synchronized (TimerWatch.class) {
        List<String> list = new ArrayList<String>();
        String str = "";
        InputStream is = null;
        BufferedReader br = null;
        try {
            // 獲取ftp上的檔案
            is = ftp.getFileStream(ftpFile);
            //轉為位元組流
            br = new BufferedReader(new InputStreamReader(is));
            while ((str = br.readLine()) != null) {
                list.add(str);
            }
            br.close();
        } catch (FtpProtocolException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }
}
    public static void main(String[] args) throws Exception{

	//開啟主機1FTP伺服器
        FtpClient ftp = connectFTP("主機1IP"埠號,"賬號","密碼");
	//開啟主機1Linux指令
        RemoteExecuteCommand rec = new RemoteExecuteCommand("主機1IP", "賬號", "密碼"); 
        //連線資料庫
        Connection connection=LinkHbase.table();
	//如果出現宕機或者出現異常導致專案停止執行,此段程式碼可以將未成功插入的資料所在的檔案就會儲存在work目錄中。重啟後對work進行解析就會避免資料丟失,此方法只會在啟動時執行一次。            
         Sync.Judge(connection,ftp,rec);
        //開啟一個執行緒池,corePoolSize:核心執行緒數5;maxPoolSize:最大執行緒 數10; keepAliveTime:執行緒空閒時間:200微妙;ArrayBlockingQueue是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200,                 TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));

        for(int i=0;i<5;i++){
            MyTask myTask = new MyTask(i,connection,ftp,rec);
            executor.execute(myTask);
            System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+
                    executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

Sync類中Judge()方法:

//判斷in檔案是否有檔案執行完成
public static void Judge(Connection connection,FtpClient ftp,RemoteExecuteCommand rec) throws Exception {
	
//Linux命令獲取/usr/HDWork/work的檔名字
   String files=rec.execute("cd /usr/HDWork/work ; ls -lrt|sed -n '2, $p'|awk '{print $9}'");
    if(files.equals("") || files==null){
        System.out.println("work目錄下沒有檔案");
    } else {
        System.out.println("work目錄有檔案");
        String[] strs = files.split("\\n");
        for (int i = 0, len = strs.length; i < len; i++) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//計算執行時間
            long start, end;
            start = System.currentTimeMillis();
            Date date1 = new Date(start);
            String date1String = formatter.format(date1);

            System.out.println(strs[i].toString());
	//解析work目錄下的檔案內容,插入到hbase
            ReadIn.add(strs[i].toString(), connection, ftp);                
        //移動檔案到back目錄下
        Cmd.mvBack(strs[i].toString(), rec);
            end = System.currentTimeMillis();
            Date date2 = new Date(end);
            String date2String = formatter.format(date2);
            System.out.println(Thread.currentThread().getName() + ":執行檔案:." + strs[i].toString() + ";開始時間為:" + date1String + ";結束時間為:" + date2String + ";新增成功,共用時" + (end - start)+ "ms。");
                }
    }

ReadIn類中add()方法:

public class ReadIn {
    /*
    *解析work目錄下的檔案內容,插入到資料庫中
    * */
    public static void add(String fileName, Connection connection, FtpClient ftpClient) throws Exception{
    String[] y= { "trademark" , "dr_type" , "service_id" , "bill_month" );
    Table table=table(connection);//判斷是否已經存在含有這個表
    List<String> cutstrings;
    System.out.println("/usr/HDWork/work/"+fileName);
    List<String> list=download("/usr/HDWork/work/"+fileName,ftpClient);

    String str;

            for(int i=6;i<list.size();i++){
                int aa = (int) ((Math.random() * 9 + 1) * 100000);
                System.out.println(aa + "根據檔名來得到keyRow"+fileName);
                Put put = new Put(Bytes.toBytes( aa));

                cutstrings = cutstring(list.get(i).substring(19));
                for (int z = 0; z < cutstrings.size() - 1; z++) {  // 每一個for讀入一行資料
                    put.addColumn(Bytes.toBytes(y[z]), Bytes.toBytes("aa"), Bytes.toBytes(cutstrings.get(z)));//將欄位與值匹配對應插入
                }
                table.put(put);//提交資料
            }
}

/*
* 執行緒鎖判斷是否有這個表
* */

public static Table table(Connection connection)throws Exception{
    synchronized (ReadIn.class) {
        Admin admin = connection.getAdmin();
        Table table = null;
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
        String systemTime = String.valueOf(df.format(new Date()));//系統日期
        TableName tableNameObj = TableName.valueOf(systemTime);//以系統日期為表的名字
        if (admin.tableExists(tableNameObj)) {//如果表的名字已經存在
            table = connection.getTable(TableName.valueOf(systemTime));//連線這個表
            return table;
        } else {//沒有這個表,先建立表,在連結表
            HbaseDemo.createTable(systemTime, "trademark", "dr_type", "service_id");
            table = connection.getTable(TableName.valueOf(systemTime));
            return table;
        }
    }
}

/*
 * 擷取前19位字元後解析的內容
 * */
public   static List<String> cutstring (String Stence) {
    List<String> stringlist = new ArrayList<String>();//用來儲存解析出來的元素
    for (int i = 0; i < Stence.length(); i++) {
        if (Stence.charAt(i) == ';') {
            String temp = "";//儲存單詞
            int wordlength = i;
            while (wordlength < Stence.length() - 1 && Stence.charAt(++wordlength) != ';') {
                temp += Stence.charAt(wordlength);
                //System.out.println(temp);
            }
            stringlist.add(temp);
        }
    }
    return stringlist;
}
/**
 * 取ftp上的檔案內容
 * @param ftpFile
 * @param ftp
 * @return
 */
public static List<String> download(String ftpFile, FtpClient ftp) {
    synchronized (ReadIn.class) {
        List<String> list = new ArrayList<String>();
        String str = "";
        InputStream is = null;
        BufferedReader br = null;
        try {
            ftpFile=ftpFile.replace(" ","");
            // 獲取ftp上的檔案
            is = ftp.getFileStream(ftpFile);
            //轉為位元組流
            br = new BufferedReader(new InputStreamReader(is));
            while((str=br.readLine())!=null){
                list.add(str);
            }
            System.out.println(list+"-------------");
            br.close();
        }catch (FtpProtocolException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }
}

}

MyTask類:

package com.example.demo;

import org.apache.hadoop.hbase.client.Connection;
import sun.applet.Main;
import sun.net.ftp.FtpClient;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.logging.FileHandler;
import java.util.logging.Logger;

/**
 * Worker task: repeatedly takes one file from /usr/HDWork/in, moves it to
 * /usr/HDWork/work, inserts its content into HBase through {@code ReadIn.add} and then
 * hands it to {@code Cmd.mvBack}.
 */
public class MyTask implements Runnable {
    // NOTE(review): the logger is named after the JDK-internal class sun.applet.Main,
    // which is not available on newer JDKs; kept so the logger name does not change.
    private static final Logger LOGGER = Logger.getLogger(Main.class.toString());
    /** Guarded by the class lock in {@link #attachFileHandlerOnce()}. */
    private static boolean fileHandlerAttached;

    private int taskNum;
    private Connection connection;
    private FtpClient ftpClient;
    private RemoteExecuteCommand rec;

    /**
     * @param num        task number
     * @param connection HBase connection shared by the tasks
     * @param ftpClient  FTP client used to read the data files
     * @param rec        remote shell; its host and credentials are also used for the
     *                   per-iteration shell created in {@link #run()}
     */
    public MyTask(int num, Connection connection, FtpClient ftpClient,RemoteExecuteCommand rec) {
        this.taskNum = num;
        this.connection=connection;
        this.ftpClient=ftpClient;
        this.rec=rec;
    }

    /**
     * Adds the daily log file handler to the shared logger exactly once. The original
     * added a new handler on every {@code run()}, so each message was written once per
     * started task.
     */
    private static synchronized void attachFileHandlerOnce() {
        if (fileHandlerAttached) {
            return;
        }
        StringBuffer logPath = new StringBuffer();
        logPath.append("E:\\Logger");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        logPath.append("\\"+sdf.format(new Date())+".log");
        try {
            LOGGER.addHandler(new FileHandler(logPath.toString(),true));
            fileHandlerAttached = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Polls /usr/HDWork/in in an endless loop and processes one file per iteration.
     * The loop ends when any exception is thrown; the thread's interrupt flag is
     * restored when the cause is an interruption.
     */
    @Override
    public void run(){
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "執行緒的名字");
        attachFileHandlerOnce();
        // One formatter per run() call, so it is never shared between threads.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        try {
            while (true) {
                long start = System.currentTimeMillis();
                String date1String = formatter.format(new Date(start));

                // A separate shell object per iteration, because RemoteExecuteCommand
                // keeps its connection in an instance field. It uses the host and
                // credentials of the injected instance instead of hard-coded values.
                RemoteExecuteCommand workerRec =
                        new RemoteExecuteCommand(rec.getIp(), rec.getUserName(), rec.getUserPwd());
                String aa = workerRec.execute("ls /usr/HDWork/in");
                if (aa.length() == 0) {
                    System.out.println("run中的=="+threadName+"沒有檔案");
                    Thread.sleep(1000);
                    continue;
                }

                // Take one file under the class-wide lock so that no two tasks pick the same file.
                String fileName = this.Mv();
                if (fileName == null || fileName.equals("")) {
                    System.out.println(threadName+"沒有檔案");
                    continue;
                }

                // Parse the file (now in the work directory) and insert it into HBase.
                ReadIn.add(fileName, connection, ftpClient);

                if (Cmd.mvBack(fileName, workerRec)) {
                    long end = System.currentTimeMillis();
                    String date2String = formatter.format(new Date(end));
                    String message = threadName + ":執行檔案:." + fileName + ";開始時間為:"+date1String+";結束時間為:"+date2String+";新增成功,共用時"+(end-start)+"ms。";
                    System.out.println(message);
                    LOGGER.info(message);
                } else {
                    System.out.println("過濾異常");
                    LOGGER.info("資料插入成功,但是移動到back中失敗");
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println("run異常"+e.getMessage());
        } catch (Exception e) {
            System.out.println("run異常"+e.getMessage());
            // Keep the exception type in the log; getMessage() alone may be null.
            LOGGER.severe("run異常"+e);
        }
    }

    /**
     * Picks the first file listed by {@code ls -lrt} in /usr/HDWork/in and moves it to
     * /usr/HDWork/work. The class-wide lock prevents several tasks from taking the same
     * file.
     *
     * @return the name of the moved file, or {@code null} when the listing is empty
     */
    synchronized public String  Mv(){
        synchronized (MyTask.class) {
            // The original also ran a separate "cd" command first; every execute() call
            // logs in and opens its own session, so that call had no effect on this one.
            String fileName = rec.execute("cd /usr/HDWork/in ; ls -lrt|sed -n \"2, 1p\"|awk '{print $9}'");
            // Drop the line break that execute() appends; a blank result means no file.
            fileName = fileName.replace("\n", " ").trim();
            if (fileName.isEmpty()) {
                return null;
            }
            // NOTE(review): the file name is placed unquoted into a shell command.
            rec.execute("mv /usr/HDWork/in/" + fileName + " /usr/HDWork/work");
            System.out.println(fileName + "======================");
            return fileName;
        }
    }
}

LinkHbase類:連線資料庫

/**
 * Holds the HBase client configuration and opens connections from it.
 */
public class LinkHbase {
    private static Admin admin;
    private static Configuration conf = buildConfiguration();

    /** Builds the configuration with the HBase root directory and the ZooKeeper quorum. */
    private static Configuration buildConfiguration() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.rootdir", "hdfs://node1:9000/hbase");
        // ZooKeeper quorum hosts, set directly.
        configuration.set("hbase.zookeeper.quorum", "叢集1,叢集2,叢集3");
        return configuration;
    }

    /**
     * Opens a new HBase connection using the static configuration.
     *
     * @return the new connection
     * @throws Exception if the connection cannot be created
     */
    public static Connection table() throws Exception {
        return ConnectionFactory.createConnection(conf);
    }
}

RemoteExecuteCommand類:遠端執行Linux命令

/**
 * 遠端執行linux的shell script
 * @author Ickes
 * @since  V0.1
 */		
public class RemoteExecuteCommand {

    //字元編碼預設是utf-8
    private static String  DEFAULTCHART="UTF-8";
    private Connection conn;
    private String ip;
    private String userName;
    private String userPwd;

    /**
     * Creates a command runner for the given host and credentials. No connection is
     * opened here.
     *
     * @param ip       address of the remote host
     * @param userName login name
     * @param userPwd  login password
     */
    public RemoteExecuteCommand(String ip, String userName, String userPwd) {
        this.ip = ip;
        this.userName = userName;
        this.userPwd = userPwd;
    }
   //No-arg constructor; host and credentials are left unset.
    public RemoteExecuteCommand() {
    }
    /**
     * Connects to the remote Linux host and authenticates with user name and password.
     *
     * <p>When connecting or authenticating fails, the connection is closed again so
     * that a failed login does not leave it open.
     *
     * @author Ickes
     * @since  V0.1
     * @return
     *      true when the login succeeded, otherwise false
     */
    public Boolean login(){
        boolean flg=false;
        try {
            conn = new Connection(ip);
            conn.connect();
            flg=conn.authenticateWithPassword(userName, userPwd);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Callers only close the connection after a successful login.
            if (!flg && conn != null) {
                conn.close();
            }
        }
        return flg;
    }

/**
 * Logs in, runs a shell command or script on the remote host and returns its output.
 *
 * <p>Standard output is returned; when it is blank, standard error is returned
 * instead. An empty string is returned when the login fails.
 *
 * @author Ickes
 * @param cmd
 *      the command to execute
 * @return
 *      the output produced by the command
 * @since V0.1
 */
public String execute(String cmd){
    String result="";
    if(!login()){
        return result;
    }
    Session session=null;
    try {
        session= conn.openSession();
        session.execCommand(cmd);
        result=processStdout(session.getStdout(),DEFAULTCHART);
        // Blank standard output: fall back to what the command wrote to standard error.
        if(StringUtils.isBlank(result)){
            result=processStdout(session.getStderr(),DEFAULTCHART);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Release the session first, then the connection, also when the command failed.
        if(session!=null){
            session.close();
        }
        conn.close();
    }
    return result;
}

/**
 * Logs in, runs a shell command or script on the remote host and returns only its
 * standard output.
 *
 * @author Ickes
 * @param cmd
 *      the command to execute
 * @return
 *      the standard output of the command; an empty string (never null) when the
 *      login or the execution fails
 * @since V0.1
 */
public String executeSuccess(String cmd){
    String result="";
    if(!login()){
        return result;
    }
    Session session=null;
    try {
        session= conn.openSession();
        session.execCommand(cmd);
        result=processStdout(session.getStdout(),DEFAULTCHART);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Release the session first, then the connection, also when the command failed.
        if(session!=null){
            session.close();
        }
        conn.close();
    }
    return result;
}

/**
 * 解析指令碼執行返回的結果集
 * @author Ickes
 * @param in 輸入流物件
 * @param charset 編碼
 * @since V0.1
 * @return
 *       以純文字的格式返回
 */

private String processStdout(InputStream in, String charset){
    InputStream    stdout = new StreamGobbler(in);
    StringBuffer buffer = new StringBuffer();;
    try {
        BufferedReader br = new BufferedReader(new InputStreamReader(stdout,charset));
        String line=null;
        while((line=br.readLine()) != null){
            buffer.append(line+"\n");
        }
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    } catch (IOException e) 

        e.printStackTrace();
    }
    return buffer.toString();
}

/** Sets the character encoding used for all instances when reading command output. */
public static void setCharset(String charset) {
    DEFAULTCHART = charset;
}
/** Returns the current SSH connection object (set by login(), may be null before). */
public Connection getConn() {
    return conn;
}
/** Replaces the SSH connection object. */
public void setConn(Connection conn) {
    this.conn = conn;
}
/** Returns the address of the remote host. */
public String getIp() {
    return ip;
}
/** Sets the address of the remote host. */
public void setIp(String ip) {
    this.ip = ip;
}
/** Returns the login name. */
public String getUserName() {
    return userName;
}
/** Sets the login name. */
public void setUserName(String userName) {
    this.userName = userName;
}
/** Returns the login password. */
public String getUserPwd() {
    return userPwd;
}
/** Sets the login password. */
public void setUserPwd(String userPwd) {
    this.userPwd = userPwd;
}