多线程编程

多线程编程

今天我们一起来探讨多线程编程。

首先,什么是线程?

线程相较于进程而言,是 CPU 调度的基本单位,线程是依赖于进程的。对于一个进程而言,其下有多个线程,线程共享进程的资源,线程之间会有竞态。

对于 IO 密集型的程序而言,线程和进程的性能相差不大。

接下来,我们将一起来学习 python 中的多线程编程。

简单使用

在 python 中,对于线程的操作库是 threading 模块。我们模拟一下抓取网页的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import time


def get_html(n):
time.sleep(n)
print("get html success!")


if __name__ == '__main__':
t = threading.Thread(target=get_html, args=(2,))

t.start()
t.join()
print("finished")

如果我需要并发去抓取不同的网页,应该如何操作呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time


def get_html_1(url):
time.sleep(2)
print(f"get html 1 {url} success!")


def get_html_2(url):
time.sleep(1)
print(f"get html 2 {url} success!")


if __name__ == '__main__':
t1 = threading.Thread(target=get_html_1, args=("www.baidu.com",))
t2 = threading.Thread(target=get_html_2, args=("www.souhu.com",))

t1.start()
t1.join() # 阻塞
t2.start()
t2.join() # 阻塞
print("finished")

当然,除了以上的使用线程的方式外,我们还可以通过继承+重写 run 的方法来实现:

1
2
3
4
5
6
7
8
9
10
11
class GetHtmlDetail(threading.Thread):
def __init(self, name):
super().__init__(name=name)

def run(self):
print("running")

t1 = GetHtmlDetail("1")

t1.start()
t1.join()

线程间的通信

在前文中,我介绍了线程是依赖进程的,共享进程的资源,在线程间的通信中,我们可以通过共享变量来进行线程间的通信。当然,也可以通过 Queue 等方式进行通信。

共享变量

同样,我们也是实现一个生产者-消费者来模拟线程间的通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import threading
import time

queue = []

class Producer(threading.Thread):
def __init__(self):
super().__init__()

def run(self) -> None:
global queue
for i in range(0, 10):
time.sleep(1)
queue.append(i)
print(f"produce: {i} in queue")


class Consumer(threading.Thread):
def __init__(self):
super().__init__()

def run(self) -> None:
global queue
time.sleep(10)
while True:
if queue:
time.sleep(1)
data = queue.pop(0)
print(f"consumer data: {data}")

else:
break


if __name__ == '__main__':
p = Producer()
c = Consumer()


p.start()
c.start()

p.join()
c.join()

以上实现了一个生产者-消费者模型,演示了线程之间通过全局变量进行通信。由于 queue=[] 是非阻塞的,则先在 consumer 中 time.sleep(10) 等待生产物料就绪。在实际的应用中,我们一般不推荐共享变量的方式来完成线程通信,这是因为全局变量不是线程安全的,那么就会引发一些错误,接下来我们来看一下 Queue.

Queue

接下来,我将使用 Queue 来演示,在 python 中 from queue import Queue,其中的 Queue 是多线程安全的。我这里将演示,使用多个消费者来消费一个生产者。

queue.Queue 是一个阻塞队列,故会等待 data 的传入然后消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading
import time

from queue import Queue


class Producer(threading.Thread):
def __init__(self):
super().__init__()

def run(self) -> None:
for i in range(0, 10):
time.sleep(1)
queue.put(i)
print(f"produce: {i} in queue")

for _ in range(2):
queue.put(None)


class Consumer(threading.Thread):
def __init__(self, name):
super().__init__(name=name)

def run(self) -> None:
while True:
data = queue.get()
if data is None:
print(f"consumer: {self.name} exiting")
break
print(f"consumer: {self.name} data: {data}")


if __name__ == '__main__':
queue = Queue(maxsize=100) # 避免内存泄露
p = Producer()
c1 = Consumer("c1")
c2 = Consumer("c2")

p.start()
c1.start()
c2.start()

