您现在的位置是:首页 > 技术教程 正文

Python多进程详解

admin 阅读: 2024-03-23
后台-插件-广告管理-内容页头部广告(手机)

文章目录

  • 1. 多进程
  • 2. 创建进程
    • 2.1 直接创建
    • 2.2 继承创建
  • 3. 守护进程和join()方法
  • 4. 进程锁
  • 5. 进程通信
    • 5.1 Queue
    • 5.2 Pipe
  • 6. 进程数据共享
    • 6.1 Value
    • 6.2 Array
  • 7. 进程池
  • 参考文章

前面的多线程文章已经讲过了,Python中的多线程实际上是一种虚假的多线程,在大多时候甚至起到的效果是让运行时间更加的漫长,至少在Python中使用的价值不高,就比如写爬虫的时候只有多进程才能做到同时利用多个CPU内核,使得程序程序运行时间大大缩短,下面我们就来讲讲Python中的多进程。

1. 多进程

进程是指一个程序在给定数据集合上的一次执行过程,是系统进行资源分配和运行调用的独立单位。可以简单地理解为操作系统中正在执行的程序。也就说,每个应用程序都有一个自己的进程。

每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。

Python中的进程类使用的是 multiprocessing 模块,该模块的API大部分复制了 threading 模块的API,该模块的常用方法如下。

import multiprocessing print('子进程的列表:{}'.format(multiprocessing.active_children())) print('电脑的CPU数量:{}'.format(multiprocessing.cpu_count())) print('现在运行的进程:{}'.format(multiprocessing.current_process()))
  • 1
  • 2
  • 3
  • 4
  • 5

输出如下:

子进程的列表:[]
电脑的CPU数量:12
现在运行的进程:<_MainProcess(MainProcess, started)>

2. 创建进程

创建线程的方法有两种,一种是直接使用 multiprocessing 模块里面的类来进行创建,一种是继承 multiprocessing 模块的类写一个类来对线程进行创建。

2.1 直接创建

我们可以通过直接从 multiprocessing.Process 继承创建一个新的子类,并实例化后调用 start() 方法启动新进程,即相当于它调用了进程的 run() 方法。

该方法的参数如下:

Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 1

target 指要创建的进程的方法名,name 指给此进程命名,命名后可以调用 multiprocessing.current_process().name 方法输出该进程的名字, args/kwargs 指 target 指向的方法需要传递的参数,必须是元组形式,如果只有一个参数,需要以添加逗号。

假如我们创建两个进程,一个每隔两秒对传入的数加2,一个每隔1秒对传入的数加1,代码示例如下:

import multiprocessing from multiprocessing import Process import time def process1(num): while True: num += 2 print('{} is running >> {}'.format(multiprocessing.current_process().name, num)) time.sleep(2) if __name__ == '__main__': # 设置进程 new_pro = Process(target=process1, name='Add2', args=(100,)) # 进程开始 new_pro.start() n = 0 while True: n += 1 print('{} is running >> {}'.format(multiprocessing.current_process().name, n)) time.sleep(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

输出结果如下:

MainProcess is running >> 1
Add2 is running >> 102
MainProcess is running >> 2
MainProcess is running >> 3
Add2 is running >> 104
MainProcess is running >> 4
MainProcess is running >> 5
Add2 is running >> 106

注意使用多进程的时候需要特别注意,必须要有 if __name__ == '__main__':, 该语句下的代码相当于主进程,没有该语句会报错。

简单解释下上述内容,由于Python运行过程中,新创建进程后,进程会导入正在运行的文件,即如果没有 if __name__ == '__main__':,代码在运行到 new_pro 时,新的进程会重新读入代码,新进程认为其是要再次运行的代码,这是子进程又一次运行到 new_pro ,但是在 multiprocessing.Process 的源码中是对子进程再次产生子进程是做了限制的,是不允许的,于是便会出现错误。

2.2 继承创建

即继承Process来自定义进程类,重写run方法。

