Многопоточность в Python

В соавторстве со мной (Дэниел) и Хайле


В современном мире данные занимают центральное место в нашей жизни, от социальных сетей до умных приборов. Производительность программы определяется ее способностью манипулировать данными и вычислять их часто по сети. Обработка больших объемов данных сопровождается проблемами, в частности, увеличением времени выполнения программы, что приводит к «блокам» или «отставанию».

Из-за необходимости эффективного выполнения программы и все более сложной архитектуры многоядерного ос/аппаратного обеспечения языки программирования попытались лучше использовать это поведение. Буквальное значение слова параллелизм — «одновременное выполнение». Время выполнения параллельной программы может быть значительно сокращено, поскольку компьютер может одновременно выполнять несколько инструкций.

В модели параллелизма Python переплетаются три основные концепции операционных систем: поток, задача и процесс.

Разделение потоков

Потоки одного и того же процесса операционной системы распределяют вычислительную нагрузку между несколькими ядрами, как это наблюдается в таких языках программирования, как C++ и Java. Как правило, в python используется только один процесс, из которого порождается один главный поток для выполнения программы. Он остается на одном ядре независимо от количества ядер компьютера или количества порождаемых новых потоков благодаря механизму блокировки, называемому глобальной блокировкой интерпретатора, введенному для предотвращения так называемого состояния гонки.

При упоминании гонок вы, вероятно, подумали о NASCAR и Формуле-1. Давайте воспользуемся этой аналогией и представим, что все гонщики Формулы 1 пытаются участвовать в гонках на одном болиде одновременно. Звучит абсурдно, верно? Это было бы возможно, только если бы каждый гонщик имел доступ к своей собственной машине или, что еще лучше, проезжал по одному кругу за раз, каждый раз передавая машину следующему гонщику.

Это очень похоже на то, что происходит в потоках. Потоки «порождаются» из «главного» потока, каждый последующий поток является копией предыдущего. Все эти потоки существуют в одном и том же «контексте» процесса (событие или соревнование) — поэтому все ресурсы, назначенные этому процессу, такие как память, являются общими. Например, в типичном сеансе интерпретатора python:

>>> a = 8
Вход в полноэкранный режим Выход из полноэкранного режима

Здесь a потребляет очень мало памяти (RAM), поскольку некоторое произвольное место в памяти временно хранит значение 8.

Пока все хорошо, давайте запустим несколько потоков и понаблюдаем за их поведением при сложении двух чисел x и y:

import time
import threading
from threading import Thread

a = 8

def threaded_add(x, y):
    # simulation of a more complex task by asking
    # python to sleep, since adding happens so quick!
    for i in range(2):
        global a
        print("computing task in a different thread!")
        time.sleep(1)
        #this is not okay! but python will force sync, more on that later!
        a = 10
        print(a)

# the current thread will be a subset fork!
if __name__ != "__main__":
    current_thread = threading.current_thread()


# here we tell python from the main 
# thread of execution make others
if __name__ == "__main__":

    thread = Thread(target = threaded_add, args = (1, 2))
    thread.start()
    thread.join()
    print(a)
    print("main thread finished...exiting")
Войти в полноэкранный режим Выход из полноэкранного режима
>>> computing task in a different thread!
>>> 10
>>> computing task in a different thread!
>>> 10
>>> 10
>>> main thread finished...exiting
Вход в полноэкранный режим Выход из полноэкранного режима

В настоящее время запущены два потока. Назовем их thread_one и thread_two. Если thread_one хочет изменить a со значением 10, а thread_two одновременно пытается обновить ту же переменную, у нас возникает проблема! Возникнет состояние, известное как гонка данных, и результирующее значение a будет бессвязным.

Гонка NASCAR, которую вы не смотрели, но услышали два противоречащих друг другу результата от двух ваших друзей! thread_one говорит вам одно, а thread two противоречит этому! Вот фрагмент псевдокода, иллюстрирующий это:

a = 8
# spawns two different threads 1 and 2
# thread_one updates the value of a to 10

if (a == 10):
  # a check

#thread_two updates the value of a to 15
a = 15
b = a * 2

# if thread_one finished first the result will be 20
# if thread_two finished first the result will be 30
# who is right?
Войти в полноэкранный режим Выход из полноэкранного режима

Что же происходит на самом деле?

Python является интерпретируемым языком, что означает, что он поставляется с интерпретатором — программой, которая анализирует исходный код с другого языка! Некоторые из таких интерпретаторов в python включают cpython, pypy, Jpython, IronPython, причем Cpython является оригинальной реализацией для python.

