2.9. 并发编程
2.9.1. 启动与停止线程
import time
def countdown(n):
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
class CountDown(Thread):
def __init__(self, n: int) -> None:
super().__init__()
self.n = n
def run(self) -> None:
while self.n > 0:
print('T-minus', self.n)
self.n -= 1
time.sleep(5)
c = CountDown(5)
c.start()
2.9.2. 判断线程是否已经启动
import time
from threading import Thread, Event
def countdown(n):
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
class CountDown(Thread):
def __init__(self, n: int,event) -> None:
super().__init__()
self.n = n
def run(self) -> None:
print('Countdown is running 1')
event.set()
while self.n > 0:
print('T-minus', self.n)
self.n -= 1
time.sleep(5)
event = Event()
c = CountDown(5,event)
c.start()
event.wait()
print('Countdown is running 2 ')
2.9.3. 线程间通信
import random
import time
def producer(q):
while True:
q.put(random.randint(0, 1000))
time.sleep(0.5)
def consumer(q):
while True:
print(q.get())
time.sleep(0.5)
if __name__ == '__main__':
from queue import Queue
q = Queue()
from threading import Thread
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
2.9.4. 线程访问加锁
from threading import Lock
class ShareCount(object):
def __init__(self,start_cnt) -> None:
self.start_cnt = start_cnt
self.lock = Lock()
def inc(self) -> None:
with self.lock:
self.start_cnt += 1
def dec(self) -> None:
self.lock.acquire()
self.start_cnt -= 1
self.lock.release()
def get(self) -> int:
return self.start_cnt
2.9.5. 防止死锁的加锁机制
可以对资源进行排序,加锁顺序按照顺序来申请。 避免交叉等待。
2.9.6. 保存线程的状态信息
每个 threading.local() 实例为每个线程维护着一个单独的实例字典。 所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。 每个线程使用一个独立的字典就可以保证数据的隔离了。
2.9.7. 创建一个线程池
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
u = requests.get(url)
return u.text
pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')
# Get the results back
x = a.result()
y = b.result()
print(x,y)
2.9.8. 定义一个Actor任务
from queue import Queue
from threading import Event, Thread
class ActorExit(Exception):
pass
class Actor(object):
def __init__(self) -> None:
self.queue = Queue()
self._terminated = Event ()
self.task = None
def close(self):
self.queue.put(ActorExit)
def send(self, msg):
self.queue.put(msg)
def recv(self):
msg = self.queue.get()
if msg is ActorExit:
raise ActorExit()
return msg
def start(self):
# self.event.set()
self.task = Thread(target=self._bootstrap)
self.task.daemon = True
self.task.start()
pass
def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()
def run(self):
raise NotImplementedError
def join(self):
self._terminated.wait()
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()
2.9.9. 实现消息发布订阅模式
from collections import defaultdict
from contextlib import contextmanager
class Exchange(object):
def __init__(self) -> None:
self.subscribers = set()
def attach(self,task):
self.subscribers.add(task)
def detach(self,task):
self.subscribers.remove(task)
def send(self,msg):
for subscriber in self.subscribers:
subscriber.send(msg)
def attach_tasks(self,*tasks):
for task in tasks:
self.attach(task)
def detach_tasks(self,*tasks):
for task in tasks:
self.detach(task)
def subscribe(self,task):
self.attach(task)
@contextmanager
def subscribe_tasks(self,*tasks):
self.attach_tasks(*tasks)
try:
yield
finally:
self.detach_tasks(*tasks)
def clear(self):
self.subscribers.clear()
def get_subs(self):
return self.subscribers
_exchanges = defaultdict(Exchange)
def get_exchange(name):
return _exchanges[name]
class Task:
def send(self, msg):
print(msg)
d = get_exchange('d')
t1 = Task()
t2 = Task()
d.attach(t1)
d.attach(t2)
d.send('hello')
d.send('world')
d.detach(t1)
print(d.get_subs())
d.clear()
print(d.get_subs())
with d.subscribe_tasks(t1,t2):
d.send('hello')
d.send('world')
print(d.get_subs())
2.9.10. 使用生成器代替线程
from collections import deque
def count_down(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
def count_up(n):
x = 0
while x < n:
print('Counting up', x)
yield
x += 1
class TaskScheduler:
def __init__(self) -> None:
self.tasks =deque()
def new_task(self,task):
self.tasks.append(task)
def run(self):
while self.tasks:
task = self.tasks.popleft()
try:
next(task)
self.tasks.append(task)
except StopIteration:
pass
sched = TaskScheduler()
sched.new_task(count_down(10))
sched.new_task(count_down(5))
sched.new_task(count_up(15))
sched.run()
2.9.11. 多个线程队列轮询
一个套接字被传给 select() 或类似的一个轮询数据到达的函数