import multiprocessing from multiprocessing import Process import time # 继承进程类 class MyProcess(Process): def __init__(self, num): super().__init__() # 必须调用父类的初始化方法 self.num = num def run(self) -> None: while True: self.num += 2 print('{} is running >> {}'.format(multiprocessing.current_process().name, self.num)) time.sleep(2) if __name__ == '__main__': new_pro = MyProcess(100) # 设置进程名字 new_pro.name = "Add2" new_pro.start() n = 0 while True: n += 1 print('{} is running >> {}'.format(multiprocessing.current_process().name, n)) time.sleep(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3. 守护进程和join()方法

如果当前python进程是守护进程,那么意味着这个进程是“不重要”的,“不重要”意味着如果他的主进程结束了但该守护进程没有运行完,守护进程就会被强制结束。如果进程是非守护进程,那么父进程只有等到非守护进程运行完毕后才能结束。如果需要设置一个进程为守护进程,只需要将其 daemon 参数设置为 True 即可。

join 方法的参数如下:

join([timeout])
  • 1

如果可选参数 timeout 是 None (默认值),则该方法将阻塞其他所有进程(包括主进程),直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。

import multiprocessing from multiprocessing import Process import time def process1(num): while True: num += 1 print('{} is running >> {}'.format(multiprocessing.current_process().name, num)) time.sleep(1) def process2(num): while True: num += 2 print('{} is running >> {}'.format(multiprocessing.current_process().name, num)) time.sleep(2) if __name__ == '__main__': new_pro1 = Process(target=process1, name='Add1', args=(100,)) new_pro1.daemon = True new_pro1.start() new_pro1.join(3) new_pro2 = Process(target=process2, name='Add2', args=(1,)) new_pro2.daemon = True new_pro2.start() time.sleep(3) print('{} Already Endding'.format(multiprocessing.current_process().name))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

输出结果如下:

Add1 is running >> 101
Add1 is running >> 102
Add1 is running >> 103
Add2 is running >> 3
Add1 is running >> 104
Add1 is running >> 105
Add2 is running >> 5
Add1 is running >> 106
MainProcess Already Endding

以上结果显示,前三秒钟,Add1 阻塞了所有的进程,包括主进程,过了三秒后,阻塞结束,再过三秒后,主进程运行结束,由于没有了非守护进程,两个守护进程没有了守护的意义,故程序结束。

4. 进程锁

进程锁与线程锁不能说相似吧,也就是一模一样了 ,只不过是库变了一下,甚至连API都一样,仅仅是使用时由 lock = threading.Lock() 变为了 lock = multiprocessing.Lock() 等,具体的使用可以看多线程的这篇文章中的线程锁。

5. 进程通信

进程之间不共享数据的。如果进程之间需要进行通信,则要用到 Queue 模块或者 Pipe 模块来实现。

例如,我有两个进程,一个进程会产生一个随机数,另一个进程需要将上一个进程产生的随机数进行加1操作,我想大部分人都是想的是用全局变量来进行操作,操作如下:

from multiprocessing import Process import random data = 0 def process1(): global data data = random.random() print('产生的data = {}'.format(data)) def process2(): global data data = data + 1 print('加一后的data = {}'.format(data)) if __name__ == '__main__': new_pro1 = Process(target=process1) new_pro1.start() new_pro2 = Process(target=process2) new_pro2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

输出结果为:

加一后的data = 1
产生的data = 0.885936521669484

很明显没有达到想要的目的。那么,为什么会这样呢?那是因为每个子进程享有独立的内存空间,接收进程产生的数据不能马上同步到转发进程中。所以,我们接下来讲讲使用 Queue 模块或者 Pipe 模块进行进程间的通信。

5.1 Queue

Queue 模块中最常用的方法就是 put 以及 get 方法了,一个是将数据放入队列中,一个是将数据从队列中取出。其他诸如 empty(),full() 等方法由于多线程或多进程的环境,这些方法是不可靠的,就不进行介绍了。