CPython — это интерпретатор, который обеспечивает интерфейс внешних функций с C, а также другими языками программирования, он компилирует исходный код python в промежуточный байткод, который интерпретируется виртуальной машиной CPython. До сих пор и в дальнейшем речь шла о CPython и понимании поведения в этой среде.

Модель памяти и механизмы блокировки

Язык программирования использует объекты в своих программах для выполнения операций. Эти объекты состоят из примитивных типов данных, таких как string, integer или boolean. Они также включают более сложные структуры данных, такие как list или classes/objects. Значения объектов вашей программы хранятся в памяти для быстрого доступа. Когда переменная используется в программе, процесс считывает значение из памяти и оперирует им. В ранних языках программирования большинство разработчиков отвечали за все управление памятью в своих программах. Это означало, что перед созданием списка или объекта сначала нужно было выделить память под переменную. После этого вы должны были деаллоцировать ее, «освободив» память.

В python объекты хранятся в памяти по ссылке. Ссылка — это своего рода метка для объекта, поэтому у одного объекта может быть много имен, подобно тому, как у вас может быть имя и прозвище. Ссылка — это точное местоположение объекта в памяти. Счетчик ссылок используется для сборки мусора в python, автоматического процесса управления памятью.

С помощью счетчика ссылок python отслеживает каждый объект, увеличивая счетчик ссылок всякий раз, когда объект создается или на него ссылаются, и уменьшая, когда объект разыменовывается. Когда счетчик ссылок равен 0, память для объекта деаллоцируется.

import sys
import gc

hello = "world" #reference to 'world' is 2
print (sys.getrefcount(hello))

bye = "world" 
other_bye = bye 
print(sys.getrefcount(bye)) 
print(gc.get_referrers(other_bye))
Вход в полноэкранный режим Выход из полноэкранного режима
>>> 4
>>> 6
>>> [['sys', 'gc', 'hello', 'world', 'print', 'sys', 'getrefcount', 'hello', 'bye', 'world', 'other_bye', 'bye', 'print', 'sys', 'getrefcount', 'bye', 'print', 'gc', 'get_referrers', 'other_bye'], (0, None, 'world'), {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x0138ADF0>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'test.py', '__cached__': None, 'sys': <module 'sys' (built-in)>, 'gc': <module 'gc' (built-in)>, 'hello': 'world', 'bye': 'world', 'other_bye': 'world'}]
Войти в полноэкранный режим Выход из полноэкранного режима

Эти переменные счетчика ссылок должны быть защищены от условий гонки или утечек памяти. Чтобы защитить эти переменные, можно добавить блокировки ко всем структурам данных, которые разделяются между потоками.

GIL в CPython управляет интерпретатором python, позволяя одному потоку одновременно управлять интерпретатором. Это дает прирост производительности однопоточным программам, так как необходимо управлять только одной блокировкой, однако в некоторых ситуациях это не позволяет многопоточным программам CPython использовать все преимущества многопроцессорных систем.

? Когда пользователь пишет программу на python, есть разница между теми, которые привязаны к процессору по своей производительности, и теми, которые привязаны к вводу/выводу. CPU доводит программу до предела, выполняя множество операций одновременно, в то время как программа ввода/вывода вынуждена тратить время на ожидание ввода/вывода.

Поэтому только многопоточные программы, которые проводят много времени внутри GIL, интерпретируя байткод CPython, становятся узким местом. GIL может снижать производительность даже тогда, когда в этом нет острой необходимости. Например, программа, написанная на python, которая обрабатывает как задачи, связанные с IO, так и задачи, связанные с CPU:

import time, os
from threading import Thread, current_thread
from multiprocessing import current_process

COUNT = 200000000
SLEEP = 10

def io_bound(sec):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} 
           ---> Start sleeping...")
   time.sleep(sec)
   print(f"{pid} * {processName} * {threadName} 
           ---> Finished sleeping...")

def cpu_bound(n):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} 
           ---> Start counting...")
   while n>0:
          n -= 1
   print(f"{pid} * {processName} * {threadName} 
       ---> Finished counting...")

 def timeit(function,args,threaded=False):
      start = time.time()
      if threaded:
         t1 = Thread(target = function, args =(args, ))
         t2 = Thread(target = function, args =(args, ))
         t1.start()
         t2.start()
         t1.join()
         t2.join()
      else:
        function(args)
      end = time.time()
      print('Time taken in seconds for running {} on Argument {} is {}s -{}'.format(function,args,end - start,"Threaded" if threaded else "None Threaded"))

if __name__=="__main__":
      #Running io_bound task
      print("IO BOUND TASK NON THREADED")
      timeit(io_bound,SLEEP)

      print("IO BOUND TASK THREADED")
      #Running io_bound task in Thread
      timeit(io_bound,SLEEP,threaded=True)

      print("CPU BOUND TASK NON THREADED")
      #Running cpu_bound task
      timeit(cpu_bound,COUNT)

      print("CPU BOUND TASK THREADED")
      #Running cpu_bound task in Thread
      timeit(cpu_bound,COUNT,threaded=True)
Вход в полноэкранный режим Выход из полноэкранного режима

>>> IO BOUND TASK  NON THREADED
>>> 17244 * MainProcess * MainThread            ---> Start sleeping...
>>> 17244 * MainProcess * MainThread            ---> Finished sleeping...
>>> 17244 * MainProcess * MainThread            ---> Start sleeping...
>>> 17244 * MainProcess * MainThread            ---> Finished sleeping...
>>> Time taken in seconds for running <function io_bound at 0x00C50810> on Argument 10 is 20.036664724349976s -None Threaded
>>> IO BOUND TASK THREADED
>>> 10180 * MainProcess * Thread-1            ---> Start sleeping...
>>> 10180 * MainProcess * Thread-2            ---> Start sleeping...
>>> 10180 * MainProcess * Thread-1            ---> Finished sleeping...
>>> 10180 * MainProcess * Thread-2            ---> Finished sleeping...
>>> Time taken in seconds for running <function io_bound at 0x01B90810> on Argument 10 is 10.01464056968689s -Threaded
>>> CPU BOUND TASK NON THREADED
>>> 14172 * MainProcess * MainThread            ---> Start counting...
>>> 14172 * MainProcess * MainThread        ---> Finished counting...
>>> 14172 * MainProcess * MainThread            ---> Start counting...
>>> 14172 * MainProcess * MainThread        ---> Finished counting...
>>> Time taken in seconds for running <function cpu_bound at 0x018F4468> on Argument 200000000 is 44.90199875831604s -None Threaded
>>> CPU BOUND TASK THEADED
>>> 15616 * MainProcess * Thread-1            ---> Start counting...
>>> 15616 * MainProcess * Thread-2            ---> Start counting...
>>> 15616 * MainProcess * Thread-1        ---> Finished counting...
>>> 15616 * MainProcess * Thread-2        ---> Finished counting...
>>> Time taken in seconds for running <function cpu_bound at 0x01E44468> on Argument 200000000 is 106.09711360931396s -Threaded
Войти в полноэкранный режим Выход из полноэкранного режима

Из результатов видно, что multithreading отлично справляется с несколькими задачами, связанными с IO, со временем выполнения 10 секунд, в то время как для выполнения без потокового подхода потребовалось 20 секунд. Мы использовали тот же подход для выполнения задач, привязанных к процессору. Изначально он запускал наши потоки в одно и то же время, но в итоге мы увидели, что выполнение всей программы заняло около 106 секунд! Что же произошло? Это потому, что когда Thread-1 запускается, он приобретает глобальную блокировку интерпретатора (GIL), которая не позволяет Thread-2 использовать процессор. Следовательно, Thread-2 должен ждать, пока Thread-1 закончит свою задачу и освободит блокировку, чтобы он мог получить блокировку и выполнить свою задачу. Это получение и освобождение блокировки добавляет накладные расходы к общему времени выполнения. Поэтому можно с уверенностью сказать, что потоковое программирование не является идеальным решением для задач, выполнение которых зависит от центрального процессора.

Эта особенность затрудняет параллельное программирование. Если GIL сдерживает нас в плане параллелизма, не должны ли мы избавиться от него или иметь возможность отключить его? Это не так просто. Другие функции, библиотеки и пакеты стали полагаться на GIL, поэтому что-то должно заменить его, иначе вся экосистема сломается. Это крепкий орешек.

Взлом замков.

Мы выяснили, что CPython использует замки для защиты данных от скачков, но, хотя этот замок существует, программисты нашли способ реализовать параллелизм в явном виде. Когда дело доходит до GIL, мы можем использовать библиотеку multiprocessing, чтобы обойти глобальную блокировку. Многопроцессорная обработка позволяет достичь параллелизма в его истинном смысле, поскольку она выполняет код в разных процессах на разных ядрах процессора. Он создает новый экземпляр интерпретатора Python для запуска на каждом ядре. Различные процессы располагаются в разных областях памяти, поэтому обмен объектами между ними не так прост. В данной реализации python предоставляет отдельный интерпретатор для запуска каждому процессу; таким образом, в этом случае каждому процессу при многопроцессорной обработке предоставляется один поток.

import os
import time
from multiprocessing import Process, current_process

SLEEP = 10
COUNT = 200000000

def count_down(cnt):
   pid = os.getpid()
   processName = current_process().name
   print(f"{pid} * {processName} 
           ---> Start counting...")
   while cnt > 0:
       cnt -= 1

def io_bound(sec):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} 
           ---> Start sleeping...")
   time.sleep(sec)
   print(f"{pid} * {processName} * {threadName} 
           ---> Finished sleeping...")

if __name__ == '__main__':
# creating processes
    start = time.time()

    #CPU BOUND
    p1 = Process(target=count_down, args=(COUNT, ))
    p2 = Process(target=count_down, args=(COUNT, ))

    #IO BOUND
    #p1 = Process(target=, args=(SLEEP, ))
    #p2 = Process(target=count_down, args=(SLEEP, ))

  # starting process_thread
    p1.start()
    p2.start()

  # wait until finished
    p1.join()
    p2.join()

    stop = time.time()
    elapsed = stop - start

    print ("The time taken in seconds is :", elapsed)

Вход в полноэкранный режим Выход из полноэкранного режима
>>> 1660 * Process-2            ---> Start counting...
>>> 10184 * Process-1            ---> Start counting...
>>> The time taken in seconds is : 12.815475225448608
Вход в полноэкранный режим Выход из полноэкранного режима

Видно, что multiprocessing работает исключительно хорошо для cpu и io bound task. MainProcess запускает два подпроцесса, Process-1 и Process-2, имеющие разные PID, каждый из которых выполняет задачу уменьшения COUNT до нуля. Каждый процесс работает параллельно, используя отдельное ядро процессора и свой экземпляр интерпретатора Python, поэтому выполнение всей программы занимает всего 12 секунд.

Обратите внимание, что выходные данные могут быть выведены неупорядоченными, поскольку процессы не зависят друг от друга. Это происходит потому, что каждый процесс выполняет функцию в своем собственном основном потоке по умолчанию.

Мы также можем использовать библиотеку asyncio, чтобы обойти блокировку GIL. Основная концепция asyncio заключается в том, что один объект python, известный как цикл событий, управляет тем, как и когда выполняется каждая задача. Цикл событий знает о каждой задаче и ее состоянии. Состояние готовности указывает на то, что задача готова к выполнению, а стадия ожидания указывает на то, что задача ожидает завершения какой-то внешней задачи. В asyncio задачи никогда не передают управление и не прерываются в середине выполнения, поэтому совместное использование объектов является потокобезопасным.

import time
import asyncio

COUNT = 200000000

# asynchronous function defination
async def func_name(cnt):
       while cnt > 0:
           cnt -= 1

#asynchronous main function defination
async def main ():
  # Creating 2 tasks.....You could create as many tasks (n tasks)
  task1 = loop.create_task(func_name(COUNT))
  task2 = loop.create_task(func_name(COUNT))

  # await each task to execute before handing control back to the program
  await asyncio.wait([task1, task2])

if __name__ =='__main__':
  # get the event loop
  start_time = time.time()
  loop = asyncio.get_event_loop()
  # run all tasks in the event loop until completion
  loop.run_until_complete(main())
  loop.close()
  print("--- %s seconds ---" % (time.time() - start_time))
Вход в полноэкранный режим Выход из полноэкранного режима
>>> --- 41.74118399620056 seconds ---
Войти в полноэкранный режим Выход из полноэкранного режима

Мы видим, что asyncio требуется 41 секунда для завершения обратного отсчета, что лучше, чем 106 секунд multithreading, но не так хорошо, как 12 секунд multiprocessing для задач, связанных с процессором. Asyncio создает eventloop и две задачи, task1 и task2, эти задачи затем помещаются на eventloop. Затем программа ожидает выполнения задач, пока цикл событий выполняет все задачи до конца.

Наконец, чтобы полностью использовать всю мощь параллелизма в python, мы также можем использовать другой интерпретатор. JPython и IronPython не имеют GIL, что означает, что пользователь может в полной мере использовать многопроцессорные системы.

Оставьте комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *