Python语言并发之threading模块(2)
小标 2018-11-14 来源 : 阅读 545 评论 0

摘要:本文主要向大家介绍了Python语言并发之threading模块(2),通过具体的内容向大家展示,希望对大家学习Python语言有所帮助。

本文主要向大家介绍了Python语言并发之threading模块(2),通过具体的内容向大家展示,希望对大家学习Python语言有所帮助。

一句话概括本文:
本节继续把Python里threading线程模块剩下的Condition,Semaphore,
Event,Timer和Barrier讲解完毕,文档是枯燥无味的,希望通过简单
有趣的例子,可以帮你快速掌握这几个东东的用法~
啃文档是比较乏味的,先来个小姐姐提提神吧~

 别问高清原图,程序猿自己动手,丰(营)衣(养)足(跟)食(不上),
脚本自取:https://github.com/coder-pig/ReptileSomething

引言:
如果你忘记了threading上一部分内容,可以移步至:
小猪的Python学习之旅 —— 7.Python并发之threading模块(1)
温故知新,官方文档依旧是:
https://docs.python.org/3/library/threading.html

1.条件变量(Condition)
上节学习了Python为我们提供的第一个用于线程同步的东东——互斥锁,
又分Lock(指令锁)与RLock(可重入锁),但是互斥锁只是最简单的同步机制,
Python为我们提供了Condition(条件变量),以便于处理复杂线程同步问题,
比如最经典的生产者与消费者问题。
Condition除了提供与Lock类似的acquire()与release()函数外,还提供了
wait()与notify()函数。用法如下:

1.调用threading.Condition获得一个条件变量对象;
2.线程调用acquire获得Condition对象;
3.进行条件判断,不满足条件调用wait函数,满足条件,进行一些处理改变
条件后,调用notify函数通知处于wait状态的线程,重新进行条件判断。

写个简单的生产者与消费者例子体验下:
import threading
import time

condition = threading.Condition()
products = 0  # 商品数量

# 定义生产者线程类
class Producer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products >= 99:
                    condition.wait()
                else:
                    products += 2
                    print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
                condition.release()
                time.sleep(2)

