python多进程
文章最后更新时间为:2018年08月14日 11:16:39
以下主要是python程序实现多进程的方法
1.fork()
Unix/Linux操作系统提供了一个fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
python在Unix/Linux下实现多进程是非常方便的,直接调用fork()
函数即可实现:
请看代码:
# -- coding:utf-8 --
import os
print(os.getpid()) #获取子进程的进程号
pid = os.fork()
if pid == 0 :
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else :
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
上述代码执行结果为
2219
I (2219) just created a child process (2221).
I am child process (2221) and my parent is 2219.
2. multiprocessing
对于windows是没有fork()
函数的,但是python为了实现跨平台的多进程,创建了一个模块用于实现跨平台的多进程程序。
对于这个函数的介绍,请看官方文档
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
2.1 创建进程--Process
关于创建子进程,请看下面代码:
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',)) # 创建一个进程
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
执行结果如下:
$ python index.py
Parent process 10992.
Child process will start.
Run child process test (1328)...
Child process end.
可以看到multiprocessing
中的Process
类可以很方便的创建一个子进程。关于其格式如下:
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
其中
- target 是函数名字,需要调用的函数
- args 函数需要的参数,以 tuple 的形式传入,必须有逗号
关于子进程类的方法,有如下说明:
star()
方法用于启动进程join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。close()
用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。(关于进程池请看下文)
2.2 创建进程池--pool
Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。
- Pool 对象调用 join 方法会等待所有的子进程执行完毕
- 调用 join 方法之前,必须调用 close
- 调用 close 之后就不能继续添加新的 Process 了
pool.apply_async
apply_async 方法用来同步执行进程,允许多个进程同时进入池子。
from multiprocessing import Pool
import os
import time
def run_task(name):
print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
time.sleep(1)
print('Task {0} end.'.format(name))
if __name__ == '__main__':
print('current process {0}'.format(os.getpid()))
p = Pool(processes=3)
for i in range(6):
p.apply_async(run_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All processes done!')
执行结果:
$ python index.py
current process 3728
Waiting for all subprocesses done...
Task 0 pid 5988 is running, parent id is 3728
Task 1 pid 11288 is running, parent id is 3728
Task 2 pid 12340 is running, parent id is 3728
Task 1 end.
Task 2 end.
Task 0 end.
Task 3 pid 11288 is running, parent id is 3728
Task 5 pid 5988 is running, parent id is 3728
Task 4 pid 12340 is running, parent id is 3728
Task 5 end.
Task 4 end.
Task 3 end.
All processes done!
pool.apply
apply(func[, args[, kwds]])
该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。
from multiprocessing import Pool
import os
import time
def run_task(name):
print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
time.sleep(1)
print('Task {0} end.'.format(name))
if __name__ == '__main__':
print('current process {0}'.format(os.getpid()))
p = Pool(processes=3)
for i in range(6):
p.apply(run_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All processes done!')
执行结果:
$ python index.py
current process 3848
Task 0 pid 10884 is running, parent id is 3848
Task 0 end.
Task 1 pid 15900 is running, parent id is 3848
Task 1 end.
Task 2 pid 15276 is running, parent id is 3848
Task 2 end.
Task 3 pid 10884 is running, parent id is 3848
Task 3 end.
Task 4 pid 15900 is running, parent id is 3848
Task 4 end.
Task 5 pid 15276 is running, parent id is 3848
Task 5 end.
Waiting for all subprocesses done...
All processes done!
2.3 进程通信
关于进线程能否通过一个全局变量来实现通信,请看这篇文章和这篇文章
进程之间是不共享数据的,要实现进程间的通信,multiprocessing模块提供了两种形式:队列和管道。
队列:
Queue([maxsize])
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。
队列先进先出,当使用Quere
进行进程通信的时候,有两种方法,get
和put
看实例:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def proc_write(q,urls):
print('Process(%s) is writing...' % os.getpid())
for url in urls:
q.put(url)
print('Put %s to queue...' % url)
time.sleep(random.random())
# 读数据进程执行的代码:
def proc_read(q):
print('Process(%s) is reading...' % os.getpid())
while True:
url = q.get(True)
print('Get %s from queue.' % url)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
proc_reader = Process(target=proc_read, args=(q,))
# 启动子进程proc_writer,写入:
proc_writer1.start()
proc_writer2.start()
# 启动子进程proc_reader,读取:
proc_reader.start()
# 等待proc_writer结束:
proc_writer1.join()
proc_writer2.join()
# proc_reader进程里是死循环,无法等待其结束,只能强行终止:
proc_reader.terminate()
执行结果:
$ python index.py
Process(6420) is writing...
Put url_1 to queue...
Process(14808) is reading...
Get url_1 from queue.
Process(9084) is writing...
Put url_4 to queue...
Get url_4 from queue.
Put url_5 to queue...
Get url_5 from queue.
Put url_6 to queue...
Get url_6 from queue.
Put url_2 to queue...
Get url_2 from queue.
Put url_3 to queue...
Get url_3 from queue.
代码解读:
Quere有两种方法:
- get
get 方法用来从队列中读取 并删除一个元素。有两个参数可选,blocked 和 timeout
当取: blocked = True (默认),timeout 正值
。等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。
当取:locked = False
。则当quere为空时立即抛出异常。
- put
Put 方法用来插入数据到队列中,有两个可选参数,blocked 和 timeout。
当取:blocked = True(默认值),timeout 为正
。如果query已满,则会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常。
当取:blocked = False
。如果 Queue 已满,立刻抛出 Queue.Full 异常。
管道:
Pipe([duplex])
在进程之间创建一条管道,并返回元组(con1,con2),其中con1,con2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。dumplex:默认管道是双向的,如果将duplex射成False,con1只能用于接收,con2只能用于发送。
看下面例子:
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
pipe.send('Hello,This is proc1')
def proc2(pipe):
while True:
print("proc2 recieve:", pipe.recv())
if __name__ == "__main__":
(con1,con2) = Pipe()
p1 = Process(target=proc1, args=(con1,))
p2 = Process(target=proc2, args=(con2,))
p1.start()
p2.start()
p1.join()
p2.join(2) #限制执行时间最多为2秒
print('\nend all processes.')
结果
$ python index.py
proc2 recieve: Hello,This is proc1
end all processes.
当第二行输出时,因为管道中没有数据传来,Proc2处于阻塞状态,2秒后被强制结束。
以下是单向管道的例子,注意pipe[0],pipe[1]的分配。
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
pipe.send('Hello,This is proc1')
def proc2(pipe):
while True:
print("proc2 recieve:", pipe.recv())
if __name__ == "__main__":
(con1,con2) = Pipe(duplex=False)
p1 = Process(target=proc1, args=(con1,)) #con1为发送端
p2 = Process(target=proc2, args=(con2,)) #con2为接收端
p1.start()
p2.start()
p1.join()
p2.join(2) # 限制执行时间最多为2秒
print('\nend all processes.')
执行结果同上。
3. 总结
在Unix/Linux下,可以使用fork()调用实现多进程。
要实现跨平台的多进程,可以使用multiprocessing模块。
进程间通信是通过Queue、Pipes等实现的。
参考: