并发编程

加入链接循环的套接字服务端

1 操作系统发展史

手工操作-穿孔卡片 >>> 批处理 >>> 多道程序系统 >>> 分时系统 >>> 通用操作系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 手工操作——穿孔卡片
-用户独占全机
-CPU的利用不充分
2 批处理
把一个操作整个写到磁带中,以后要进行这个操作,直接拿着磁带,读入即可
-脱机批处理
-联机批处理

3 多道程序系统
-当一道程序因I/O请求而暂停运行时,CPU便立即转去运行另一道程序
-各道程序轮流地用CPU,并交替运行
4 分时系统
-多个程序在运行,时间片的概念,cpu执行完固定的时间,就会转去另一个程序
5 通用操作系统
-多道批处理系统,分时

io操作:(统统不占用cpu)

2 进程基础

2.1 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。

广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元


# 进程是资源分配的最小单位,线程是cpu执行的最小单位
#一个程序运行,最少有一个进程
#一个进程里最少有一条线程


# 进程和程序的区分
程序可以作为一种软件资料长期存在,而进程是有一定生命期的。
程序是永久的,进程是暂时的。

2.2 进程状态

167-同步异步阻塞非阻塞-01.png

在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  1. 就绪(Ready)状态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
  2. 执行/运行(Running)状态当进程已获得处理机会,其程序正在处理机上执行,此时的进程状态称为执行状态。
  3. 阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

167-同步异步阻塞非阻塞-02.png

2.3 并发和并行

1
2
3
4
并发:你在跑步,鞋带开了,停下跑步,系鞋带,系完以后,继续跑步,在一个时间段内来看,你干了多个事
-单核下的并发
并行:你在跑步,你用随身听在听着,同一时刻,在干多个事
-只有多核才涉及到并行

两者的区别
并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

3 进程高级

3.1 如何开启多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process


#如果在win下开多进程,必须写main,否则报错
import time
def wirte_file(s):

time.sleep(5)
with open('a.txt','a') as f:
f.write(s)
f.write('\n')

if __name__ == '__main__':
time.sleep(5)
# wirte_file()
# 开启多进程的第一个方式
p=Process(target=wirte_file,args=['lqz is nb'])
# 执行该进程
p.start()


# 又开了一个进程
p1 = Process(target=wirte_file,args=['egon is dsb'])
# 执行该进程
p1.start()

3.2作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1 写一个支持并发的TCP套接字服务端(可以有多个客户端同时跟服务端交互)

# server端
# -*- coding: utf-8 -*-
import socket
from socket import SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process
def qq(conn,addr):
print(conn,addr)
while True:
try:
data = conn.recv(1024)
if len(data) == 0:break
print(data.decode())
conn.send(data.upper())
except Exception as e:
print(e)
break
conn.close()

if __name__ == '__main__':
'''
在python3.8后,下面四行代码必须放在__main__下,否则会报错: OSError: [Errno 48] Address already in use

server = socket.socket()
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)
'''
server = socket.socket()
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)
while True:
conn,addr = server.accept()
p = Process(target=qq,args=(conn,addr))
p.start()


# client端
# -*- coding: utf-8 -*-
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8081))
while True:
input_data = input('请输入:').strip()
if input_data == 'q':
break
client.send(input_data.encode('utf-8'))
data = client.recv(1024)
print(data.decode())

client.close()

3.3 进程调度算法(了解)

1
2
3
4
-先来先服务调度算法
-短作业优先调度算法
-时间片轮转法
-多级反馈队列

