关于python3进程池传参队列的问题
文章最后更新时间为:2019年01月20日 22:04:43
最近打算没事就写个python脚本来巩固一下,最后完成一个完整的渗透测试框架。但是我发现python在io和网络方面的速度实在是太慢了,于是就要使用到多进程和多线程,但是关于进程池的传参问题,我遇到了一个问题。
首先需要来看一段简单的多进程代码:
from multiprocessing import Process, Queue
def read(q):
while not q.empty():
value = q.get()
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
for i in range(4):
q.put(i)
for i in range(4):
p = Process(target=read, args=(q,))
p.start()
p.join()
print("down")
很显然,这里是一个简单的多进程代码,用队列来通信。
结果如下:
但是同样的用线程池试试?
from multiprocessing import Pool, Queue
def read(q):
print("flag")
while not q.empty():
value = q.get()
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
for i in range(4):
q.put(i)
p = Pool(4)
for i in range(4):
p.apply_async(read, args=(q,))
p.close()
p.join()
print("down")
结果:
什么?子进程怎么没有运行,于是我就测试了一下,发现子进程真的没有运行。
于是我猜测和参数有关,于是我把参数给换成最普通的:
from multiprocessing import Pool, Queue
def read(i):
print("flag")
print(i)
#while not q.empty():
#value = q.get()
#print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
for i in range(4):
q.put(i)
p = Pool(4)
for i in range(4):
p.apply_async(read, args=(i,))
p.close()
p.join()
print("down")
结果:
看来还真的是这样,难道线程池和多进程使用队列的方式不一样吗?
于是google开始,发现stackowerflow有个类似问题。https://stackoverflow.com/questions/9908781/sharing-a-result-queue-among-several-processes。
我尝试不用multiprocessing提供的Queue, 而是使用multiprocessing.Manager来管理队列,于是简单修改一下代码如下:
结果:
from multiprocessing import Pool, Manager
def read(q):
while not q.empty():
value = q.get()
print('Get %s from queue.' % value)
if __name__=='__main__':
m = Manager()
q = m.Queue()
for i in range(4):
q.put(i)
p = Pool(4)
for i in range(4):
p.apply_async(read, args=(q,))
p.close()
p.join()
print("down")
看到结果是正确的。
查阅官方文档显示:
A manager object returned by Manager() controls a server process which holds Python objects and allows > > other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore,
BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
可以看出multiprocess.Manager()可以用来管理临界区。
问题来了第一个例子是否也可以这么用?直接试试便知:
from multiprocessing import Process, Manager
def read(q):
while not q.empty():
value = q.get()
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Manager().Queue()
for i in range(4):
q.put(i)
for i in range(4):
p = Process(target=read, args=(q,))
p.start()
p.join()
print("down")
结果:
可以看出肯定也是正确的。
总结一下就是:
- 进程池中的队列,需要使用multiprocessing.Manager()中的Queue()
- 多进程中的参数也可以用multiprocessing.Manager()来管理
- 最好都使用multiprocessing.Manager()来统一管理