p.join()
c1.join()
c2.join()

那么为什么 Queue 是线程安全的呢?这是因为在每个操作之时都会进行获取锁和释放锁的操作。感兴趣的同学可以自行查阅学习。

线程同步

在多线程编程中,我们需要考虑线程同步,那么什么是线程同步以及为什么需要线程同步呢?

我们通过一个例子来解释为什么需要线程同步?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading

a = 0


def add():
global a
for i in range(1000000):
a += 1


def desc():
global a
for i in range(1000000):
a -= 1


if __name__ == '__main__':
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=desc)

t1.start()
t2.start()

t1.join()
t2.join()

print(a)

在上面的例子中,我们对一个全局编程进行操作,但是我们发现打印结果每次都不一样,且不为 0

1
-307326

这是为什么呢?这是因为 python GIL 的存在,GIL 是基于固定时间的调度方式,每次执行固定时间的字节码,导致 t1 还未完成执行完成时切换到了 t2,导致不是我们预期的结果。

Lock && Lock

那么如何解决这个问题呢?这就涉及到了线程的锁操作:Lock && RLock

这里简单介绍一下这二者的区别:

  1. threading.Lock
    • Lock 是一个简单的互斥锁(Mutex),也被称为普通锁。
    • 它只有两个状态:锁定(locked)和未锁定(unlocked)。
    • 一旦某个线程获取了 Lock,其他线程就无法获取该锁,直到锁被释放。
    • 即使是同一个线程多次调用 Lockacquire() 方法,也会导致该线程被阻塞,因为它已经持有了该锁。
  2. threading.RLock
    • RLock 是可重入锁(Reentrant Lock)。
    • 可重入锁允许一个线程多次获得同一个锁而不会发生阻塞,前提是该线程已经持有了这个锁。也就是说,一个线程可以多次调用 RLockacquire() 方法而不会被阻塞,只要它在之前已经获得了该锁,并且对应的 release() 方法的次数和 acquire() 方法的次数相等即可释放该锁。
    • 这种机制允许线程在递归或嵌套函数调用中安全地多次获取同一个锁,避免了死锁的可能性。

现在,我们就可以利用 锁 对以上的代码进行改造,达到我们想要的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading

a = 0

lock = threading.Lock()

def add():
global a
global lock
for i in range(1000000):
lock.acquire()
a += 1
lock.release()


def desc():
global a
global lock
for i in range(1000000):
lock.acquire()
a -= 1
lock.release()


if __name__ == '__main__':
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=desc)

t1.start()
t2.start()

t1.join()
t2.join()

print(a)

结果输出为 0;

当然,在使用锁的时候要结合应用场景,锁会影响性能且容器导致死锁,在使用时,要谨慎。

Condition

用于复杂的线程间的同步,例如我们要完成两个 AI 机器人的问答交互,应该如何实现呢?

我们这里通过一个例子来进行学习:

这里我们将使用两个线程模拟两个 AI 机器人的对话,实现效果是这样的:

1
2
3
4
5
6
天猫精灵: 小爱同学
小爱: 我在
天猫精灵: 我们来对古诗吧
小爱: 好啊
天猫精灵: 我住长江头
小爱: 我住长江尾

我们一般是先使用锁的机制来运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import threading


class XiaoAi(threading.Thread):
def __init__(self, lock):
super().__init__(name="小爱")
self.lock = lock

def run(self):
self.lock.acquire()
print(f"{self.name}: 我在")
self.lock.release()

self.lock.acquire()
print(f"{self.name}: 好啊")
self.lock.release()

self.lock.acquire()
print(f"{self.name}: 我住长江尾")
self.lock.release()


class Tianmao(threading.Thread):
def __init__(self, lock):
super().__init__(name="天猫精灵")
self.lock = lock

def run(self):
self.lock.acquire()
print(f"{self.name}: 小爱同学")
self.lock.release()

self.lock.acquire()
print(f"{self.name}: 我们来对古诗吧")
self.lock.release()

