博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python之Queue
阅读量:4879 次
发布时间:2019-06-11

本文共 4945 字,大约阅读时间需要 16 分钟。

一、多进程的消息队列

  “消息队列”是在消息的传输过程中保存消息的容器

  消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

  操作系统提供了很多机制来实现进程中的通信,multiprocessing模块就提供了queue和pipe两种方法来实现

  使用multiprocessing里面的queue来实现消息队列,代码如下:

from multiprocessing import Processfrom multiprocessing import Queuedef write(q):    for i in ["a","b","c","d"]:        q.put(i)        print ("put {0} to queue".format(i))def read(q):    while 1:        result = q.get()        print ("get {0} from queue".format(result))def main():    q = Queue()    pw = Process(target=write,args=(q,))    pr = Process(target=read,args=(q,))    pw.start()    pr.start()    pw.join()    pr.terminate()if __name__ == "__main__":    main()

结果:

put a to queue

put b to queue
put c to queue
put d to queue
get a from queue
get b from queue
get c from queue
get d from queue

  通过multiprocessing里面的pipe来实现消息队列。pipe方法返回(conn1,conn2)代表一个管道的两个端。pipe方法有duplex参数,如果duplex参数为true(默认值),那么管道是全双工模式,也就是说conn1和conn2均可收发。duplex为false,conn1只负责接受消息,conn2只负责发送消息。

  send和recv方法分别是发送和接收消息的方法,close方法表示关闭管道。当消息接收结束以后,关闭管道。实例代码如下:

from multiprocessing import Process,Pipeimport timedef proc1(pipe):    for i in xrange(1,10):        pipe.send(i)        print ("send {0} to pipe".format(i))        time.sleep(1)def proc2(pipe):    n = 9    while n>0:        result = pipe.recv()        print ("recv {0} from pipe".format(result))        n -= 1def main():    pipe = Pipe(duplex=False)    print (type(pipe))    p1 = Process(target = proc1,args=(pipe[1],))    p2 = Process(target= proc2,args=(pipe[0],))    p1.start()    p2.start()    p1.join()    p2.join()    pipe[0].close()    pipe[1].close()if __name__ == "__main__":    main()

结果:

<type 'tuple'>

send 1 to pipe
recv 1 from pipe
send 2 to pipe
recv 2 from pipe
send 3 to pipe
recv 3 from pipe
send 4 to piperecv 4 from pipe
send 5 to pipe
recv 5 from pipe
send 6 to pipe
recv 6 from pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
send 9 to pipe
recv 9 from pipe

二、Queue模块

  Python提供了Queue模块来专门实现消息队列。Queue对象实现一个FIFO队列(其他的还有lifo、priority队列)。Queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

  Queue.qsize:返回消息队列的当前空间。返回值不一定可靠

  Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。

  Queue.full():类似上边,判断消息队列是否满

  Queue.put(item,block=True,timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception

  Queue.put_nowait(item):相当于put(itme,False)

  Queue.get(block = True,timeout = None):获取一个消息,其他同put

  Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成

  Queue.join():实际上意味着等队列为空,再执行别的操作

程序实例如下:

from Queue import Queuefrom threading import Threadimport timeclass Proceduer(Thread):    def __init__(self,queue):        super(Proceduer,self).__init__()        self.queue = queue    def run(self):        try:            for i in xrange(1,10):                print ("put data is : {0} to queue".format(i))                self.queue.put(i)        except Exception as e:            print ("put data error")            raise eclass Consumer_odd(Thread):    def __init__(self,queue):        super(Consumer_odd,self).__init__()        self.queue = queue    def run(self):        try:            while not self.queue.empty:                number = self.queue.get()                if number % 2 != 0:                    print ("get {0} from queue ODD,thread name is :{1}".format(number,self.getName()))                else:                    self.queue.put(number)                time.sleep(1)        except Exception as e :            raise  eclass Consumer_even(Thread):    def __init__(self,queue):        super(Consumer_even,self).__init__()        self.queue = queue    def run(self):        try:            while not self.queue.empty:                number = self.queue.get()                if number % 2 == 0:                    print ("get {0} from queue Even,thread name is :{1}".format(number,self.getName()))                else:                    self.queue.put(number)                time.sleep(1)        except Exception as e :            raise  edef main():    queue = Queue()    p = Proceduer(queue = queue)    p.start()    p.join()    time.sleep(1)    c1 = Consumer_odd(queue=queue)    c2 = Consumer_even(queue=queue)    c1.start()    c2.start()    c1.join()    c2.join()    print ("All threads is terminate!")if __name__ == '__main__':    main()

结果:

put data is : 2 to queue

put data is : 3 to queue
put data is : 4 to queue
put data is : 5 to queue
put data is : 6 to queue
put data is : 7 to queue
put data is : 8 to queue
put data is : 9 to queue
get 1 from queue ODD,thread name is :Thread-2
get 2 from queue Even,thread name is :Thread-3
get 3 from queue ODD,thread name is :Thread-2
 get 4 from queue Even,thread name is :Thread-3
get 5 from queue ODD,thread name is :Thread-2
 get 6 from queue Even,thread name is :Thread-3
get 9 from queue ODD,thread name is :Thread-2
get 8 from queue Even,thread name is :Thread-3
 get 7 from queue ODD,thread name is :Thread-2

转载于:https://www.cnblogs.com/huangdongju/p/8043484.html

你可能感兴趣的文章
MAC sublime text 无法自动补齐标签
查看>>
NgBook留言本开发全过程(1)
查看>>
LeetCode-指针法
查看>>
Python之路,Day12 - 那就做个堡垒机吧
查看>>
linux之shell之if、while、for语句介绍
查看>>
Mysql phpStudy升级Mysql版本,流产了怎么办?
查看>>
SQLServer之数据库行锁
查看>>
OFDM仿真
查看>>
浅谈linux内核中内存分配函数
查看>>
走近SpringBoot
查看>>
python程序之profile分析
查看>>
写在读研初期
查看>>
开环增益对负反馈放大电路的影响
查看>>
MySQL-ERROR 2003
查看>>
SQL Server2012-SSIS的包管理和部署
查看>>
JavaScript内置对象
查看>>
如何把js的循环写成异步的
查看>>
ER图是啥?
查看>>
too many include files depth = 1024错误原因
查看>>
HTTP协议详解(三)
查看>>