
DICOM: Multithreading Analysis of Open-Source DICOM Libraries: "ThreadPoolQueue in fo-dicom"


Background:

The previous post introduced the Leader/Follower thread pool model used in dcm4chee, whose main purpose is to reduce context switches and improve execution efficiency.

This post belongs to the same series, Multithreading Analysis of Open-Source DICOM Libraries, and focuses on the ThreadPoolQueue thread pool used in fo-dicom.

ThreadPoolQueue in fo-dicom:

First, look at the data structures that ThreadPoolQueue defines for itself:

    public class ThreadPoolQueue<T> {
        private class WorkItem {
            public T Group;
            public Action Action;
            public WaitCallback Callback;
            public object State;
        }

        private class WorkGroup {
            public T Key;
            public object Lock = new object();
            public volatile bool Executing = false;
            public Queue<WorkItem> Items = new Queue<WorkItem>();

            public WorkGroup(T key) {
                Key = key;
            }
        }

        private object _lock = new object();
        private volatile bool _stopped = false;
        private Dictionary<T, WorkGroup> _groups;

        public ThreadPoolQueue() {
            _groups = new Dictionary<T, WorkGroup>();
            Linger = 200;
            DefaultGroup = default(T);
        }
        ……
    }

From these structures it is clear that ThreadPoolQueue is a custom thread pool queue that groups work items by a key of type T, with each item carrying its processing delegate (an Action or a WaitCallback) along with it.
Compared with the standard system ThreadPool, ThreadPoolQueue maintains a Dictionary<T, WorkGroup> that maps each group key to its own FIFO queue of pending items, so that items within one group execute strictly in order while different groups can still run concurrently on the underlying system pool.
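The enqueueing side is not quoted in the post, but the structures above suggest roughly how it must work. Here is a minimal sketch of a Queue method for the class shown above (an assumption for illustration; fo-dicom's actual implementation may differ in details such as how Linger is honored):

    public void Queue(T groupKey, WaitCallback callback, object state) {
        var item = new WorkItem { Group = groupKey, Callback = callback, State = state };
        lock (_lock) {
            WorkGroup group;
            if (!_groups.TryGetValue(groupKey, out group)) {
                // First item for this key: create the group on demand.
                group = new WorkGroup(groupKey);
                _groups.Add(groupKey, group);
            }
            lock (group.Lock) {
                group.Items.Enqueue(item);   // FIFO within the group
            }
        }
        Execute(groupKey);                   // start the group if it is idle
    }

The scheduling itself happens in Execute: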

    private void Execute(T groupKey) {
        if (_stopped)
            return;

        WorkGroup group = null;
        lock (_lock) {
            // Nothing to do if the group has already been removed.
            if (!_groups.TryGetValue(groupKey, out group))
                return;
        }
        lock (group.Lock) {
            // At most one work item per group is in flight at a time,
            // which is what preserves FIFO order within the group.
            if (group.Executing)
                return;

            // A drained non-default group is discarded; only the default
            // group lingers, avoiding constant re-creation.
            if (group.Items.Count == 0 && !group.Key.Equals(DefaultGroup)) {
                _groups.Remove(groupKey);
                System.Console.WriteLine("Remove WorkGroup Key is {0}", group.Key);
                return;
            }

            group.Executing = true;

            // Delegate the actual execution to the system thread pool.
            ThreadPool.QueueUserWorkItem(ExecuteProc, group);
        }
    }
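Execute hands the group off to the system ThreadPool via ExecuteProc, which the post does not quote. A plausible companion sketch (hypothetical; the real fo-dicom method may, for example, loop and wait up to Linger milliseconds for new items) is:

    private void ExecuteProc(object state) {
        var group = (WorkGroup)state;

        WorkItem item = null;
        lock (group.Lock) {
            if (group.Items.Count > 0)
                item = group.Items.Dequeue();   // next item, FIFO
        }

        if (item != null) {
            if (item.Action != null)
                item.Action();
            else if (item.Callback != null)
                item.Callback(item.State);
        }

        lock (group.Lock) {
            group.Executing = false;            // group may be scheduled again
        }
        Execute(group.Key);                     // drain the rest of the queue
    }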

Relating this to the other posts in this column, such as DICOM: DICOM 3.0 Network Communication Protocol: "Open-Source Library Implementation Analysis" and DICOM: Multithreading Analysis of Open-Source DICOM Libraries: "LF_ThreadPool in DCM4CHEE", fo-dicom's overall logic for responding to DICOM requests can be summarized as follows:
[Figure: overall flow of fo-dicom's response to DICOM requests]

ThreadPoolQueue is used to process PDataTF packets, i.e. the P-DATA messages of the DICOM Upper Layer protocol (for details see the earlier networking posts DICOM: DICOM 3.0 Network Communication Protocol (III), DICOM: DICOM 3.0 Network Communication Protocol (Continued), DICOM Medical Image Processing: DICOM Network Transfer, and DICOM Medical Image Processing: A Full Analysis of the Communication Service Module in the DICOM 3.0 Standard). Processing tasks are placed into groups keyed by the message's MessageID, which keeps the overall message flow executing in FIFO order.
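As a hypothetical illustration (the names below are not fo-dicom's actual API; only the grouping idea comes from the library), dispatching P-DATA fragments per MessageID might look like this:

    // ThreadPoolQueue<T> as defined above.
    class PDataDispatcher {
        // Group by MessageID: fragments of one message stay strictly FIFO,
        // while fragments of different messages may interleave freely.
        private readonly ThreadPoolQueue<int> _pduQueue = new ThreadPoolQueue<int>();

        // Hypothetical entry point, invoked as P-DATA-TF PDUs arrive.
        public void OnPDataTF(int messageId, byte[] fragment) {
            _pduQueue.Queue(messageId, state => HandleFragment((byte[])state), fragment);
        }

        private void HandleFragment(byte[] fragment) {
            // Reassemble and dispatch the DIMSE message here (omitted).
        }
    }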

In addition, once its tasks have finished, every group except the default one is discarded, reducing wasted system resources.

A local test example for ThreadPoolQueue:

To demonstrate that ThreadPoolQueue adds FIFO execution-order control on top of the standard .NET ThreadPool, I wrote a simple local test program:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    // ThreadPoolQueue<T> as defined in fo-dicom (see above).

    class Program
    {
        private static ThreadPoolQueue<string> threadpool = new ThreadPoolQueue<string>();
        private static string[] groups = new string[] { "group-0", "group-1", "group-2", "group-3", "group-4" };
        private static Dictionary<string, List<int>> results = new Dictionary<string, List<int>>();
        private static object mutex = new object();

        static void Main(string[] args)
        {
            threadpool.DefaultGroup = "group-0";
            for (int i = 0; i < 100; ++i)
            {
                // Items are dealt round-robin into five groups; within each
                // group they must complete in the order they were queued.
                threadpool.Queue(groups[i % 5], ThreadProcessing, i);
                Thread.Sleep(500);
            }
            System.Console.ReadLine();
            foreach (var result in results.Keys.ToList())
            {
                System.Console.WriteLine("Group {0}", result);
                foreach (var record in results[result])
                {
                    System.Console.Write("Item={0}\t", record);
                }
                System.Console.WriteLine();
            }
            System.Console.ReadKey();
        }

        private static void ThreadProcessing(object state)
        {
            int record = (int)state;
            Thread.Sleep(2 * 1000);
            lock (mutex)
            {
                // Create the result list for this group on first use.
                List<int> recordList;
                if (!results.TryGetValue(groups[record % 5], out recordList))
                {
                    recordList = new List<int>();
                    results.Add(groups[record % 5], recordList);
                }
                recordList.Add(record);
            }
        }
    }

The local debug output (shown as a screenshot in the original post) lists each group's items in exactly the order they were queued, e.g. group-1 yields Item=1, Item=6, …, Item=96, confirming FIFO execution within each group.

Additional background:

Whether it is the Leader/Follower thread pool model used in dcm4chee introduced earlier, or the custom ThreadPoolQueue in fo-dicom introduced today, both are implementation techniques for improving efficiency. With today's multi-core and multi-processor machines, and even distributed clusters, task scheduling has become all the more important, so getting the underlying concepts straight is a prerequisite. "Macro" and "micro" are relative terms:

- For threads versus processes: a thread's scope is smaller than a process's, so the thread is the micro view and the process the macro view; each process must implement a concrete thread scheduling algorithm internally.
- For processes versus the operating system: a process's scope is smaller than the OS's; the OS must implement scheduling across its processes.
- For single-core versus multi-core: a single core's scope is smaller than that of multiple cores; a multi-core system must add inter-core scheduling on top of per-core process scheduling.
- For a single machine versus a cluster: one host's scope is smaller than the cluster's; the cluster must coordinate the state of all its hosts.

[Figure: the scheduling hierarchy, from threads within a process up to hosts within a cluster]
Every level above involves a scheduling algorithm, and what scheduling essentially resolves is resource contention and data synchronization. If two operations contend for no resources at all (indeed have no relationship at all), no scheduling is needed, just as two employees of different companies can collect their salaries at the same time; but if they both go to the same bank, the same counter, and the same pretty teller to deposit money at the same moment, they must queue.

1. Thread:

  1. Thread in POSIX:

    A single flow of control within a process. Each thread has its own thread ID, scheduling priority and policy, errno value, floating point environment, thread-specific key/value bindings, and the required system resources to support a flow of control. Anything whose address may be determined by a thread, including but not limited to static variables, storage obtained via malloc(), directly addressable storage obtained through implementation-defined functions, and automatic variables, are accessible to all threads in the same process.

  2. Thread in MSDN:

    Operating systems use processes to separate the different applications that they are executing. Threads are the basic unit to which an operating system allocates processor time, and more than one thread can be executing code inside that process. Each thread maintains exception handlers, a scheduling priority, and a set of structures the system uses to save the thread context until it is scheduled. The thread context includes all the information the thread needs to seamlessly resume execution, including the thread’s set of CPU registers and stack, in the address space of the thread’s host process.


From these two definitions we can see that a thread is the smallest unit to which the operating system schedules and allocates CPU time slices, and that it represents a concrete flow of control (i.e. a sequence of executing instructions).
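As a small illustration of those definitions (my own example, not from the cited standards), the following C# program shows a thread with its own scheduling priority executing a separate flow of control while sharing its process's address space:

    using System;
    using System.Threading;

    class ThreadDemo {
        static int _shared = 0;   // static storage is visible to all threads in the process

        static void Main() {
            var worker = new Thread(() => {
                _shared++;        // the worker's flow of control touches shared state
                Console.WriteLine("worker sees _shared = {0}", _shared);
            });
            worker.Priority = ThreadPriority.AboveNormal;  // per-thread scheduling priority
            worker.Start();
            worker.Join();        // wait for the worker's flow of control to finish
            Console.WriteLine("main sees _shared = {0}", _shared);
        }
    }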

2. Process:

  1. POSIX Process:

    The POSIX model treats a “process” as an aggregation of system resources, including one or more threads that may be scheduled by the operating system on the processor(s) it controls. Although a process has its own set of scheduling attributes, these have an indirect effect (if any) on the scheduling behavior of individual threads as described below.

  2. MSDN Process:

    An application consists of one or more processes. A process, in the simplest terms, is an executing program. One or more threads run in the context of the process.


From these two definitions we can see that a process is a concrete executing program (the code we write, in execution), and that it is the basic unit to which the operating system allocates system resources.
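Again as a small illustration (my own example), a process can be inspected as the resource-owning container of its threads:

    using System;
    using System.Diagnostics;

    class ProcessDemo {
        static void Main() {
            // The current process aggregates system resources: an ID, memory,
            // and one or more threads scheduled by the operating system.
            var current = Process.GetCurrentProcess();
            Console.WriteLine("PID={0}  Threads={1}  WorkingSet={2} bytes",
                current.Id, current.Threads.Count, current.WorkingSet64);
        }
    }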

3. Concurrency VS Parallelism

Concurrency and parallelism are related concepts, but there are small differences. Concurrency means that two or more tasks are making progress even though they might not be executing simultaneously. This can for example be realized with time slicing where parts of tasks are executed sequentially and mixed with parts of other tasks. Parallelism on the other hand arises when the execution can be truly simultaneous.
[Source]: Akka.NET: Terminology and Concepts
- Concurrency: [Figure: tasks making interleaved progress on a single execution unit]
- Parallelism: [Figure: tasks executing truly simultaneously on separate execution units]

The Parallelism diagram above is certainly correct, but Concurrency is not limited to the single pattern the diagram shows; we will compare the two concepts again after introducing "Multi-core" and "Multi-processor".

4. Multi-core VS Multi-processor

  1. Multi-core processor:

    A multi-core processor is a single computing component with two or more independent actual processing units (called “cores”), which are the units that read and execute program instructions. The instructions are ordinary CPU instructions such as add, move data, and branch, but the multiple cores can run multiple instructions at the same time, increasing overall speed for programs amenable to parallel computing.
    [Figure: schematic of a generic multi-core processor]
    A multi-core processor implements multiprocessing in a single physical package. Designers may couple cores in a multi-core device tightly or loosely. For example, cores may or may not share caches, and they may implement message passing or shared-memory inter-core communication methods. Common network topologies to interconnect cores include bus, ring, two-dimensional mesh, and crossbar.
    [Source]: Wikipedia: Multi-core processor

  2. Multi-processor

    Multiprocessing is the use of two or more central processing units (CPUs) within a single computer system. The term also refers to the ability of a system to support more than one processor and/or the ability to allocate tasks between them. There are many variations on this basic theme, and the definition of multiprocessing can vary with context, mostly as a function of how CPUs are defined (multiple cores on one die, multiple dies in one package, multiple packages in one system unit, etc.).
    In a multiprocessing system, all CPUs may be equal, or some may be reserved for special purposes. A combination of hardware and operating system software design considerations determine the symmetry (or lack thereof) in a given system.
    [Source]: Wikipedia: Multiprocessing

Oracle's blog post Concurrency vs Parallelism, Concurrent Programming vs Parallel Programming also covers the concepts of Concurrency and Parallelism, and touches on Multi-core and Multi-processor as well. It notes:

If two concurrent threads are scheduled by the OS to run on one single-core non-SMT non-CMP processor, you may get concurrency but not parallelism. Parallelism is possible on multi-core, multi-processor or distributed systems.
Concurrency is often referred to as a property of a program, and is a concept more general than parallelism.
Interestingly, we cannot say the same thing for concurrent programming and parallel programming. They are overlapped, but neither is the superset of the other. The difference comes from the sets of topics the two areas cover. For example, concurrent programming includes topics like signal handling, while parallel programming includes topics like memory consistency model. The difference reflects the different original hardware and software background of the two programming practices.

The passage shows that Concurrency and Parallelism overlap without either containing the other, which is why the two concepts are so easily confused.
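To make the contrast concrete, here is a small example of my own (not from the Oracle post): the two tasks below are concurrent even on a single-core machine, because the scheduler time-slices between them, whereas the Parallel.For bodies can be truly simultaneous only when multiple cores are available:

    using System;
    using System.Threading.Tasks;

    class ConcurrencyVsParallelism {
        static void Work(string name) {
            for (int i = 0; i < 3; ++i)
                Console.WriteLine("{0} step {1}", name, i);
        }

        static void Main() {
            // Concurrency: both tasks make progress even on one core.
            var a = Task.Run(() => Work("A"));
            var b = Task.Run(() => Work("B"));
            Task.WaitAll(a, b);

            // Parallelism: with multiple cores these bodies can run at the same instant.
            Parallel.For(0, 4, i => Console.WriteLine("parallel body {0}", i));
        }
    }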

5. Load Balancing:

With the advent of multi-core and multi-processor machines, and of distributed systems, coordination among the parts (meaning here the overall distribution of tasks, as distinct from the scheduling algorithms for individual threads, processes, and time slices) likewise becomes especially important.

On SMP systems, it is important to keep the workload balanced among all processors to fully utilize the benefits of having more than one processor.
[Source]: Operating System Concepts, 9th Edition, Section 6.5.3
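As a toy illustration of the idea (my own sketch; real SMP schedulers balance dynamically with push and pull migration between run queues rather than a fixed assignment):

    using System;
    using System.Collections.Generic;

    class RoundRobinBalance {
        static void Main() {
            int workers = Environment.ProcessorCount;
            var queues = new List<Queue<int>>();
            for (int w = 0; w < workers; ++w)
                queues.Add(new Queue<int>());

            // Spread tasks evenly so no processor sits idle while others are loaded.
            for (int task = 0; task < 16; ++task)
                queues[task % workers].Enqueue(task);

            for (int w = 0; w < workers; ++w)
                Console.WriteLine("worker {0}: {1} tasks", w, queues[w].Count);
        }
    }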

6. Time Slice:

The period of time for which a process is allowed to run in a preemptive multitasking system is generally called the time slice, or quantum. The scheduler is run once every time slice to choose the next process to run. The length of each time slice can be critical to balancing system performance vs process responsiveness - if the time slice is too short then the scheduler will consume too much processing time, but if the time slice is too long, processes will take longer to respond to input.
An interrupt is scheduled to allow the operating system kernel to switch between processes when their time slices expire, effectively allowing the processor’s time to be shared between a number of tasks, giving the illusion that it is dealing with these tasks simultaneously, or concurrently. The operating system which controls such a design is called a multi-tasking system.
[Source]: Wikipedia: Preemption (computing)

As the scheduling-hierarchy figure at the top of this section suggests, all thread scheduling and process scheduling, and even the coordination of distributed systems, ultimately depend on the timer interrupt (i.e. the time slice); the hardware timer interrupt is the lowest-level force driving all scheduling.

Sample source code:

  1. CSDN download
  2. GitHub download
    Note: for the GitHub sample it is best to download the entire fo-dicom repository.




Author: [email protected]
Date: 2016-02-05