3.4 同步异步,阻塞非阻塞(了解)

  • 同步和异步:关注的是消息通信机制
    • 同步:在发起一个调用时,一直在等待结果返回
    • 异步:在调用发起后,不会立刻得到结果,被调用者通过状态、通知来通知调用者,或者通过回调函数处理这个调用
  • 阻塞和非阻塞:关注的是等待调用结果时的状态
    • 阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到调用结果后才会返回
    • 非阻塞:非阻塞调用是指在不能立刻得到结果之前,该调用不会阻塞当前线程
  • 例子
    • 同步阻塞:打电话要买书,电话没挂,我也一直在等待,
    • 同步非阻塞:打电话买书,电话没挂,我一边干别的事,一边听一下电话
    • 异步阻塞:打电话买书,电话先挂掉,过一会老板会回回来(回调),老板给回电话的过程一直在等待
    • 异步非阻塞:打电话买书,电话先挂掉,过一会老板会回回来(回调),老板给回电话的过程中,在干别的事

      3.5 Process类

      3.5.1 Process类的参数(重点)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      from multiprocessing import Process


      def task(name,age):
      print(name)
      print(age)


      if __name__ == '__main__':
      # p=Process(target=task,args=['lqz',18])
      # p=Process(target=task,kwargs={'age':19,'name':'lqz'},name='process01')
      p=Process(target=task,kwargs={'age':19,'name':'lqz'})
      p2=Process(target=task,kwargs={'age':19,'name':'lqz'})
      p.start()
      p2.start()

      print(p.name)
      print(p2.name)
      # target=None, 你要执行的任务,函数
      # name=None, 进程名
      # args=(), 以位置参数给任务(函数)传递参数
      # kwargs={} 以关键字的形式给任务(函数)传递参数

      3.5.2 Process类的方法,属性(重点)

  • 1.方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41

    from multiprocessing import Process
    import time

    def task(name,age):
    time.sleep(10)
    print(name)
    print(age)


    if __name__ == '__main__':
    p=Process(target=task,kwargs={'age':19,'name':'lqz'})


    p.start() #启动进程,必须调用start
    # p.run() # 实际进程在执行的时候,执行的是run方法,但是调用run不会开进程,后面我们另一种开启进程的方案使用到它

    # p.join() # 等待子进程执行完成


    print(p.is_alive()) #True

    p.terminate() # 杀死p这个进程,通知操作系统去杀死该进程
    time.sleep(0.1)

    print(p.is_alive()) #可能还是True
    print('ssss')

    print(p.is_alive()) #就是False

    print(p)



    '''
    p.start() #启动进程,必须调用start
    p.run() # 实际进程在执行的时候,执行的是run方法,但是调用run不会开进程,后面我们另一种开启进程的方案使用到它
    p.join() # 等待子进程执行完成
    p.terminate() # 杀死p这个进程,通知操作系统去杀死该进程,并不是立即结束
    p.is_alive() #进程是否还存活
    '''
  • 2.属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process
import time

def task(name,age):
time.sleep(10)
print(name)
print(age)


if __name__ == '__main__':
p=Process(target=task,kwargs={'age':19,'name':'lqz'})
# p.start()
# print(p.name) # 进程名字
# p.daemon=True #主进程结束,子进程也结束,必须在start之前调用
p.start()
print(p.pid) # 进程id号
time.sleep(10)

'''
print(p.name) # 进程名字
print(p.pid) # 进程id号
p.daemon=True #主进程结束,子进程也结束,必须在start之前调用
'''

3.6 主进程和子进程的进程号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
import time
import os
def task(name,age):
# 如果在任务中取出进程id号,需要使用os模块
print('当前进程(子进程)id号是:',os.getpid()) #当前进程id号
print('当前进程父进程的id号是:',os.getppid()) # 当前进程父进程的id号
time.sleep(10)

print(name)
print(age)


if __name__ == '__main__':
p=Process(target=task,kwargs={'age':19,'name':'lqz'})
p.start()
print('p这个进程的id号是:',p.pid) # 进程id号
print('当前进程id(主进程)号是:', os.getpid()) # 当前进程id号
print('当前进程父进程(pycharm)的id号是:', os.getppid()) # 当前进程父进程的id号
time.sleep(10)

'''

如果有p对象,就是用p.pid获取进程id号
如果没有p对象,就是用os模块的
os.getpid() #当前进程id号
os.getppid() #父进程id号
'''

3.7 同时开启多个进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

from multiprocessing import Process
import time

def task1(i):
time.sleep(2)
print("我是task1",i)


if __name__ == '__main__':
ctime=time.time()
ll=[]
for i in range(5):
p1=Process(target=task1,args=[i,])
p1.start()
#p1.join() #等待子进程执行完成
ll.append(p1)


[p.join()for p in ll] # 相当于下面两行代码
# for p in ll:
# p.join()
print('主进程')
print(time.time()-ctime)


'''
开启多个进程
如果想等待多个进程同时执行完,先把进程开启完成,再统一join
'''

3.8 开启进程的另一种方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
### 通过继承Process类的方式来实现,重写run方法,run方法就是你要执行的任务,实例化得到对象,调用start方法开启进程

class MyProcess(Process):
def __init__(self,name1,age):
self.name1=name1
self.age=age
# 这个必须写
super().__init__()

def run(self) :
time.sleep(2)
print(self.name)
print(self.name1)
print(self.age)

if __name__ == '__main__':
p=MyProcess('lqz',19)
p.start() #调用p.start(),不要调用run
print('主进程')

3.9 进程之间数据隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

from multiprocessing import Process
import time
import os


def task():
print('我是task')
global n
n=100
print('子进程的:',n)



if __name__ == '__main__':
# 在主进程中定义了一个n=10
n=10

###如果这样写,n会被改掉
# task()
# print(n)

##如果这样写,n不会被改掉
p=Process(target=task)
p.start()
print('主进程的:',n)

3.10 高并发的tcp服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import socket
from multiprocessing import Process


