Zettelkasten

concurrent.futures.ThreadPooolExecutor 동작

·수정 2026.04.23·수정 3

요약

  • ThreadPoolExecutor는 스레드 풀을 관리하고 작업을 스레드 단위로 실행하는 클래스
  • submit, map 인터페이스를 갖고 있음
  • submit시 함수와 인자들을 _WorkItem으로 감싸고 self._work_queue 에 제출함
  • self._adjust_thread_count 호출
  • _idle_semaphore를 acquire
  • 현재 스레드가 max_workers를 넘지 않으면 _worker 를 타겟으로 threading.Thread 객체를 생성하고 start함
  • _worker는 아래처럼 생김

본문

  • _base.Executor 를 상속 받아서 생성함
    • _base.Executor는 submit, map 인터페이스를 갖고 있음
  • submit시 아래와 같은 동작이 진행됨
    • 함수와 인자들을 _WorkItem으로 감싸고 self._work_queue 에 제출함
    • 이후 self._adjust_thread_count 호출
  • self._adjust_thread_count
    • _idle_semaphore를 acquire
    • 현재 스레드가 max_workers를 넘지 않으면 _worker 를 타겟으로 threading.Thread 객체를 생성하고 start함
  • _worker는 아래처럼 생김
def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)