self.lock.acquire()
print(f"{self.name}: 我住长江头")
self.lock.release()


if __name__ == '__main__':
lock = threading.RLock()
xiaoai = XiaoAi(lock)
tianmao = Tianmao(lock)

tianmao.start()
xiaoai.start()


现实是:

1
2
3
4
5
6
天猫精灵: 小爱同学
天猫精灵: 我们来对古诗吧
天猫精灵: 我住长江头
小爱: 我在
小爱: 好啊
小爱: 我住长江尾

这里我们可以看到,两个 AI 机器人并没有完成交互的对话,这是为什么呢?

我们知道在 python 语言中,有 GIL 全局解释器锁的机制,其背后代码运行的原理是:基于固定时间的调度方式,每次执行固定时间的字节码,或者遇到系统 IO 时,强制释放。

在上述代码中,在一个固定时间内,天猫精灵就运行完了所有的代码,这样是不可行的。

于是,我们可以采用 threading.Condition()来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import threading


class XiaoAi(threading.Thread):
def __init__(self, con):
super().__init__(name="小爱")
self.con = con

def run(self):
with self.con:
self.con.wait() # wait() 方法只能通过 notify 唤醒
print(f"{self.name}: 我在")
self.con.notify()

self.con.wait()
print(f"{self.name}: 好啊")
self.con.notify()

self.con.wait()
print(f"{self.name}: 我住长江尾")
self.con.notify()


class Tianmao(threading.Thread):
def __init__(self, con):
super().__init__(name="天猫精灵")
self.con = con

def run(self):
with self.con:
print(f"{self.name}: 小爱同学")
self.con.notify() # 通知调用了 wait() 方的线程
self.con.wait() # 等待通知

print(f"{self.name}: 我们来对古诗吧")
self.con.notify() # 通知调用了 wait() 方的线程
self.con.wait() # 等待通知


print(f"{self.name}: 我住长江头")
self.con.notify() # 通知调用了 wait() 方的线程
self.con.wait() # 等待通知


if __name__ == '__main__':
con = threading.Condition()
xiaoai = XiaoAi(con)
tianmao = Tianmao(con)

xiaoai.start() # 先 wait
tianmao.start()

  • 启动顺序很重要,要先 wait, 后 notify
  • 代码逻辑很重要,要先 with self.con:, 后嵌套 wait() 后 notify

效果如下:

1
2
3
4
5
6
天猫精灵: 小爱同学
小爱: 我在
天猫精灵: 我们来对古诗吧
小爱: 好啊
天猫精灵: 我住长江头
小爱: 我住长江尾

那么这是如何实现的呢?通过代码逻辑来了解

当小爱同学启动时,会调用 con.wait() 方法,等待被唤醒,当天猫精灵启动时,会调用 con.notify 方法,通知小爱同学,打印语句,然后通过 con.notiry() 通知天猫打印语句。这样循环打印,就完成了两个线程的同步对话机制。

那么这是如何实现的呢?通过代码逻辑来了解

当小爱同学启动时,会调用 con.wait() 方法,等待被唤醒,当天猫精灵启动时,会调用 con.notify 方法,通知小爱同学,打印语句,然后通过 con.notiry() 通知天猫打印语句。这样循环打印,就完成了两个线程的同步对话机制。

现在我们来通过解析 Condition 源码的方式来了解其内部运行的原理是如何的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class Condition:
"""Class that implements a condition variable.

A condition variable allows one or more threads to wait until they are
notified by another thread.

If the lock argument is given and not None, it must be a Lock or RLock
object, and it is used as the underlying lock. Otherwise, a new RLock object
is created and used as the underlying lock.

"""

def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any.

If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.

This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.

"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass

def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.

If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.

This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.

When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).

When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.

"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass

首先,我们来看一下 __init__ 部分,这里其实也是用到了锁机制——RLock()

然后就是这里的 self._waiters = _deque() ,这里其实是借助了一个 collection 中的 双端队列,是可阻塞的;

