python多进程

文章最后更新时间为:2018年08月14日 11:16:39

以下主要是python程序实现多进程的方法

1.fork()

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

python在Unix/Linux下实现多进程是非常方便的,直接调用fork()函数即可实现:

请看代码:

# -- coding:utf-8 --
import os
print(os.getpid()) #获取子进程的进程号
pid = os.fork()
if pid == 0 :
  print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else :
  print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

上述代码执行结果为

2219
I (2219) just created a child process (2221).
I am child process (2221) and my parent is 2219.

2. multiprocessing

对于windows是没有fork()函数的,但是python为了实现跨平台的多进程,创建了一个模块用于实现跨平台的多进程程序。

对于这个函数的介绍,请看官方文档

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

2.1 创建进程--Process

关于创建子进程,请看下面代码:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',)) # 创建一个进程
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果如下:

$ python index.py
Parent process 10992.
Child process will start.
Run child process test (1328)...
Child process end.

可以看到multiprocessing中的Process类可以很方便的创建一个子进程。关于其格式如下:

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

其中

  • target 是函数名字,需要调用的函数
  • args 函数需要的参数,以 tuple 的形式传入,必须有逗号

关于子进程类的方法,有如下说明:

  • star()方法用于启动进程
  • join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
  • close()用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。(关于进程池请看下文)

2.2 创建进程池--pool

Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。

  • Pool 对象调用 join 方法会等待所有的子进程执行完毕
  • 调用 join 方法之前,必须调用 close
  • 调用 close 之后就不能继续添加新的 Process 了

pool.apply_async

apply_async 方法用来同步执行进程,允许多个进程同时进入池子。

from multiprocessing import Pool
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = Pool(processes=3)
    for i in range(6):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')

执行结果:

$ python index.py
current process 3728
Waiting for all subprocesses done...
Task 0 pid 5988 is running, parent id is 3728
Task 1 pid 11288 is running, parent id is 3728
Task 2 pid 12340 is running, parent id is 3728
Task 1 end.
Task 2 end.
Task 0 end.
Task 3 pid 11288 is running, parent id is 3728
Task 5 pid 5988 is running, parent id is 3728
Task 4 pid 12340 is running, parent id is 3728
Task 5 end.
Task 4 end.
Task 3 end.
All processes done!

pool.apply

apply(func[, args[, kwds]])

该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。

from multiprocessing import Pool
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = Pool(processes=3)
    for i in range(6):
        p.apply(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')

执行结果:

$ python index.py
current process 3848
Task 0 pid 10884 is running, parent id is 3848
Task 0 end.
Task 1 pid 15900 is running, parent id is 3848
Task 1 end.
Task 2 pid 15276 is running, parent id is 3848
Task 2 end.
Task 3 pid 10884 is running, parent id is 3848
Task 3 end.
Task 4 pid 15900 is running, parent id is 3848
Task 4 end.
Task 5 pid 15276 is running, parent id is 3848
Task 5 end.
Waiting for all subprocesses done...
All processes done!

2.3 进程通信

关于进线程能否通过一个全局变量来实现通信,请看这篇文章这篇文章
进程之间是不共享数据的,要实现进程间的通信,multiprocessing模块提供了两种形式:队列和管道。

队列:

Queue([maxsize])

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。

队列先进先出,当使用Quere进行进程通信的时候,有两种方法,getput

看实例:

from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def proc_write(q,urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())
# 读数据进程执行的代码:
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue.' % url)
if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
    proc_reader = Process(target=proc_read, args=(q,))
    # 启动子进程proc_writer,写入:
    proc_writer1.start()
    proc_writer2.start()
    # 启动子进程proc_reader,读取:
    proc_reader.start()
    # 等待proc_writer结束:
    proc_writer1.join()
    proc_writer2.join()
    # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
    proc_reader.terminate()

执行结果:

$ python index.py             
Process(6420) is writing...   
Put url_1 to queue...         
Process(14808) is reading...  
Get url_1 from queue.         
Process(9084) is writing...   
Put url_4 to queue...         
Get url_4 from queue.         
Put url_5 to queue...         
Get url_5 from queue.         
Put url_6 to queue...         
Get url_6 from queue.         
Put url_2 to queue...         
Get url_2 from queue.         
Put url_3 to queue...         
Get url_3 from queue.         

代码解读:

Quere有两种方法:

  • get

get 方法用来从队列中读取 并删除一个元素。有两个参数可选,blocked 和 timeout
当取: blocked = True (默认),timeout 正值。等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。
当取:locked = False 。则当quere为空时立即抛出异常。

  • put

Put 方法用来插入数据到队列中,有两个可选参数,blocked 和 timeout。
当取:blocked = True(默认值),timeout 为正。如果query已满,则会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常。
当取:blocked = False 。如果 Queue 已满,立刻抛出 Queue.Full 异常。

管道:

Pipe([duplex])

在进程之间创建一条管道,并返回元组(con1,con2),其中con1,con2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。dumplex:默认管道是双向的,如果将duplex射成False,con1只能用于接收,con2只能用于发送。

看下面例子:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe

def proc1(pipe):
    pipe.send('Hello,This is proc1')

def proc2(pipe):
    while True:
        print("proc2 recieve:", pipe.recv())

if __name__ == "__main__":
    (con1,con2) = Pipe()
    p1 = Process(target=proc1, args=(con1,))
    p2 = Process(target=proc2, args=(con2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join(2)   #限制执行时间最多为2秒
    print('\nend all processes.')

结果

$ python index.py
proc2 recieve: Hello,This is proc1

end all processes.

当第二行输出时,因为管道中没有数据传来,Proc2处于阻塞状态,2秒后被强制结束。

以下是单向管道的例子,注意pipe[0],pipe[1]的分配。

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe

def proc1(pipe):
    pipe.send('Hello,This is proc1')

def proc2(pipe):
    while True:
        print("proc2 recieve:", pipe.recv())

if __name__ == "__main__":
    (con1,con2) = Pipe(duplex=False)
    p1 = Process(target=proc1, args=(con1,)) #con1为发送端
    p2 = Process(target=proc2, args=(con2,)) #con2为接收端
    p1.start()
    p2.start()
    p1.join()
    p2.join(2)  # 限制执行时间最多为2秒
    print('\nend all processes.')

执行结果同上。

3. 总结

在Unix/Linux下,可以使用fork()调用实现多进程。

要实现跨平台的多进程,可以使用multiprocessing模块。

进程间通信是通过Queue、Pipes等实现的。

参考:

1 + 3 =
快来做第一个评论的人吧~