def talk(sock,addr):
print('客户端连接成功',addr)
while True:
try:
data = sock.recv(1024)
if len(data) == 0:
break
print('从客户端收到了:', data)
sock.send(data.upper())
except Exception as e:
print(e)
break
sock.close()
if __name__ == '__main__':
server = socket.socket()
server.bind(('127.0.0.1', 81))
server.listen(5)
while True:
print('等待客户端连接')
sock, addr = server.accept()
p=Process(target=talk,args=[sock,addr])
p.start()
server.close()

3.11 进程同步(进程锁)(次重点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import time
## 不加锁

import json
from multiprocessing import Process,Lock


## 查询票余额
def check(i):
with open('ticket', 'rt', encoding='utf-8') as f:
res = json.load(f)
print('%s:在查询票,票还剩%s张' % (i, res['count']))
if res['count'] > 0:
return True


## 抢票,票的余额减一

def buy(i):
with open('ticket', 'rt', encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟查票延迟
if res['count'] > 0:
print('%s现在要买了,票数还是大于0'%i)
res['count'] -= 1
time.sleep(2) # 模拟买票延迟
with open('ticket', 'wt', encoding='utf-8') as f1:
json.dump(res, f1)
print('%s这个人购票成功' % i)
else:
print('%s这个人购票失败,票不够了,买不了了' % i)


def task(i,lock):
res = check(i)
if res:
lock.acquire() # 加锁是为了保证数据的
buy(i)
lock.release()


##模拟10个人买票

if __name__ == '__main__':
lock=Lock()
for i in range(10):
p = Process(target=task, args=[i,lock ])
p.start()

3.12 作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
作业:
1 使用两种方式开启进程
2 开启5个进程,同时计算1加到100,主进程等待5个进程执行完成(join)
3 打印出上面5个进程的进程id和父进程id
4 给上面5个进程分别命名为 process-001 process-002 process-003 ...
5 写一个主进程,开一个子进程(守护进程),在资源管理器里停止主进程,查看子进程是否结束(使用daemon=True
6 写一个并发抢票程序,保证不会出现数据错乱
# 1
### 通过继承Process类的方式来实现,重写run方法,run方法就是你要执行的任务,实例化得到对象,调用start方法开启进程

class MyProcess(Process):
def __init__(self,name1,age):
self.name1=name1
self.age=age
# 这个必须写
super().__init__()

def run(self) :
time.sleep(2)
print(self.name)
print(self.name1)
print(self.age)

if __name__ == '__main__':
p=MyProcess('lqz',19)
p.start() #调用p.start(),不要调用run
print('主进程')
# 2 3 4
import os
from multiprocessing import Process
import time
import os

def cal_num(i):
print('开始第%s次计算,进程id:%s' % (i,os.getpid()))
time.sleep(1)
sum = 0
for num in range(101):
sum += num
print('结果是%s' % sum)


if __name__ == '__main__':
p_list = []
for i in range(1,6):
p = Process(target=cal_num,args=(i,))
p.name = 'process-00' + str(i)
p.start()
print(p.name)
p_list.append(p)

[p.join() for p in p_list]
print('这是父进程,父进程id为%s' % os.getppid())

#5
from multiprocessing import Process
import time

def task(name,age):
time.sleep(1)
print(name)
print(age)

if __name__ == '__main__':
p = Process(target=task,kwargs={'age':19,'name':'lqz'})
p.daemon = True
p.start()
sum = 0
for i in range(100000000):
sum += i
print(sum)
time.sleep(20)


# import time
# ## 不加锁

#6
import json
from multiprocessing import Process,Lock


## 查询票余额
def check(i):
with open('ticket', 'rt', encoding='utf-8') as f:
res = json.load(f)
print('%s:在查询票,票还剩%s张' % (i, res['count']))
if res['count'] > 0:
return True


## 抢票,票的余额减一

def buy(i):
with open('ticket', 'rt', encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟查票延迟
if res['count'] > 0:
print('%s现在要买了,票数还是大于0'%i)
res['count'] -= 1
time.sleep(2) # 模拟买票延迟
with open('ticket', 'wt', encoding='utf-8') as f1:
json.dump(res, f1)
print('%s这个人购票成功' % i)
else:
print('%s这个人购票失败,票不够了,买不了了' % i)


def task(i,lock):
res = check(i)
if res:
lock.acquire() # 加锁是为了保证数据的
buy(i)
lock.release()


#模拟10个人买票

if __name__ == '__main__':
lock=Lock()
for i in range(10):
p = Process(target=task, args=[i,lock ])
p.start()

进程Queue、进程间通信、生产者消费者模型、线程概念

3.13 进程Queue介绍

1
2
3
4
5
6
7
1 进程间数据隔离,两个进程进行通信,借助于Queue

2 进程间通信:IPC
-借助于Queue实现进程间通信
-借助于文件
-借助于数据库
-借助于消息队列:rabbitmq,kafka....

3.13.1 基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from multiprocessing import Process,Queue


if __name__ == '__main__':
# maxsize表示Queue的大小是多少,能放多少东西
queue=Queue(3)
'''
## 实例化得到一个对象,数字表示queue的长度
queue=Queue(3)

## 放值
queue.put(item, block=True, timeout=None)
* item:放入队列中的数据元素
* block:是否阻塞. 默认值为True。
当队列中元素个数达到上限继续往里放数据时:
如果 block=False,直接引发 queue.Full 异常;
如果 block=True,且timeout=None,则一直等待直到有数据出队列后可以放入数据;
如果 block=True,且timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有位置放入数据就引发 queue.Full 异常。
* timeout,设置超时时间

queue.get(block=True, timeout=None)
* block:当队列中没有数据元素继续取数据时:
如果 block=False,直接引发 queue.Empty 异常;
如果 block=True,且 timeout=None,则一直等待直到有数据入队列后可以取出数据
如果 block=True,且 timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有数据放入的话就引发 queue.Empty 异常。
timeout,设置超时时间

## 不等待,如果满了,就报错
queue.put_nowait()

## 去取值,如果没有值,直接报错
res=queue.get_nowait()

##查看这个queue是否满
queue.full()

##查看queue是否是空的
queue.empty()

## 查看queue中有几个值
queue.qsize()
'''

3.13.2 通过Queue实现进程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process,Queue

import os
import time

def task(queue):
print('我这个进程%s开始放数据了'%os.getpid())
time.sleep(10)
queue.put('lqz is handsome')
print('%s我放完了' % os.getpid())


if __name__ == '__main__':
#不写数字,表示可以任意长度
queue=Queue()
p=Process(target=task,args=[queue,])
p.start()

res=queue.get() #会卡在这
print(res)

3.13.3 批量生产数据放入Queue再批量取出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process,Queue
import os

def get_task(queue):
res=queue.get()
print('%s这个进程取了数据:%s'%(os.getpid(),res))


def put_task(queue):
queue.put('%s:放了数据'%os.getpid())

if __name__ == '__main__':
queue=Queue(1) # 这里队列数目为1,为什么能放进两个数据?答:因为第一个进程放完第一个数据时,第二个进程挂在那里,一直在等待,等到第一个数据取走的时候,第二个进程就可以把数据放进去了
p1=Process(target=put_task,args=[queue])
p2=Process(target=put_task,args=[queue])
p1.start()
p2.start()


p3=Process(target=get_task,args=[queue])
p4=Process(target=get_task,args=[queue])
p3.start()
p4.start()




3.13.4 生产者消费者模型(重点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from multiprocessing import Process, Queue
import os
import time
import random
def producer(queue):
# 生产的东西,放到Queue中
for i in range(10):
data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
print(data)
# 模拟一下延迟
time.sleep(random.randint(1,3))
queue.put('第%s个包子'%i)

def consumer(queue):
# 消费者从queue中取数据,消费(吃包子)
while True:
res=queue.get()
# 模拟一下延迟
time.sleep(random.randint(1, 3))
print('%s这个消费者,吃了%s'%(os.getpid(),res))
if __name__ == '__main__':
queue=Queue(3)
p=Process(target=producer,args=[queue,])
p.start()

p1=Process(target=consumer,args=[queue,])
p1.start()


###### 改良(生产者以及不生产东西了,但是消费者还在等着拿)
import os
import time
import random
def producer(queue):
# 生产的东西,放到Queue中
for i in range(10):
data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
print(data)
# 模拟一下延迟
time.sleep(random.randint(1,3))
queue.put('第%s个包子'%i)
# 生产完了,在queue中放一个None
queue.put(None)
def consumer(queue):
# 消费者从queue中取数据,消费(吃包子)
while True:

res=queue.get()
if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
# 模拟一下延迟
time.sleep(random.randint(1, 3))
print('%s这个消费者,吃了%s'%(os.getpid(),res))



if __name__ == '__main__':
queue=Queue(3)
p=Process(target=producer,args=[queue,])
p.start()
p1=Process(target=consumer,args=[queue,])
p1.start()

#### 把put none 放在主进程中执行
import os

import time
import random
def producer(queue):
# 生产的东西,放到Queue中
for i in range(10):
data = '%s这个厨师,整了第%s个包子' % (os.getpid(), i)
print(data)
# 模拟一下延迟
time.sleep(random.randint(1,3))
queue.put('第%s个包子'%i)
def consumer(queue):
# 消费者从queue中取数据,消费(吃包子)
while True:

res=queue.get()
if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
# 模拟一下延迟
time.sleep(random.randint(1, 3))
print('%s这个消费者,吃了%s'%(os.getpid(),res))



if __name__ == '__main__':
queue=Queue(3)
p=Process(target=producer,args=[queue,])
p.start()

p1=Process(target=consumer,args=[queue,])
p1.start()

# 如果把put None放在这,会有问题
# 主进程会先执行这句话,消费进程读到None,直接结束,生产者进程没有结束,于是生产一直在生产,消费已经不消费了
# 直到Queue满了,就一直卡在这了
# queue.put(None)

### 现在就要放在这,你把问题解决
p.join()
queue.put(None)



3.13.5 多个生产者多个消费者的生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# 多个生产者在生产,多个消费者在消费
import time
import random
def producer(queue,food):
# 生产的东西,放到Queue中
for i in range(10):
data = '%s这个厨师,做了第%s个%s' % (os.getpid(), i,food)
print(data)
# 模拟一下延迟
time.sleep(random.randint(1,3))
queue.put('第%s个%s'%(i,food))


def consumer(queue):
# 消费者从queue中取数据,消费(吃包子)
while True:
res=queue.get()
if not res:break # 如果去到空,说明打烊了(生产者不生产了),退出
# 模拟一下延迟
time.sleep(random.randint(1, 3))
print('%s这个消费者,吃了%s'%(os.getpid(),res))



if __name__ == '__main__':
queue=Queue(3)
##起了三个生产者
p1=Process(target=producer,args=[queue,'包子'])
p2=Process(target=producer,args=[queue,'骨头'])
p3=Process(target=producer,args=[queue,'泔水'])
p1.start()
p2.start()
p3.start()



# 起了两个消费者
c1=Process(target=consumer,args=[queue,])
c2=Process(target=consumer,args=[queue,])
c1.start()
c2.start()

##等三个生产者都生产完,放三个None
p1.join()
p2.join()
p3.join()
queue.put(None)
queue.put(None)
queue.put(None)

##如果消费者多,比生产者多出来的消费者不会停

import time
import random


def producer(queue, food,name):
# 生产的东西,放到Queue中
for i in range(10):
data = '%s:这个厨师,做了第%s个%s' % (name, i, food)
print(data)
# 模拟一下延迟
time.sleep(random.randint(1, 3))
queue.put('第%s个%s' % (i, food))


def consumer(queue,name):
# 消费者从queue中取数据,消费(吃包子)
while True:
try:
res = queue.get(timeout=20)
# 模拟一下延迟
time.sleep(random.randint(1, 3))
print('%s这个消费者,吃了%s' % (name, res))
except Exception as e:
print(e)
break


if __name__ == '__main__':
queue = Queue(3)
##起了三个生产者
p1 = Process(target=producer, args=[queue, '包子','egon'])
p2 = Process(target=producer, args=[queue, '骨头','lqz'])
p3 = Process(target=producer, args=[queue, '泔水','jsason'])
p1.start()
p2.start()
p3.start()

# 起了两个消费者
c1 = Process(target=consumer, args=[queue, '孟良'])
c2 = Process(target=consumer, args=[queue,'池劲涛' ])
c3 = Process(target=consumer, args=[queue,'池劲涛' ])
c4 = Process(target=consumer, args=[queue,'池劲涛' ])
c1.start()
c2.start()
c3.start()
c4.start()

3.14 进程间数据共享(了解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process,Manager,Lock

# 魔法方法:类内以__开头__结尾的方法,都叫魔法方法,某种情况下会触发它的执行
'''
__init__ :类()触发
__new__:
__getattr__
__setattr__
__getitem__
__setitem__

'''

def task(dic,lock):
# lock.acquire()
# dic['count']-=1
# lock.release()
with lock:
dic['count'] -= 1
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
# 如果直接定义dict,这个dict在多个进程中其实是多份,进程如果改,只改了自己的
#如果定义的是m.dict({'count': 100}),多个进程之间就可以共享这个数据
dic = m.dict({'count': 100})
p_l = []
for i in range(100):
p = Process(target=task, args=(dic, lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()

4 线程

4.1 概念

1
2
3
如果把我们上课的过程看成一个进程的话,那么我们要做的是耳朵听老师讲课,手上还要记笔记,脑子还要思考问题,这样才能高效的完成听课的任务。而如果只提供进程这个机制的话,上面这三件事将不能同时执行,同一时间只能做一件事,听的时候就不能记笔记,也不能用脑子思考,这是其一;如果老师在黑板上写演算过程,我们开始记笔记,而老师突然有一步推不下去了,阻塞住了,他在那边思考着,而我们呢,也不能干其他事,即使你想趁此时思考一下刚才没听懂的一个问题都不行,这是其二

#进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程。

4.2 如何开启线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread
from queue import Queue
import os
import time


def task():
time.sleep(3)
print('我是子线程执行的')
print(os.getpid())


if __name__ == '__main__':
# 启动线程

ctime = time.time()
t = Thread(target=task)
t.start()
# task()
time.sleep(3)
print(os.getpid())
print(time.time() - ctime)

4.3 作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
1 基于进程写生产者消费者模型
2 开启5个线程,执行1+2+..1000000
3 开启5个进程,执行1+2+..1000000
4 打印出这俩的执行时间,比较进程快还是线程快
## 1
from multiprocessing import Process
import time
import random


def producer(queue, food, name):
for i in range(10):
data = '%s这个厨师做了第%s个%s' % (name, i, food)
print(data)
time.sleep(random.randint(1, 3))
queue.put('第%s个%s' % (i, food))


def consumer(queue,name):
while True:
try:
res = queue.get(timeout=5)
time.sleep(random.randint(1,3))
print('%s 这个消费者,吃了%s' % (name,res))
except Exception as e:
print(e)
break


if __name__ == '__main__':
queue = Queue(3)
p1 = Process(target=producer, args=(queue, '包子', 'egon'))
p2 = Process(target=producer, args=(queue, '榴莲', 'lqz'))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(queue,'dgt'))
c2 = Process(target=consumer,args=(queue,'xy'))
c1.start()
c2.start()


## 2
# 开启5个进程执行1+2+3+......10000000
from multiprocessing import Process
import time

def task(num,i):
print('%s正在计算------' % i)
sum =0
for n in range(num+1):
sum += n
print('结果是%s' % sum)
if __name__ == '__main__':
s_time = time.time()
p_list = []
for i in range(5):
p = Process(target=task,args=(100000000,i))
p.start()
p_list.append(p)
[p.join() for p in p_list]
cost_time = time.time() - s_time
print('总共话费了%s秒' % cost_time)

## 3
#开启5个线程执行1+2+3+......10000000
from threading import Thread
import time


def task(num, i):
print('%s正在计算------' % i)
sum = 0
for n in range(num + 1):
sum += n
print('结果是%s' % sum)


if __name__ == '__main__':
s_time = time.time()
t_list = []
for i in range(5):
t = Thread(target=task, args=(10000000, i))
t.start()
t_list.append(t)
[t.join() for t in t_list]
cost_time = time.time() - s_time
print('总共花费了%s秒' % cost_time)

## 4 进程比线程块

4.4 进程和线程的区别

    1. 地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
    1. 通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
    1. 调度和切换:线程上下文切换比进程上下文切换要快得多。
    1. 在多线程操作系统中,进程不是一个可执行的实体。

      4.5 线程的特点

在多线程的操作系统中,通常是在一个进程中包括多个线程,每个线程都是作为利用CPU的基本单位,是花费最小开销的实体。线程具有以下属性。

  • 1.轻型实体
  • 2.独立调度和分派的基本单位

4.6 使用线程的实际场景

176-使用线程的场景-01.png

开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。

4.7 内存中的线程

见刘清政老师的博客

4.8 GIL全局解释器锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
在CPython中,全局解释器锁(GIL)是一个互斥锁,用于防止多个本机线程同时执行Python字节码。这把锁主要是必要的
因为CPython的内存管理不是线程安全的(然而,由于GIL的存在,其他特性已经发展到依赖于它执行的保证。)

#结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

#进程和线程速度对比
单核情况下:四个任务
多核情况下:四个任务
计算密集型:一个任务算十秒,四个进程和四个线程,肯定是进程快
IO密集型:任务都是纯io情况下,线程开销比进程小,肯定是线程好

# 总结
* GIL其实就是一把互斥锁(牺牲了效率但是保证了数据的安全)。
* 线程是执行单位,但是不能直接运行,需要先拿到python解释器解释之后才能被cpu执行
* 同一时刻同一个进程内多个线程无法实现并行,但是可以实现并发
* 为什么要有GIL? 是因为它内部的垃圾回收机制不是线程安全的
* 垃圾回收机制也是一个任务,跟你的代码不是串行运行,如果是串行会明显有卡顿
* 这个垃圾回收到底是开进程还是开线程?肯定是线程,线程肯定也是一段代码,所以想运行也必须要拿到python解释器
假设能够并行,会出现什么情况?一个线程刚好要造一个a=1的绑定关系之前,这个垃圾线程来扫描,矛盾点就来了,谁成功都不对!
* 也就意味着在Cpython解释器上有一把GIL全局解释器锁
* 同一个进程下的多个线程不能实现并行但是能够实现并发,多个进程下的线程能够实现并行

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

在多线程环境中,Python 虚拟机按以下方式执行:

  • 1.设置 GIL;

  • 2.切换到一个线程去运行;

  • 3.运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));

  • 4.把线程设置为睡眠状态;

  • 5.解锁 GIL;

  • 6.再次重复以上所有步骤。

    在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。

4.9 通过threading.Thread类创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 创建线程的方式一
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)

if __name__ == '__main__':
t=Thread(target=sayhi,args=('nick',))
t.start()
print('主线程')

# 创建线程的方式二
from threading import Thread
import time
class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print('%s say hello' % self.name)


if __name__ == '__main__':
t = Sayhi('nick')
t.start()
print('主线程')

4.10 多线程和多进程的比较

4.10.1 pid的比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from threading import Thread
from multiprocessing import Process
import os

def work():
print('hello',os.getpid())

if __name__ == '__main__':
# part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
t1=Thread(target=work)
t2=Thread(target=work)
t1.start()
t2.start()
print('主线程/主进程pid',os.getpid())

# part2:开多个进程,每个进程都有不同的pid
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主线程/主进程pid',os.getpid())

4.10.2 开启效率的较量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from threading import Thread
from multiprocessing import Process
import os

def work():
print('hello')

if __name__ == '__main__':
# 在主进程下开启线程
t=Thread(target=work)
t.start()
print('主线程/主进程')
'''
打印结果:
hello
主线程/主进程
'''

# 在主进程下开启子进程
t=Process(target=work)
t.start()
print('主线程/主进程')
'''
打印结果:
主线程/主进程
hello
'''

4.10.3 内存数据的共享问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from  threading import Thread
from multiprocessing import Process
import os
def work():
global n
n=0

if __name__ == '__main__':
# n=100
# p=Process(target=work)
# p.start()
# p.join()
# print('主',n) # 毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100


n=1
t=Thread(target=work)
t.start()
t.join()
print('主',n) # 查看结果为0,因为同一进程内的线程之间共享进程内的数据

4.11 Thread类的其他方法

Thread实例对象的方法:

  • isAlive():返回线程是否活动的。
  • getName():返回线程名。
  • setName():设置线程名。

threading模块提供的一些方法:

  • threading.currentThread():返回当前的线程变量。
  • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

4.12 多线程实现socket

  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
while True:
data=conn.recv(1024)
print(data)
conn.send(data.upper())

if __name__ == '__main__':

while True:
conn,addr=s.accept()


p=threading.Thread(target=action,args=(conn,))
p.start()
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
msg=input('>>: ').strip()
if not msg:continue

s.send(msg.encode('utf-8'))
data=s.recv(1024)
print(data)

4.13 死锁问题(递归锁,可重入锁)

1
2
3
1 所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

2 可重入锁,递归锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from threading import Thread,Lock,RLock
import time
import random

def eat1(lock_1,lock_2,name):
lock_1.acquire()
print('%s:拿到了筷子' % name)
time.sleep(random.random())
lock_2.acquire()
print('%s拿到了面条' % name)
lock_2.release()
print('%s放下了面条' % name)
lock_1.release()
print('%s放下了筷子' % name)

def eat2(lock_1,lock_2,name):
lock_2.acquire()
print('%s:拿到了筷子' % name)
time.sleep(random.random())
lock_1.acquire()
print('%s拿到了面条' % name)
lock_1.release()
print('%s放下了面条' % name)
lock_2.release()
print('%s放下了筷子' % name)

if __name__ == '__main__':
lock_1 = Lock()
lock_2 = Lock()

for i in ['张三','李四','王五']:
t = Thread(target=eat1,args=[lock_1,lock_2,i])
t.start()
for i in ['赵六','王7','傻逼']:
t = Thread(target=eat2,args=[lock_1,lock_2,i])
t.start()



### 解决死锁问题 RLock:可重入,可以重复acquire,获得几次,就要释放几次
from threading import Thread, Lock,RLock
import time
import random


def eat1(lock_1, lock_2, name):
lock_1.acquire()
print('%s:拿到了筷子' % name)
time.sleep(random.random())
lock_2.acquire()
print('%s:拿到了面条' % name)
print('开始吃面')
time.sleep(random.random())
lock_2.release()
print('%s放下了面条' % name)
lock_1.release()
print('%s放下了筷子' % name)


def eat2(lock_1, lock_2, name):
lock_2.acquire()
print('%s:拿到了面条' % name)
time.sleep(random.random())
lock_1.acquire()
print('%s:拿到了筷子' % name)
print('开始吃面')
time.sleep(random.random())
lock_1.release()
print('%s放下了筷子' % name)
lock_2.release()
print('%s放下了面条' % name)


if __name__ == '__main__':
lock_1 = RLock()
lock_2 = lock_1

# lock_1 = Lock()
# lock_2 = lock_1
for i in ['张三', '李四', '王五']:
t = Thread(target=eat1, args=[lock_1, lock_2, i])
t.start()
for i in ['赵6', '往7', '傻逼']:
t = Thread(target=eat2, args=[lock_1, lock_2, i])
t.start()

4.14 线程队列

1
2
1 线程Queue,解决线程间数据共享的问题
2 线程间数据共享可以使用共享变量(可能会存在并发安全的问题)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from threading import Thread
from queue import Queue,LifoQueue,PriorityQueue # 线程queue
# Queue:先进先出
#LifoQueue:后进先出
#PriorityQueue:优先级队列




import time
def task(queue):
time.sleep(3)
queue.put('lqz')



if __name__ == '__main__':
queue=Queue()
t=Thread(target=task,args=[queue,])
t.start()


res=queue.get()
print(res)


# Queue:先进先出
# LifoQueue:后进先出
# PriorityQueue:优先级队列


if __name__ == '__main__':
quque1=Queue()

quque1.put(1)
quque1.put(2)
print(quque1.get())



quque2=LifoQueue() Last in First out
quque2.put(1)
quque2.put(2)
print(quque2.get())


quque3=PriorityQueue()
quque3.put((1,'lqz'))
quque3.put((100,'egon'))
# 数字越小,优先级越高
print(quque3.get())

5 进程池,线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# 线程池,进程池都在这个模块下concurrent.futures
import time
import os

import random
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

def task(n):
print(os.getpid(), '开始执行了')
time.sleep(random.random())
return n * n



def callback(result):
print(result)
if __name__ == '__main__':
# 开进程池去执行
# ProcessPoolExecutor实例化得到一个对象
pool_p=ProcessPoolExecutor(3)
ll=[]
for i in range(10):
# 把任务提交到进程池执行
f=pool_p.submit(task,n=i)
ll.append(f)

# 等待所有子进程执行完成,主进程在执行
pool_p.shutdown()

for l in ll:
res=l.result() # 取到当前进程执行任务的返回值
print(res)

print('我是主进程')

#map取代for循环的,第一个参数是要执行的任务,第二个参数,是一个可迭代对象,迭代一次的结果,会传给任务



for i in range(10):
f=pool_p.submit(task,n=i)
等同于上面
pool_p.map(task,range(10))
pool_p.shutdown()
print('主进程')


## 回调
for i in range(10):
pool_p.submit(task,n=i).add_done_callback()

def task(n):
print(os.getpid(), '开始执行了')
time.sleep(1)
return n * n


def callback(result):
print(result.result())


if __name__ == '__main__':
pool_p = ProcessPoolExecutor(3)
for i in range(10):
pool_p.submit(task, n=i).add_done_callback(callback)
'''

submit
shutdown
result
map(了解)
add_done_callback:回调

'''

6 协程

6.1 介绍

1
2
3
1 协程是:程序级别的切换,单线程下实现并发
python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

6.2 greenlet模块(初级模块,实现了保存状态加切换)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 安装第三方模块:在命令行下
# pip3 install greenlet
# pip3 uninstall greenlet 卸载第三方模块
# pip3 list # 列出当前解释器环境下安装的第三方模块


from greenlet import greenlet



def eat(name):
print(name,'在吃了一口')
g2.switch(name)

print(name,'在吃了第二口')
g2.switch()

def play(name):
print(name, '玩了一下')
g1.switch()

print(name, '玩了第二下')

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')


### 写两个task,一个计算从1+1w,另一个计算从1乘以到1w,统计一下,切换执行时间快还是不切换快

6.3 gevent模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
## gevent模块,协程模块,遇到io可以自动切换


# pip3 install gevent

import gevent
import time






###使用原来的time的io,不会切,并且变成了串行
# def eat(name):
# print(name, '在吃了一口')
# time.sleep(2)
#
# print(name, '在吃了第二口')
#
#
# def play(name):
# print(name, '玩了一下')
# time.sleep(3)
# print(name, '玩了第二下')
#
#
# res1 = gevent.spawn(eat, 'egon')
# res2 = gevent.spawn(play, 'egon')
#
#
# ctime=time.time()
#
# gevent.joinall([res1,res2]) # 相当于上面那两句
# print('主线程')
# print(time.time()-ctime)


### time的io也要切换

# 猴子补丁:把原来的io全都替换成gevent的io
from gevent import monkey;monkey.patch_all()



def eat(name):
print(name, '在吃了一口')
time.sleep(2)

print(name, '在吃了第二口')


def play(name):
print(name, '玩了一下')
time.sleep(3)
print(name, '玩了第二下')


res1 = gevent.spawn(eat, 'egon')
res2 = gevent.spawn(play, 'egon')


ctime=time.time()

gevent.joinall([res1,res2]) # 相当于上面那两句
print('主线程')
print(time.time()-ctime)

6.4 asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 内置模块   python 3.4 推出这个模块,python作者主导的
import asyncio
import time
import threading
# 这个函数是协程函数
async def task():
res=threading.current_thread().getName()
print(res)
print('xxx')
await asyncio.sleep(2)
print('协程执行完成')

async def task2():
res=threading.current_thread().getName()
print(res)
print('2222')
await asyncio.sleep(3)
print('222协程执行完成')



ctime=time.time()
loop=asyncio.get_event_loop()



tasks=[task(),task2()]



loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(time.time()-ctime)

作业

1
2
1 使用greenlet写两个task,一个计算从1+1w,另一个计算从1乘以到1w,统计一下,切换执行时间快还是不切换快
2 基于gevent协程,实现一个高并发的TCP服务端