1.线程与进程的区别
2.为何会选择多线程
3.创建多线程的方式
4.线程对象的属性和方法
5.守护线程
6.线程池与进程池
7.多线程,到底该设置多少个线程?
8.队列

线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于cpu),而一条流水线必须属于一个车间,一个车间的工作过程是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源调度集合),而线程才是cpu上的执行单位

1.线程与进程的区别

1、每启动一个进程,进程内都至少有一个线程。

2、进程本身只是一个资源调度集合,并不能真正执行,进程内的线程才是真正的运行单位。

3、一个进程内可以启动多个线程,同一进程内线程间共享资源。

4、启动线程的开销远远小于开进程。

5、线程可以相当程度控制相同进程下的线程,进程只能控制其子进程。

6、对主线程的更改(取消、优先级更改等)可能会影响其他线程的行为,对父进程的修改则不会影响子进程。

7、进程之间是竞争关系,线程之间是协作关系

2.为何会选择多线程?

  • 同一个进程内的多个线程共享该进程内的地址资源
  • 线程比进程更轻量级,线程比进程更容易创建可撤销

join方法:join 会卡住主线程,并让当前已经 start 的子线程继续运行,直到调用.join的这个线程运行完毕

3.创建线程的方式★★★

Python创建多线程主要有两种方法:函数式,类创建

函数式创建

在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多线程。threading.Thread() 一般接收两个参数:

  • 线程函数名:要放置线程让其后台执行的函数,由我们自已定义,注意不要加();
  • 线程函数的参数:线程函数名所需的参数,以元组的形式传入。若不需要参数,可以不指定
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    from threading import Thread, currentThread
    # 简单多线程,解释:
    # 1)一个工人只做一个任务,做完就撤了;
    # 2)有多少个任务就得有多少个工人;
    # 3)这个方式处理任务需要快,但人员成本开销高。


    def task(taskId):
    thread_name = currentThread().getName()
    print('工人【%s】正在处理任务【%d】:do something...' % (thread_name, taskId))


    if __name__ == '__main__':
    threads = []
    # 这里弄5个线程(一个线程相当于一个工人)
    for i in range(5):
    # target 参数指定线程要处理的任务函数,args 参数传递参数到任务函数去
    t = Thread(target=task, args=(i+1,))
    threads.append(t)
    # 启动线程
    for t in threads:
    t.start()
    # 阻塞线程
    for t in threads:
    t.join()

类创建

相比较函数而言,使用类创建线程,会比较麻烦一点。首先,我们要自定义一个类,对于这个类有两点要求:

  • 必须继承 threading.Thread 这个父类
  • 必须覆写 run 方法

这里的 run 方法,和我们上面线程函数的性质是一样的,可以写我们的业务逻辑程序。在 start() 后将会调用。

来看一下例子,为了方便对比,run函数我复用上面的main。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# -*- coding:utf-8 -*-
import time
from threading import Thread


class MyThread(Thread):
def __init__(self, name="Python"):
"""
类的继承:写法一
super(父类名, self).__init__(父类的属性参数一,父类的属性参数二)
eg:
1. 线程类的继承 super(MyThread, self).__init__()
2. 自定义类的继承 super(Animal, self).__init__(name, types)

类的继承:写法二
super().__init__(父类的属性参数一,父类的属性参数二)
"""

# 注意,super().__init__() 一定要写,而且要写在最前面,否则会报错。
# super(MyThread, self).__init__()
super().__init__()
self.name = name

def run(self):
for i in range(2):
print("hello {} ==> index: {}".format(self.name, i + 1))
time.sleep(1)


if __name__ == '__main__':
print("这是主线程!")
start_time = time.time()
# 创建线程01,不指定参数
thread_01 = MyThread()
# 创建线程02,指定参数
thread_02 = MyThread(name="Badwoman")
thread_01.start()
thread_02.start()
print("主线程结束!")
print(time.time() - start_time)

4.线程对象的属性和方法

4.1 Thread实例对象的方法

isAlive():返回线程是否活动的
getName():返回线程名
setName():设置线程名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count # 得到活跃进程数
from threading import enumerate # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

# 需要注意的是线程没有子线程的概念,线程都是属于进程的
def task():
print("%s is running" % currentThread().getName()) # 对象下有一个getName()方法
time.sleep(2)
print("%s is done" % currentThread().getName())

if __name__ == '__main__':
# getName()方法返回线程名
t = Thread(target=task, name='子线程1')
t.start()
print("主进程", currentThread().getName())
"""
子线程1 is running
主进程 MainThread
子线程1 is done
"""

5.守护线程

守护线程守护主线程,等到主线程死了才会被销毁。在有其他线程的情况下,主线程代码运行完后,等其他非守护线程结束,守护线程才会死掉。

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。运行完毕的真正含义:

1、对主进程来说,运行完毕指的是主进程代码运行完毕。

2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才能运行完毕

1
2
3
4
5
6
# 创建一个线程
t=Thread(target=func)

# 守护线程设置:设置线程是否随主线程退出而退出,默认为False
t.daemon = True
t.daemon = False

6.进程池和线程池

进程池和线程池的接口一模一样,用法也一样。池就是要对数目加以限制,保证机器一个可承受的范围,以一个健康的状态保证它的运行

concurrent.futures 模块提供了高度封装的异步调用接口
ThreadPoolExecutor: 线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

基本方法:

1、submit(fn, args, *kwargs)异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作

3、shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕,submit和map必须在shutdown之前

4、result(timeout=None) 取得结果

5、add_done_callback(fn) 回调函数

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 线程池
# 解释:
# 1)一个工人同一时间只做一个任务,但做完一个任务可以接着做下一个任务;
# 2)可以分配多个任务给少量工人,减少人员成本开销。


