1. 程式人生 > >多線程(八)常用的線程模型

多線程(八)常用的線程模型

created ren 結果 bst lock 數據 例如 del unit

  在處理業務的時候,有時候需要根據情況使用不同的線程處理模型來處理業務邏輯,這裏演示一下常見的線程模型使用技巧。

1、Future模型

  前面的章節中提到過Future模型,該模型通常在使用的時候需要結合Callable接口配合使用。Future:未來的、將來的,再結合Callable大概可以明白其功能。

  Future是把結果放在將來獲取,當前主線程並不急於獲取處理結果。允許子線程先進行處理一段時間,處理結束之後就把結果保存下來,當主線程需要使用的時候再向子線程索取。

  Callable是類似於Runnable的接口,其中call方法類似於run方法,所不同的是run方法不能拋出受檢異常沒有返回值,而call方法則可以拋出受檢異常並可設置返回值

。兩者的方法體都是線程執行體。

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object‘s
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * 
@see java.lang.Thread#run() */ public abstract void run(); /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception;

  註意這裏,無法拋出受檢異常不等於無法捕獲線程中throws的異常

。run方法執行體中拋出異常是可以被捕獲的,前提是使用Future來處理,後面會有說明。

  如果有一種場景需要一個線程處理一段業務,處理結束之後主線程將會使用處理結果進行後續處理。這樣,按照普通邏輯,就需要使用到一個全局變量來保存子線程處理之後的結果。子線程處理結束之後,把結果保存在全局變量中供主線程進行調用。一旦涉及到全局能量便存在著多線程讀寫全局變量錯誤的風險。而使用Future模式便可以省去全局變量的使用,直接從線程中獲取子線程處理結果。下面看一下使用示例;

package thread.blogs.threadmodel;

/**
 * Created by PerkinsZhu on 2017/9/1 15:34.
 */
public class AbstractModel {
    protected static void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    protected static void println(Object info) {
        System.out.println(info);
    }
}

package thread.blogs.threadmodel;

import java.util.concurrent.*;

/**
 * Created by PerkinsZhu on 2017/9/1 15:32.
 */
public class FutureModel extends AbstractModel {
    public static void main(String[] args) {
        testFuture();
    }

    /**
     * 區別: CallAble  可以有返回值  可以拋出受檢異常
     * Runnable  沒有返回值   無法拋出受檢異常但可捕獲線程中發生的異常。
     * 者都可通過對future.get()進行try cathch捕獲異常
     */
    private static void testFuture() {
        MyCallable myCallable = new MyCallable();
        MyRunnable myRunnable = new MyRunnable();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<?> future = executorService.submit(myCallable);
        sleep(2000);
        try {
            //String data = future.get(2000, TimeUnit.MILLISECONDS);//可以指定超時時間
            Object data = future.get();//當執行Runnable的時候,這裏返回的為nul。此時如果有run方法體中有異常異常拋出,可以在此捕獲到,雖然Run方法沒有顯示的拋出受檢異常。
            println(data + "---" + data.getClass().toString());
        } catch (InterruptedException e) {
            println(e.getMessage());
        } catch (ExecutionException e) {
            println(e.getMessage());
        } catch (Exception e) {
            println(e.getMessage());
        }
        executorService.shutdown();
    }

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            sleep(500);
            println("I am Callable...");
            //int num = 10/0;
            //throw  new RuntimeException("異常");
            return "hello";
        }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {//不支持返回值,無法對線程捕獲異常。
            sleep(500);
            println("I am Runnable...");
            // int num = 10/0;
            //throw  new RuntimeException("異常");
        }
    }
}

可以取消註釋 分別測試 myCallable 和myRunnable 對異常捕獲和結果獲取進行測試。

2、fork&join 模型
該模型是jdk中提供的線程模型。該模型包含遞歸思想和回溯思想,遞歸用來拆分任務,回溯用合並結果。 可以用來處理一些可以進行拆分的大任務。其主要是把一個大任務逐級拆分為多個子任務,然後分別在子線程中執行,當每個子線程執行結束之後逐級回溯,返回結果進行匯總合並,最終得出想要的結果。這裏模擬一個摘蘋果的場景:有100棵蘋果樹,每棵蘋果樹有10個蘋果,現在要把他們摘下來。為了節約時間,規定每個線程最多只能摘10棵蘋樹以便於節約時間。各個線程摘完之後匯總計算總蘋果樹。代碼實現如下:

package thread.blogs.threadmodel;

import scala.Console;

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by PerkinsZhu on 2017/9/5 13:05.
 */
public class ForkJoin {
    public static void main(String[] args) {
        testAcation();
    }