condition 实际上是有两层锁,一把底层锁(conditino._lock)会在线程调用了 wait 方法的时候释放,并且在每次调用 wait()时,会生成一把锁,然后放入 self._waiters 中等待被释放,当调用了 notify 方法时,就会获取队列中的锁,然后释放。

信号量-Semaphore

在 Python 中的 threading 模块中,Semaphore(信号量)是一种用于线程同步的机制。它可以控制同时访问某个资源的线程数量。Semaphore 维护着一个内部计数器,可以通过 acquire() 和 release() 方法来增加或减少计数器的值。

例如我现在有个爬虫脚本,我们知道,某些网站对于爬虫是有一些反爬机制的,避免一些异常流量,避免多个线程的爬虫导致的网站崩溃。那么我们要如何正确有效的进行资源的爬取呢?

答案是:通过 Semaphore 模块,控制线程的创建并设置一定时间间隔取爬取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time
from threading import Thread


class HtmlSpider(Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem

def run(self):
time.sleep(2)
print("got html text success")
self.sem.release()


class UrlProducer(Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem

def run(self) -> None:
for i in range(20):
self.sem.acquire()
html_thread = HtmlSpider("https://www.baidu.com", self.sem)
html_thread.start()



if __name__ == '__main__':
sem = threading.Semaphore(3)
url_producer = UrlProducer(sem)
url_producer.start()

通过上述代码,在创建 20 个线程的过程中,就可以实现每次创建 3 个线程,而不是一次性创建 20 个了。

其实,Semaphore 也是基于 Condition 进行创建使用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Semaphore:
"""This class implements semaphore objects.

Semaphores manage a counter representing the number of release() calls minus
the number of acquire() calls, plus an initial value. The acquire() method
blocks if necessary until it can return without making the counter
negative. If not given, value defaults to 1.

"""

# After Tim Peters' semaphore class, but not quite the same (no maximum)

def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value

关于详细信息可参考源码

TreadPoolExecutor 线程池

为什么需要线程池?

线程池是一种并发编程的技术,用于管理和复用线程。它由一组预先创建的线程组成,这些线程可以被重复使用来执行多个任务。线程池的主要目的是优化线程的创建和销毁过程,以提高程序的性能和资源利用率。

以下是一些使用线程池的好处:

  1. 降低线程创建和销毁的开销:线程的创建和销毁是一项开销较高的操作,涉及到操作系统的调度和资源管理。通过使用线程池,可以预先创建一组线程,并重复使用它们来执行多个任务,避免了频繁的创建和销毁线程的开销。
  2. 提高系统的响应性:在多线程环境下,任务提交给线程池后可以立即开始执行,而不需要等待新线程的创建。这样可以大大减少任务的等待时间,提高系统的响应性和吞吐量。
  3. 控制并发数量:线程池可以通过设置最大线程数来限制并发执行的任务数量。这可以帮助我们控制系统资源的使用,避免过多的线程竞争和资源抢占导致的性能下降或系统崩溃。
  4. 提供任务排队和调度机制:线程池通常提供了任务队列,用于存储待执行的任务。当线程池中的线程空闲时,它们可以从任务队列中获取任务并执行。这种任务调度机制可以有效地平衡任务的执行,并确保任务按照预期的顺序和优先级进行处理。
  5. 资源管理和监控:线程池可以提供一些额外的功能,如线程超时管理、线程数动态调整、异常处理等。它们可以帮助我们更好地管理和监控线程的行为,提高程序的稳定性和可靠性。

总之,线程池是一种有效管理线程的技术,它能够提高程序的性能、响应性和资源利用率。通过合理配置线程池的大小和参数,可以更好地控制线程的并发数量,避免资源竞争和性能问题,提供更好的用户体验和系统可靠性。

简单使用

  • ThreadPoolExecutor() 线程池对象
  • task.submit 注册函数到线程池中
  • task.cancel 取消 task 的运行
  • task.result 线程的运行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

# futures 可以让多进程和多线程编码接口一致
import time


def get_html(times):
time.sleep(times)
print(f"get page {times}")
return times


if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=1)

# # 通过 submit 函数提交执行的函数到线程池中,submit 是立即返回
# task1 = executor.submit(get_html, (3,))
# task2 = executor.submit(get_html, (2,))
#
# print(task1.done())
# print(task2.cancel())
# time.sleep(3)
# print(task1.done())

# 获取已经成功的 task 的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
# wait 控制主线程的推出
wait(all_task, return_when=FIRST_COMPLETED)
print("main")
for future in as_completed(all_task):
data = future.result()
print(f"get {data} page")

# # 通过 executor 的 map 获取已经完成的值
# for data in executor.map(get_html, urls):
# print(f"get {data} page")

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ThreadPoolExecutor(_base.Executor):

# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__

def __init__(self, max_workers=None, thread_name_prefix=''):
"""Initializes a new ThreadPoolExecutor instance.

Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
# used to overlap I/O instead of CPU work.
max_workers = (os.cpu_count() or 1) * 5
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)

self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__

在上述的源码中,我们通过 submit 函数模块可以比较清楚的看到一个 函数 是如何注册到 Future 中的

  • 创建一个锁 Thread.Lock() 保证线程安全

  • 创建 Future() 实例 并将其作为参数传递到 work_item 中, 并 put 到 队列中

    • Future——未来对象,可以理解为 task 的容器对象,其中包含了 task 的执行情况,理解 Future 的设计理念是 Python 异步编程中的关键;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
    """Initializes the future. Should not be called by clients."""
    self._condition = threading.Condition()
    self._state = PENDING
    self._result = None
    self._exception = None
    self._waiters = []
    self._done_callbacks = []

    def _invoke_callbacks(self):
    for callback in self._done_callbacks:
    try:
    callback(self)
    except Exception:
    LOGGER.exception('exception calling callback for %r', self)

    可以看到其中也使用了 threading.Condition 来控制线程的管理

    • _workItem
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
    self.future = future
    self.fn = fn
    self.args = args
    self.kwargs = kwargs

    def run(self):
    if not self.future.set_running_or_notify_cancel():
    return

    try:
    result = self.fn(*self.args, **self.kwargs)
    except BaseException as exc:
    self.future.set_exception(exc)
    # Break a reference cycle with the exception 'exc'
    self = None
    else:
    self.future.set_result(result)

    其中比较重要的代码为:result = self.fn(*self.args, **self.kwargs) && self.future.set_result(result)

​ 分别是 运行函数,并获取结果,以及将结果保存至 future 中

  • _adjust_thread_count() 函数,控制线程池的大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
  • target = _worker() 运行函数的主体对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    def _worker(executor_reference, work_queue):
    try:
    while True:
    work_item = work_queue.get(block=True)
    if work_item is not None:
    work_item.run()
    # Delete references to object. See issue16284
    del work_item
    continue
    executor = executor_reference()
    # Exit if:
    # - The interpreter is shutting down OR
    # - The executor that owns the worker has been collected OR
    # - The executor that owns the worker has been shutdown.
    if _shutdown or executor is None or executor._shutdown:
    # Notice other workers
    work_queue.put(None)
    return
    del executor
    except BaseException:
    _base.LOGGER.critical('Exception in worker', exc_info=True)

    这里不断从 wok_queue 中获取 work_item 实例,然后调用其 run 方法,然后销毁依赖;

  • _threads_queues 是继承自 MutableMapping 保存线程池中的队列对象

以上就是多线程编程的全部内容

总结

在实际的开发工作中,对于 CPU 密集型的任务,可以考虑多进程编程;

对于 IO 密集型的任务,可以考虑多线程编程;

多线程的缺点就是 GIL 的存在以及锁会影响性能;

但是进程的创建和销毁会高于线程,所以要结合具体应用场景去使用哦

-------------THANKS FOR READING-------------