import threading
from concurrent.futures import ThreadPoolExecutor


# 任务
def task(taskId):
thread_name = threading.current_thread().getName()
print('工人【%s】正在处理任务【%d】:do something...' % (thread_name, taskId))


def main():
# 初始化线程池(商会),定义好池里最多有几个工人
pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix='自定义线程前缀名Thread')
# 准备10个任务
for i in range(10):
# 提交任务到池子(商会)里(它会自动分配给工人)
pool.submit(task, i+1)


if __name__ == '__main__':
main()

shutdown()方法的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os, time, random

def task(name):
print("name: %s pid: %s run" % (name, os.getpid()))
time.sleep(random.randint(1,3))


if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 指定进程池大小,最大进程数,如果不指定默认是CPU核数

for i in range(10):
"""从始至终四个进程解决这10个任务,谁没事了接新任务"""
pool.submit(task, 'egon%s' %i) # 提交任务的方式————异步调用:提交完任务,不用在原地等任务执行拿到结果。

pool.shutdown() # 把提交任务入口关闭,默认参数wait=True;同时还进行了pool.join()操作,等任务提交结束,再结束主进程

print("主进程")
"""
name: egon0 pid: 12502 run
name: egon1 pid: 12503 run
....
name: egon9 pid: 12505 run
主进程
"""

7.多线程,到底该设置多少个线程?

那我们如何分配线程?我们提供一个公式:

最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

我们继续上面的任务,我们的服务器CPU核数为4核,一个任务线程cpu耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms,那最佳线程数目:( 80 + 20 )/20 * 4 = 20。也就是设置20个线程数最佳。

从这个公式上面我们就得出,线程的等待时间越大,线程数就要设置越大,这个正好符合我们上面的分析,可提升CPU利用率。那从另一个角度上面说,线程数设置多大,是根据我们自身的业务的,需要自己去压力测试,设置一个合理的数值。

基础常规标准

我们可以尝试去猜想,因为很多业务集中到一个线程池中,不像上面的案例比较简单,事实上业务太多,怎么设置呢?这个就是要去压力测试去调整。不过我们的前辈已经帮我们总结了一个基础的值(最终还是要看运行情况自行调整)

1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话,一般设置 5 或 8

2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:cpu核数 / (1-0.9),核数为4的话,一般设置 40

8.队列

Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

队列的创建

  • Queue.Queue(maxsize)  FIFO(先进先出队列)
  • Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
  • Queue.PriorityQueue(maxsize) 优先级顺序创建队列

maxsize表示创建队列中允许的最大项数(队列总容量超过maxsize值时,将会截取至maxsize),如果设置的maxsize小于1,则表示队列的长度无限长

线程安全队列

在线程中,访问一些全局变量,加锁是一个经常的过程。
Queue模块提供了一个适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。它本身就是线程安全的,使用put和get来处理数据,不会产生对一个数据同时读写的问题,所以是安全的
Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用,可以使用队列来实现线程间的同步。

队列的主要方法

FIFO是常用的队列,其一些常用的方法有:

  • Queue.qsize():返回队列大小
  • Queue.empty():判断队列是否为空
  • Queue.full():判断队列是否满了
  • Queue.get([block[,timeout]]):从队列头删除并返回一个item,当队列为空时继续执行get取值时就会引发异常
  • Queue.put(item[,block[,timeout]]):向队尾插入一个item,同样若队列满时继续执行put填值时就会引发异常
  • Queue.task_done():从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成
  • Queue.join():监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。
  • Queue.unfinished_tasks: 返回队列q.task_done()后的队列的大小

简单举例队列与多线程的搭配使用:本示例是队列大小无限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import time
import queue
from threading import Thread, currentThread


def run_test(q):
while not q.empty():
value = q.get()
print("线程【{}】正在处理队列值【{}】\n".format(currentThread().name, value), end="")
time.sleep(0.8)
# 本次队列任务处理完成,此处不可省略
q.task_done()


def queue_demo():
# 创建线程列表
thread_list = []
# 声明队列
q = queue.Queue(25)
# 队列长度
queue_len = 0
# 放入队列待处理值
for i in range(1, 9):
q.put(i)
queue_len += 1
# 【处理线程】可自定义创建多线程数目
for x in range(1, 3):
t = Thread(target=run_test, args=(q, ))
thread_list.append(t)
for t in thread_list:
t.start()
for t in thread_list:
t.join()


if __name__ == '__main__':
print("=-=-=-=-=-="*5)
print("队列Queue多线程任务开始")
start_time = time.time()
queue_demo()
print("主线程结束!任务总耗时:{}秒!".format(time.time() - start_time))
print("=-=-=-=-=-=" * 5)

"""
程序执行结果:
=-=-=-=-=-==-=-=-=-=-==-=-=-=-=-==-=-=-=-=
队列Queue多线程任务开始
线程【Thread-1】正在处理队列值【1】
线程【Thread-2】正在处理队列值【2】
线程【Thread-2】正在处理队列值【3】
线程【Thread-1】正在处理队列值【4】
线程【Thread-2】正在处理队列值【5】
线程【Thread-1】正在处理队列值【6】
线程【Thread-1】正在处理队列值【7】
线程【Thread-2】正在处理队列值【8】
主线程结束!任务总耗时:3.232334613800049秒!
=-=-=-=-=-==-=-=-=-=-==-=-=-=-=-==-=-=-=-=
"""

参考文章


 评论

联系我 | Contact with me

Copyright © 2019-2020 谁知你知我,我知你知深。此恨经年深,比情度日久

博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议