    private static void testAcation() {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> future = pool.submit(new ResultTask(100));//共100棵蘋果樹
        try {
            Console.println(future.get());
            pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}

class ResultTask extends RecursiveTask<Integer> { //也可繼承自RecursiveAction抽象類,區別在於compute方法沒有返回值,如果只需要執行動作則可以使用該接口
    private int treeNum;

    public ResultTask(int num) {
        this.treeNum = num;
    }

    @Override
    protected Integer compute() {
        if (treeNum < 10) {//每個線程最多只能摘10棵蘋果樹
            return getAppleNum(treeNum);
        } else {

            //對任務進行拆分,註意這裏不僅僅可以一分為二進行拆分,也可以拆為多個子任務
            int temp = treeNum / 2;
            ResultTask left = new ResultTask(temp);
            ResultTask right = new ResultTask(treeNum - temp);
            left.fork();
            right.fork();
            //對子任務處理的結果進行合並
            int result = left.join() + right.join();

            Console.println("========" + Thread.currentThread().getName() + "=========" + result);
            return result;
        }
    }

    public Integer getAppleNum(int treeNum) {
        return treeNum * 10;//每棵樹上10個蘋果
    }
}

這裏需要看一下執行結果,主要是為了明白在拆分子任務的時候並不是無限制開啟線程,而是使用了線程池ForkJoinPool復用線程。註意下面輸出的線程名稱!

========ForkJoinPool-1-worker-3=========120
========ForkJoinPool-1-worker-7=========120
========ForkJoinPool-1-worker-0=========120
========ForkJoinPool-1-worker-5=========120
========ForkJoinPool-1-worker-1=========130
========ForkJoinPool-1-worker-11=========130
========ForkJoinPool-1-worker-4=========250
========ForkJoinPool-1-worker-7=========130
========ForkJoinPool-1-worker-7=========250
========ForkJoinPool-1-worker-3=========130
========ForkJoinPool-1-worker-5=========250
========ForkJoinPool-1-worker-6=========250
========ForkJoinPool-1-worker-2=========500
========ForkJoinPool-1-worker-3=========500
========ForkJoinPool-1-worker-1=========1000
1000

3、actor消息模型

  actor模型屬於一種基於消息傳遞機制並行任務處理思想,它以消息的形式來進行線程間數據傳輸,避免了全局變量的使用,進而避免了數據同步錯誤的隱患。actor在接受到消息之後可以自己進行處理,也可以繼續傳遞(分發)給其它actor進行處理。在使用actor模型的時候需要使用第三方Akka提供的框架點擊查看。這裏使用scala進行演示,如果需要看java使用方法則可以查閱官方文檔:actor for java 使用。

package thread.blogs.threadmodel

import akka.actor.{Actor, ActorSystem, Props}

/**
  * Created by PerkinsZhu on 2017/9/21 18:58. 
  */
object ActorTest {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("MyActor")
    val actor = actorSystem.actorOf(Props[MyActor], "MyActor")
    actor ! "很高興認識你!"//發送消息給actor
  }
}

class MyActor extends Actor {
  override def receive: Receive = {//接收消息,根據消息類型進行case匹配,可以在此actor進行處理,也可以繼續傳遞給其它actor進行處理(參考master-worker)。
    case str: String => println(str)
  }
}

4、生產者消費者模型

  生產者消費者模型都比較熟悉,其核心是使用一個緩存來保存任務。開啟一個/多個線程來生產任務,然後再開啟一個/多個來從緩存中取出任務進行處理。這樣的好處是任務的生成和處理分隔開,生產者不需要處理任務,只負責向生成任務然後保存到緩存。而消費者只需要從緩存中取出任務進行處理。使用的時候可以根據任務的生成情況和處理情況開啟不同的線程來處理。比如,生成的任務速度較快,那麽就可以靈活的多開啟幾個消費者線程進行處理,這樣就可以避免任務的處理響應緩慢的問題。使用示例如下:

package thread.blogs.threadmodel;

import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;

/**
 * Created by PerkinsZhu on 2017/9/22 8:58.
 */
public class PCModel {
    public static void main(String[] args) {
        testPCModel();
    }
    private static Queue<String> queue = new LinkedList<String>();//任務緩存,這裏保存簡單的字符串模擬任務
    private static void testPCModel() {
        new Thread(() -> {//生產者線程
            while (true) {
                String uuid = UUID.randomUUID().toString();
                queue.add(uuid);
                sleep(100);
            }
        }).start();
        for (int i = 0; i < 10; i++) {//開啟10消費者處理任務,保證生產者產生的任務能夠被及時處理
            new Thread(() -> {
                while (true) {
                    doWork(queue);
                }
            }).start();
        }
    }

    private static void doWork(Queue<String> queue) {
        sleep(1000);
        synchronized (queue) {
            if (queue.size() > 0) {
                sleep(10);
                System.out.println(queue.poll() + "----" + queue.size());
            }
        }
    }
    private static void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5、master-worker模型

  master-worker模型類似於任務分發策略,開啟一個master線程接收任務,然後在master中根據任務的具體情況進行分發給其它worker子線程,然後由子線程處理任務。如需返回結果,則worker處理結束之後把處理結果返回給master。下面的代碼示例是使用akka actor for scala演示。使用的時候也可以使用java Thread來實現該模型。

package thread.blogs.threadmodel

import akka.actor.{Actor, ActorSystem, Props}

/**
  * Created by PerkinsZhu on 2017/9/21 18:58. 
  */
object ActorTest {
  val actorSystem = ActorSystem("Master")


  def main(args: Array[String]): Unit = {
    val actor = actorSystem.actorOf(Props[Master], "Master")
    var taskNum = 0;
    while (true) {
      taskNum = taskNum + 1;
      actor ! Task("做作業!  --" + taskNum) //發送消息給actor
      Thread.sleep(100)
    }
  }
}

class Master extends Actor {
  val actorSystem = ActorSystem("worker")
  var num = 0;

  override def receive: Receive = {
    case task: Task => {
      num = num + 1;
      //接收到任務之後分發給其它worker線程。可以使用worker池 復用actor
      actorSystem.actorOf(Props[Worker], "worker" + num) ! task
    }
    case any: Any => println(any)
  }
}

class Worker extends Actor {

  def doWork(task: Task): Unit = println(task.name)

  override def receive: Receive = {
    case task: Task => doWork(task) //worker處理接受到的任務
    case any: Any => println(any)
  }
}

case class Task(name: String)

這裏如果需要worker返回處理結果,則只需要在worker中調用sender 發送處理結果即可。

=========================================

原文鏈接:多線程(八)常用的線程模型轉載請註明出處!

=========================================

---end

多線程(八)常用的線程模型