上述方法如下:

  • Queuemultiprocessing.Queue([maxsize])
    • 1
    maxsize 参数可选,如果填入了参数,则申请一个 maxsize 大小的队列。
  • putput(obj[, block[, timeout]])
    • 1
    将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。
  • getget([block[, timeout]])
    • 1
    从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

使用该模块将上面的例子写为进程间的通信后改为:

from multiprocessing import Process, Queue import random def process1(q): data = random.random() # 将数据放入队列中 q.put(data) print('产生的data = {}'.format(data)) def process2(q): # 从队列中得到数据 data = q.get() data = data + 1 print('加一后的data = {}'.format(data)) if __name__ == '__main__': # 初始化队列 queue = Queue() new_pro1 = Process(target=process1, args=(queue,)) new_pro1.start() new_pro2 = Process(target=process2, args=(queue,)) new_pro2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

输出结果为:

产生的data = 0.2752695453210734
加一后的data = 1.2752695453210734

5.2 Pipe

如果你创建了很多个子进程,那么其中任何一个子进程都可以对Queue进行存(put)和取(get)。但Pipe不一样,Pipe只提供两个端点,只允许两个子进程进行存(send)和取(recv)。也就是说,Pipe实现了两个子进程之间的通信。

将上面的例子使用 Pipe 进行改动后,程序如下:

from multiprocessing import Process, Pipe import random def process1(conn_1): data = random.random() # 将数据放入管道的一端 conn_1.send(data) print('产生的data = {}'.format(data)) def process2(conn_2): # 从管道另一端得到数据 data = conn_2.recv() data = data + 1 print('加一后的data = {}'.format(data)) if __name__ == '__main__': # 初始化管道的两端 conn_1, conn_2 = Pipe() new_pro1 = Process(target=process1, args=(conn_1,)) new_pro1.start() new_pro2 = Process(target=process2, args=(conn_2,)) new_pro2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

输出结果如下:

产生的data = 0.07282616926609853
加一后的data = 1.0728261692660985

6. 进程数据共享

上面的进程通信中, Queue 要求要先进先出, Pipe 只能够实现两个进程的存取,如果要一个数据拿给所有进程使用且不要求先进先出,那该使用什么呢?这就需要用到进程间的数据共享了。

6.1 Value

Value 数据共享类最多能够共享一个值,该函数的参数如下:

Value(typecode_or_type, args, lock=True)
  • 1

上述方法中,参数 typecode_or_type 定义 ctypes() 对象的类型,可以传 Type code 或 C Type,具体对照表见下文。args 传递给 typecode_or_type 构造函数的参数,lock 默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。

Type codeC TypePython TypeMinimum size in bytes
'b'signed charint1
'B'unsigned charint1
'u'Py_UNICODEUnicode character2
'h'signed shortint2
'H'unsigned shortint2
'i'signed intint2
'I'unsigned intint2
'l'signed longint4
'L'unsigned longint4
'q'signed long longint8
'Q'unsigned long longint8
'f'floatfloat4
'd'doublefloat8

例如下面我们在一个进程中传入值并对其进行改动,另一个进程输出传入的值,代码如下:

from multiprocessing import Process, Value def process1(n): n.value = 1 def process2(n): print('改变后的参数 = {}'.format(n.value)) if __name__ == '__main__': # 初始化value num = Value('d', 0) new_pro1 = Process(target=process1, args=(num,)) new_pro1.start() new_pro1.join() new_pro2 = Process(target=process2, args=(num,)) new_pro2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出如下:

改变后的参数 = 1.0

6.2 Array

上述的 Value 类只能传递一个参数,但是 Array 可以传递很多的参数,该方法参数如下:

Array(typecode_or_type, size_or_initializer, **kwds[, lock])
  • 1

