一、多进程的消息队列
“消息队列”是在消息的传输过程中保存消息的容器
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程中的通信,multiprocessing模块就提供了queue和pipe两种方法来实现
使用multiprocessing里面的queue来实现消息队列,代码如下:
from multiprocessing import Processfrom multiprocessing import Queuedef write(q): for i in ["a","b","c","d"]: q.put(i) print ("put {0} to queue".format(i))def read(q): while 1: result = q.get() print ("get {0} from queue".format(result))def main(): q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate()if __name__ == "__main__": main()
结果:
put a to queue
put b to queueput c to queueput d to queueget a from queueget b from queueget c from queueget d from queue通过multiprocessing里面的pipe来实现消息队列。pipe方法返回(conn1,conn2)代表一个管道的两个端。pipe方法有duplex参数,如果duplex参数为true(默认值),那么管道是全双工模式,也就是说conn1和conn2均可收发。duplex为false,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接收消息的方法,close方法表示关闭管道。当消息接收结束以后,关闭管道。实例代码如下:
from multiprocessing import Process,Pipeimport timedef proc1(pipe): for i in xrange(1,10): pipe.send(i) print ("send {0} to pipe".format(i)) time.sleep(1)def proc2(pipe): n = 9 while n>0: result = pipe.recv() print ("recv {0} from pipe".format(result)) n -= 1def main(): pipe = Pipe(duplex=False) print (type(pipe)) p1 = Process(target = proc1,args=(pipe[1],)) p2 = Process(target= proc2,args=(pipe[0],)) p1.start() p2.start() p1.join() p2.join() pipe[0].close() pipe[1].close()if __name__ == "__main__": main()
结果:
<type 'tuple'>
send 1 to piperecv 1 from pipesend 2 to piperecv 2 from pipesend 3 to piperecv 3 from pipesend 4 to piperecv 4 from pipesend 5 to piperecv 5 from pipesend 6 to piperecv 6 from pipesend 7 to piperecv 7 from pipesend 8 to piperecv 8 from pipesend 9 to piperecv 9 from pipe二、Queue模块
Python提供了Queue模块来专门实现消息队列。Queue对象实现一个FIFO队列(其他的还有lifo、priority队列)。Queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
Queue.qsize:返回消息队列的当前空间。返回值不一定可靠
Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
Queue.full():类似上边,判断消息队列是否满
Queue.put(item,block=True,timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception
Queue.put_nowait(item):相当于put(itme,False)
Queue.get(block = True,timeout = None):获取一个消息,其他同put
Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成
Queue.join():实际上意味着等队列为空,再执行别的操作
程序实例如下:
from Queue import Queuefrom threading import Threadimport timeclass Proceduer(Thread): def __init__(self,queue): super(Proceduer,self).__init__() self.queue = queue def run(self): try: for i in xrange(1,10): print ("put data is : {0} to queue".format(i)) self.queue.put(i) except Exception as e: print ("put data error") raise eclass Consumer_odd(Thread): def __init__(self,queue): super(Consumer_odd,self).__init__() self.queue = queue def run(self): try: while not self.queue.empty: number = self.queue.get() if number % 2 != 0: print ("get {0} from queue ODD,thread name is :{1}".format(number,self.getName())) else: self.queue.put(number) time.sleep(1) except Exception as e : raise eclass Consumer_even(Thread): def __init__(self,queue): super(Consumer_even,self).__init__() self.queue = queue def run(self): try: while not self.queue.empty: number = self.queue.get() if number % 2 == 0: print ("get {0} from queue Even,thread name is :{1}".format(number,self.getName())) else: self.queue.put(number) time.sleep(1) except Exception as e : raise edef main(): queue = Queue() p = Proceduer(queue = queue) p.start() p.join() time.sleep(1) c1 = Consumer_odd(queue=queue) c2 = Consumer_even(queue=queue) c1.start() c2.start() c1.join() c2.join() print ("All threads is terminate!")if __name__ == '__main__': main()
结果:
put data is : 2 to queue
put data is : 3 to queueput data is : 4 to queueput data is : 5 to queueput data is : 6 to queueput data is : 7 to queueput data is : 8 to queueput data is : 9 to queueget 1 from queue ODD,thread name is :Thread-2get 2 from queue Even,thread name is :Thread-3get 3 from queue ODD,thread name is :Thread-2 get 4 from queue Even,thread name is :Thread-3get 5 from queue ODD,thread name is :Thread-2 get 6 from queue Even,thread name is :Thread-3get 9 from queue ODD,thread name is :Thread-2get 8 from queue Even,thread name is :Thread-3 get 7 from queue ODD,thread name is :Thread-2