# 定义消费者线程类
class Consumer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products < 3:
                    condition.wait()
                else:
                    products -= 3
                    print(self.name + "消耗了3个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
            condition.release()
            time.sleep(2)

if __name__ == '__main__':
    # 创建五个生产者线程
    for i in range(5):
        p = Producer()
        p.start()
    # 创建两个消费者线程
    for j in range(2):
        c = Consumer()
        c.start()
运行结果:

Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个
在Condition实例化的时候通过构造函数传入,SO,调用的Condition的
acquire与release函数,其实调用就是这个锁对象的acquire与release函数。
下面详解下除了acquire与release函数外Condition提供的相关函数吧:
(注:下述方法只有在acquire之后才能调用,不然会报RuntimeError异常)

wait(timeout=None):释放锁,同时线程被挂起,直到收到通知被唤醒
或超时(如果设置了timeout),当线程被唤醒并重新占有锁时,程序才继续执行;
wait_for(predicate, timeout=None):等待知道条件为True,predicate应该是
一个回调函数,返回布尔值,timeout用于指定超时时间,返回值为回调函数
返回的布尔值,或者超时,返回False(3.2新增);
notify(n=1):默认唤醒一个正在的等待线程,notify并不释放锁!!!
notify_all():唤醒所有等待线程,进入就绪状态,等待获得锁,notify_all 同样不释放锁!!!


2.信号量(Semaphore)
信号量,也是一个很容易懂的东西,举个简单的例子:

假如厕所里有五个蹲坑,有人来开大,就会占用一个坑位,
所剩余的坑位-1,当五个坑都被人占满的时候,新来的人
就只能在外面等,直到有人出来为止。

这里的五个粪坑就是信号量,蹲坑的人就是线程,
初始值为5,来人-1,走人+1;超过初始值,新来的处于堵塞状态;
原理很简单,试试看下源码:

看下init方法

传入参数value,默认值为1,不能传入负数,否则抛ValueError异常;
创建了一个Condition条件变量,传入一个Lock实例;
接着看下acquire函数:


先是判断,如果没有加锁然后设置了超时时间,抛出ValueError;
循环,如果value == 0,没有加锁或在超时时间内,跳出循环;
否则,调用Condition变量wait函数等待通知或超时;
如果value不为0,跳出循环执行else里的代码,信号量-1,rc = ture,
代表可以调用release函数,最后返回rc;

再接着是release函数,更简单

信号量+1,然后调用Condition变量的notify唤醒一个线程~
剩下的enter和exit就不用说了,重写这两个方法就能直接用with关键字了
就是那么简单,把我们蹲坑的那个例子写成代码吧:
import threading
import time
import random

s = threading.Semaphore(5)  # 粪坑

class Human(threading.Thread):
    def run(self):
        s.acquire()  # 占坑
        print("拉屎拉屎 - " + self.name + " - " + str(time.ctime()))
        time.sleep(random.randrange(1, 3))
        print("拉完走人 - " + self.name + " - " + str(time.ctime()))
        s.release()  # 走人

if __name__ == '__main__':
    for i in range(10):
        human = Human()
        human.start()
输出结果:


3.通用的条件变量(Event)
Python提供的用于线程间通信的信号标志,一个线程标识了一个事件,
其他线程处于等待状态,直到事件发生后,所有线程都会被激活。
Event对象属性实现了简单的线程通信机制,提供了设置信号,清楚信号,
等待等用于实现线程间的通信。提供以下四个可供调用的方法:

is_set():判断内部标志是否为真
set():设置信号标志为真
clear():清除Event对象内部的信号标志(设置为false)
wait(timeout=None):使线程一直处于堵塞,知道标识符变为True

感觉有点蒙圈,看一波源码吧~

先是init函数

又是用到Condition条件变量,还有设置了一个_flag = False,这个就是标记吧!
is_set函数比较简单,返回_flag,

然后是set()函数:

加锁,然后设置_flag为true,然后notify_all唤醒所有线程,最后释放锁,
简单,接着clear函数呢?

注释的意思是:重置内部标记为false,随后,调用wait()的线程将被堵塞,
直到调用set()将内部标记再次设置为true。也很简单,最后是wait方法:

判断标志是否为False,False的话进入堵塞状态,(⊙v⊙)嗯
源码就那么简单,感觉看完还是蒙圈不知道怎么用,写个简单的例子?
汽车过红绿灯的例子:
import threading
import time
import random

class CarThread(threading.Thread):
    def __init__(self, event):
        threading.Thread.__init__(self)
        self.threadEvent = event

    def run(self):
        # 休眠模拟汽车先后到达路口时间
        time.sleep(random.randrange(1, 10))
        print("汽车 - " + self.name + " - 到达路口...")
        self.threadEvent.wait()
        print("汽车 - " + self.name + " - 通过路口...")

if __name__ == '__main__':
    light_event = threading.Event()

    # 假设有20台车子
    for i in range(20):
        car = CarThread(event=light_event)
        car.start()

    while threading.active_count() > 1:
        light_event.clear()
        print("红灯等待...")
        time.sleep(3)
        print("绿灯通行...")
        light_event.set()
        time.sleep(2)
输出结果:


4.定时器(Timer)
与Thread类似,只是要等待一段时间后才会开始运行,单位秒,用法也很简单:
import threading
import time

def skill_ready():
    print("!!!!!!大招已经准备好了!!!!!!")

if __name__ == '__main__':
    t = threading.Timer(5, skill_ready)
    t.start()
    while threading.active_count() > 1:
        print("======大招蓄力中======")
        time.sleep(1)
输出结果:


5.栅栏(Barrier)
Barrier直译栅栏,感觉不是很好理解,网上有个形象化的例子,把他比喻
成赛马用的栅栏,然后马(线程)依次来到栅栏前等待(wait),直到所有的马
都停在栅栏面前了,然后所有马开始同时出发(start)。
简单点说就是,多个线程间的相互等待,调用了wait()方法的线程进入堵塞,
直到所有的线程都调用了wait()方法,然后所有线程同时进行就绪状态,
等待调度运行。
构造函数:
Barrier(parties,action=None,timeout=None)

parties:创建一个可容纳parties条线程的栅栏;
action:全部线程被释放时可被其中一条线程调用的可调用对象;
timeout:线程调用wait()方法时没有显式设定timeout,就用的这个作为默认值;

相关函数:

wait(timeout=None):表示线程就位,返回值是一个0到parties-1之间的整数,
每条线程都不一样,这个值可以用作挑选一条线程做些清扫工作,另外如果你在
构造函数里设置了action的话,其中一个线程在释放之前将会调用它。如果调用
出错的话,会让栅栏进入broken状态,超时同样也会进入broken状态,如果栅栏
在处于broke状态的时候调用reset函数,会抛出一个BrokenBarrierError异常。
reset():本方法将栅栏置为初始状态,即empty状态。所有已经在等待的线程
都会接收到BrokenBarrierError异常,注意当有其他处于unknown状态的线程时,
调用此方法将可能获取到额外的访问。因此如果一个栅栏进入了broken状态,
最好是放弃他并新建一个栅栏,而不是调用reset方法。
abort():将栅栏置为broken状态。本方法将使所有正在等待或将要调用
wait()方法的线程收到BrokenBarrierError异常。本方法的使用情景为,比如:
有一条线程需要abort(),又不想给其他线程造成死锁的状态,或许设定
timeout参数要比使用本方法更可靠。
parites:将要使用本 barrier 的线程的数量
n_waiting:正在等待本 barrier 的线程的数量
broken:栅栏是否为broken状态,返回一个布尔值

BrokenBarrierError:RuntimeError的子类,当栅栏被reset()或broken时引发;
(感觉都不知所云,写个简单的例子来熟悉下用法吧~)
例子:公司一起去旅游
import threading
import time
import random

class Staff(threading.Thread):
    def __init__(self, barriers):
        threading.Thread.__init__(self)
        self.barriers = barriers

    def run(self):
        print("员工 【" + self.name + "】" + "出门")
        time.sleep(random.randrange(1, 10))
        print("员工 【" + self.name + "】" + "已签到")
        self.barriers.wait()

def ready():
    print(threading.current_thread().name + ":人齐,出发,出发~~~")

if __name__ == '__main__':
    print("要出去旅游啦,大家快集合~")
    b = threading.Barrier(10, action=ready, timeout=20)
    for i in range(10):
        staff = Staff(b)
        staff.start()
运行结果:

PS:这里可以试下设置超时,还有修改ready方法,故意引起异常,
然后会抛出BrokenBarrierError异常。


6.小结
加了下班,终于把threading模块啃完了,不然爬小姐姐好玩,有成就感,
当然Python线程肯定不止那么简单,后面还有队列这些东西~慢慢来,不急。

本文由职坐标整理并发布,希望对同学们学习Python有所帮助,更多内容请关注职坐标编程语言Python频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程