typecode_or_type 参数与上面的相同,同样参照上表,size_or_initializer 如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为0。否则,size_or_initializer 是用于初始化数组的序列,其长度决定数组的长度。kwds 是传递给 typecode_or_type 构造函数的参数,lock 参数与上面的相同。

例如,我们传入一个0数组给一个进程,然后在另一个进程中计算其和,代码如下:

from multiprocessing import Process, Array def process1(n): n[3] = 0 n[4] = 0 def process2(n): # 从队列中得到数据 print('数组的和为 = {}'.format(sum(n))) if __name__ == '__main__': # 初始化数组 num = Array('d', range(5)) new_pro1 = Process(target=process1, args=(num,)) new_pro1.start() new_pro1.join() new_pro2 = Process(target=process2, args=(num,)) new_pro2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

输出如下:

数组的和为 = 3.0

7. 进程池

进程池即可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。

进程池的话Python中有两个方法可以实现,一个是 multiprocessing 模块自带的 Pool 类,还有一个是 concurrent.futures.ProcessPoolExecutor 进行实现,其中后者的API与线程池的API一模一样,可以参考这里的线程池来进行学习,这里我介绍下前面一种方式。

该模块的常用方法如下:

  • PoolPool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    • 1
    该类一般初始化使用的是 processes ,该参数是要使用的工作进程数目。如果 processes 为 None,则使用 os.cpu_count() 返回的值。
  • applyapply(func[, args[, kwds]])
    • 1
    使用阻塞的方式调用 func,必须等待上⼀个进程执行完任务后才能执行下一个进程,了解即可,几乎不用
  • apply_asyncapply_async(func[, args[, kwds[, callback[, error_callback]]]])
    • 1
    使用非阻塞的方式调用 func(任务并行执行),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。
  • terminateterminate()
    • 1
    不管任务是否完成,立即终止
  • closeclose()
    • 1
    关闭Pool,使其不再接受新的任务。
  • joinjoin()
    • 1
    主进程阻塞,等待子进程的退出,必须在 close 或 terminate 之后使用。

下面创建一个容量为4的进程池,并让10个进程都停留三秒输出进程名,代码如下:

import multiprocessing from multiprocessing import Process, Pool import time def proc(): time.sleep(3) print('{} Already Endding >> {}'.format(multiprocessing.current_process().name, time.strftime('%H:%M:%S',time.localtime(time.time())))) if __name__ == '__main__': # 创建容量为4的进程池 pool = Pool(4) for i in range(10): pool.apply_async(proc) pool.close() # 阻塞主进程,等所有子进程运行完后再通过 pool.join() print('{} Already Endding >> {}'.format(multiprocessing.current_process().name, time.strftime('%H:%M:%S', time.localtime(time.time()))))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

输出如下:

SpawnPoolWorker-1 Already Endding >> 17:16:15
SpawnPoolWorker-4 Already Endding >> 17:16:15
SpawnPoolWorker-3 Already Endding >> 17:16:15
SpawnPoolWorker-2 Already Endding >> 17:16:15
SpawnPoolWorker-1 Already Endding >> 17:16:18
SpawnPoolWorker-2 Already Endding >> 17:16:18
SpawnPoolWorker-4 Already Endding >> 17:16:18
SpawnPoolWorker-3 Already Endding >> 17:16:18
SpawnPoolWorker-1 Already Endding >> 17:16:21
SpawnPoolWorker-3 Already Endding >> 17:16:21
MainProcess Already Endding >> 17:16:21

参考文章

[1] : https://docs.python.org/zh-cn/3.7/library/multiprocessing.html
[2] : https://www.freesion.com/article/2458937949/
[3] : https://blog.csdn.net/zong596568821xp/article/details/99678390
[4] : https://zhuanlan.zhihu.com/p/493699150
[5] : https://zhuanlan.zhihu.com/p/477826233
[6] : https://blog.csdn.net/weixin_56319791/article/details/122270700
[7] : https://zhuanlan.zhihu.com/p/568073350
[8] : https://www.jb51.net/article/230310.htm

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索