python多进程与多线程

一、基本概念

1、进程process

什么是进程。最直观的就是一个个pid,官方的说法就:进程是程序在计算机上的一次执行活动。
从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的基本单位。
进程有独立的地址空间,一个进程崩溃后不会对其它进程产生影响。

2、线程thead

线程是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
一个进程由几个线程组成,线程与同属一个进程的其他的线程共享进程所拥有的全部资源。
线程没有独立的地址空间,一个线程死掉就等于整个进程死掉。

3、多线程和多进程

在 Python 中,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势。
当然对运行一个程序来说,随着 CPU 的增多执行效率肯定会有所提高,这是因为一个程序基本上不会是纯计算或者纯 I/O,所以我们只能相对的去看一个程序到底是计算密集型还是 I/O 密集型。

二、Python多进程技术

python中的多进程主要使用到 multiprocessing 这个库

1、不使用进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,os

def worker():
print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程{}终止".format(os.getpid()))

if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()

l=[]
# 创建子进程实例
for i in range(10):
p=Process(target=worker,name="worker"+str(i),args=())
l.append(p)

# 开启进程
for i in range(10):
l[i].start()

# 阻塞进程
for i in range(10):
l[i].join()

stop = time.time()
print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))

2、使用进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time,os

def worker(arg):
print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程{}终止".format(os.getpid()))

if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()

l = Pool(processes=5)
# 创建子进程实例
for i in range(10):
# l.apply(worker,args=(i,)) # 同步执行(Python官方建议废弃)
l.apply_async(worker,args=(i,)) # 异步执行

# 关闭进程池,停止接受其它进程
l.close()
# 阻塞进程
l.join()

stop = time.time()
print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))

三、Python多线程技术

python中的多进程主要使用到 threading 这个库

1、不使用线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding:utf-8 -*-
from threading import Thread
import time,os

def worker(arg):
print("子线程执行中>>> 编号={}".format(arg))
time.sleep(2)
print("子线程终止>>> 编号={}".format(arg))

if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU") # 本机为4核

l = []
# 创建子线程实例
for i in range(10):
t = Thread(target=worker, name='one', args=(i,))
t.start()
l.append(t)
for p in l:
p.join()

关于线程互斥、同步等

2、使用线程池(multiprocessing库的线程池)

“from multiprocessing import Pool “这样导入的 Pool 表示的是进程池,”from multiprocessing.dummy import Pool”这样导入的 Pool表示的是线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# -*- coding:utf-8 -*-
from multiprocessing.dummy import Pool as ThreadPool
import time,os

def worker(arg):
print("子线程{}执行中".format(arg))
time.sleep(2)
print("子线程{}终止".format(arg))

if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主线程执行中, 开始时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()

pool = ThreadPool(5)
results = pool.map(worker, range(10))

pool.close()
pool.join()

stop = time.time()
print("主线程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))

3、使用线程池(自定义线程池)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
思路
1,将任务放在队列
1)创建队列:(初始化)
2)设置大小,线程池的最大容量
3)真实创建的线程 列表
4)空闲的线程数量

2,着手开始处理任务
1)创建线程
2)空闲线程数量大于0,则不再创建线程
3)创建线程池的数量 不能高于线程池的限制
4)根据任务个数判断 创建线程的数量
2)线程去队列中取任务
1)取任务包(任务包是一个元祖)
2)任务为空时,不再取(终止)
"""

import time
import threading
import queue

stopEvent = object() # 停止任务的标志

class ThreadPool(object):
def __init__(self, max_thread):
# 创建任务队列,可以放无限个任务
self.queue = queue.Queue()
# 指定最大线程数
self.max_thread = max_thread
# 停止标志
self.terminal = False
# 创建真实线程数
self.generate_list = []
# 空闲线程数
self.free_thread = []

def run(self, action, args, callback=None):
"""
线程池执行一个任务
INPUT ->
action:任务函数
args:任务参数
callback:执行完任务的回调函数,成功或者失败的返回值。
"""
# 线程池运行的条件:1)
if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:
self.generate_thread()
task = (action, args, callback)
self.queue.put(task)

def callback(self):
"""
回调函数:循环取获取任务,并执行任务函数
"""
# 获取当前线程
current_thread = threading.current_thread()
self.generate_list.append(current_thread)
# 取任务并执行
event = self.queue.get()
# 事件类型是任务
while event != stopEvent: # 重点是这个判断 使任务终止
# 解开任务包 ,(任务是一个元祖)
# 执行任务
# 标记:执行任务前的状态,执行任务后的状态
action, args, callback = event
try:
ret = action(*args)
success = True
except Exception as x:
success = False
ret = x
if callback is not None:
try:
callback(success, ret)
except Exception as e:
print(e)
else:
pass
if not self.terminal:
self.free_thread.append(current_thread)
event = self.queue.get()
self.free_thread.remove(current_thread)
else:
# 停止进行取任务
event = stopEvent
else:
# 不是元祖,不是任务,则清空当前线程,不在去取任务
self.generate_list.remove(current_thread)

def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.callback)
t.start()

# 终止取任务
def terminals(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

def close(self):
"""
执行完所有的任务后,所有线程停止
"""
num = len(self.generate_list)
self.queue.empty()
while num:
self.queue.put(stopEvent)
num -= 1


def test(pi):
time.sleep(0.5)
print(pi)


pool = ThreadPool(10)

for i in range(100):
pool.run(action=test, args=(i,))

pool.terminals()
pool.close()
0%