关于python3进程池传参队列的问题

文章最后更新时间为:2019年01月20日 22:04:43

最近打算没事就写个python脚本来巩固一下,最后完成一个完整的渗透测试框架。但是我发现python在io和网络方面的速度实在是太慢了,于是就要使用到多进程和多线程,但是关于进程池的传参问题,我遇到了一个问题。

首先需要来看一段简单的多进程代码:

from multiprocessing import Process, Queue

def read(q):
    while not q.empty():
        value = q.get()
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Queue()
    for i in range(4):
        q.put(i)
    for i in range(4):
        p = Process(target=read, args=(q,))
        p.start()
        p.join()
    print("down")

很显然,这里是一个简单的多进程代码,用队列来通信。

结果如下:

但是同样的用线程池试试?

from multiprocessing import Pool, Queue

def read(q):
    print("flag")
    while not q.empty():
        value = q.get()
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Queue()
    for i in range(4):
        q.put(i)
    p = Pool(4)
    for i in range(4):
        p.apply_async(read, args=(q,))
    p.close()
    p.join()
    print("down")

结果:

什么?子进程怎么没有运行,于是我就测试了一下,发现子进程真的没有运行。

于是我猜测和参数有关,于是我把参数给换成最普通的:

from multiprocessing import Pool, Queue

def read(i):
    print("flag")
    print(i)
    #while not q.empty():
        #value = q.get()
        #print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Queue()
    for i in range(4):
        q.put(i)
    p = Pool(4)
    for i in range(4):
        p.apply_async(read, args=(i,))
    p.close()
    p.join()
    print("down")

结果:

看来还真的是这样,难道线程池和多进程使用队列的方式不一样吗?

于是google开始,发现stackowerflow有个类似问题。https://stackoverflow.com/questions/9908781/sharing-a-result-queue-among-several-processes

我尝试不用multiprocessing提供的Queue, 而是使用multiprocessing.Manager来管理队列,于是简单修改一下代码如下:
结果:

from multiprocessing import Pool, Manager

def read(q):
    while not q.empty():
        value = q.get()
        print('Get %s from queue.' % value)

if __name__=='__main__':
    m = Manager()
    q = m.Queue()
    for i in range(4):
        q.put(i)
    p = Pool(4)
    for i in range(4):
        p.apply_async(read, args=(q,))
    p.close()
    p.join()
    print("down")


看到结果是正确的。

查阅官方文档显示:

A manager object returned by Manager() controls a server process which holds Python objects and allows > > other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore,
BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

可以看出multiprocess.Manager()可以用来管理临界区。
问题来了第一个例子是否也可以这么用?直接试试便知:

from multiprocessing import Process, Manager

def read(q):
    while not q.empty():
        value = q.get()
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Manager().Queue()
    for i in range(4):
        q.put(i)
    for i in range(4):
        p = Process(target=read, args=(q,))
        p.start()
        p.join()
    print("down")

结果:

可以看出肯定也是正确的。

总结一下就是:

  • 进程池中的队列,需要使用multiprocessing.Manager()中的Queue()
  • 多进程中的参数也可以用multiprocessing.Manager()来管理
  • 最好都使用multiprocessing.Manager()来统一管理
1 + 8 =
快来做第一个评论的人吧~