
Memory leak caused by threadlocal and ThreadPoolExecutor

threadlocal data (threading.local in Python) is tied to threads: every thread gets its own copy. See http://python.jobbole.com/86150/
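A minimal sketch of that per-thread behaviour (the names `data`, `results` and `worker` are illustrative, not from the original). Each thread assigns its own `data.value`, and the main thread, which never assigned one, does not see any of them.

```python
import threading

data = threading.local()   # one object, but its attributes are stored per thread
results = {}

def worker(n):
    data.value = n * 10          # visible only inside this thread
    results[n] = data.value      # read back this thread's own copy

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results.items()))   # [(0, 0), (1, 10), (2, 20)]
print(hasattr(data, 'value'))    # False: the main thread never set it
```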

The ThreadPoolExecutor constructor takes a max_workers argument. If it is not chosen carefully, combining the pool with thread-local data can leak memory.

Example code:

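from concurrent.futures import ThreadPoolExecutor
import threading
import time
import traceback

# One shared object; each thread sees only the attributes it set itself
x = threading.local()

def show():
    try:
        # x.content exists only in threads that have assigned it
        print(f'{threading.get_ident()} size {len(x.content)}')
    except AttributeError:
        print(traceback.format_exc())

def func():
    print(f'{threading.current_thread()}')
    # Load a large file into this thread's private slot
    with open(r'/path/to/bigfile', 'rb') as f:
        x.content = f.read()
    print(len(x.content))
    show()
    time.sleep(15)  # keep the task alive long enough to watch it in top
    return '1'


if __name__ == '__main__':

    executor = ThreadPoolExecutor(max_workers=4)
    for i in range(4):
        f1 = executor.submit(func)
        # print(f1.result())
    # All tasks finish after about 15 s; memory is checked with top meanwhile
    time.sleep(35)
    print(' in main')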
    print(' in main')

Checking memory usage with top gives 49.8.

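With max_workers set to 1, top gives 12.5.

That is roughly a quarter of the previous figure (49.8 ≈ 4 × 12.5), which fits four worker threads each holding its own copy of the file versus a single thread holding one.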
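The "one copy per worker thread" reading can be checked without a big file. This is a sketch under a few assumptions: `Blob` is a hypothetical stand-in for the file contents (bytes objects cannot be weak-referenced), a `threading.Barrier` replaces the 15-second sleep to force the tasks to overlap, and the exact counts rely on CPython freeing objects as soon as their last reference disappears. After every task has finished, it counts how many buffers are still alive.

```python
import threading
import weakref
from concurrent.futures import ThreadPoolExecutor

class Blob:
    """Hypothetical stand-in for the file contents."""

def count_retained(max_workers, tasks=4):
    local = threading.local()
    refs = []                                   # weak refs to every Blob created
    barrier = threading.Barrier(max_workers, timeout=5)

    def task():
        blob = Blob()
        refs.append(weakref.ref(blob))
        local.blob = blob                       # same pattern as x.content = f.read()
        barrier.wait()                          # make max_workers tasks overlap

    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(task) for _ in range(tasks)]
    for f in futures:
        f.result()                              # every task has finished here
    # Blobs still reachable only through the idle workers' thread-local data
    alive = sum(r() is not None for r in refs)
    executor.shutdown(wait=True)
    return alive

print(count_retained(4))   # 4: each of the four idle workers keeps its own Blob
print(count_retained(1))   # 1: one reused worker overwrites its previous Blob
```

With a single worker, each new assignment to `local.blob` drops the previous buffer, so memory stays at one copy no matter how many tasks run.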


Running the tasks with plain threading.Thread instead does not leak either:

if __name__ == '__main__':

    # executor = ThreadPoolExecutor(max_workers=4)
    for i in range(4):
        # f1 = executor.submit(func)
        th = threading.Thread(target=func)
        th.start()
        # join() waits for the thread to exit; its thread-local data goes with it
        th.join()
        # print(f1.result())
    time.sleep(35)
    print(' in main')

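The likely cause is that ThreadPoolExecutor does not release its threads after they finish a task: an idle worker thread stays alive, and so does everything it stored in threadlocal.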
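A quick way to test this hypothesis (a sketch; `first_task` and `second_task` are made-up names). With a single worker, the second task runs on the same thread as the first and still finds the data the first task left in thread-local storage, even though it never assigned anything itself.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def first_task():
    local.content = b'x' * 1024         # small stand-in for the big file
    return threading.get_ident()

def second_task():
    # Assigns nothing; only inspects what this thread already carries
    return threading.get_ident(), hasattr(local, 'content')

with ThreadPoolExecutor(max_workers=1) as executor:
    ident1 = executor.submit(first_task).result()
    ident2, still_there = executor.submit(second_task).result()

print(ident1 == ident2, still_there)    # True True
```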

Looking at the ThreadPoolExecutor source (concurrent/futures/thread.py):

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

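Note self._threads.add(t): a new thread is started only while fewer than max_workers threads exist, and every thread that is started is kept in self._threads and reused for later tasks instead of exiting. Because these threads live until the executor is shut down or garbage-collected, whatever each of them stored in threadlocal (here, a copy of the file) stays in memory, which matches the measurements above.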
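The excerpt shows only how threads are created; the thread body, _worker, is not shown. In CPython it loops on the work queue and returns only when it receives None, which is what weakref_cb above puts on the queue (shutdown() does the same). Below is a simplified model of that loop, not CPython's actual _worker; `worker`, `work_queue` and `results` are illustrative names. It shows that a worker which has finished all its tasks is still alive, and exits only after the sentinel.

```python
import queue
import threading

def worker(work_queue, results):
    # Simplified pool worker: run tasks one by one until a None sentinel arrives
    while True:
        fn = work_queue.get()        # an idle worker blocks here; the thread stays alive
        if fn is None:
            return                   # only the sentinel makes the thread exit
        results.append(fn())
        work_queue.task_done()

work_queue = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(work_queue, results), daemon=True)
t.start()

for n in range(3):
    work_queue.put(lambda n=n: n * n)
work_queue.join()                    # wait until all three tasks are done
alive_after_tasks = t.is_alive()     # True: no work left, but still waiting on get()

work_queue.put(None)                 # roughly what shutdown() / weakref_cb do
t.join(timeout=5)
alive_after_sentinel = t.is_alive()  # False: the thread has returned

print(results, alive_after_tasks, alive_after_sentinel)  # [0, 1, 4] True False
```

Since the thread never returns between tasks, any threading.local attributes it set in an earlier task remain reachable until that sentinel arrives.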