09-线程和进程

线程和进程 #

1. 同步和异步 #

针对结果

  • 同步 - 多任务,多个任务执行的时候有先后的顺序, 必须一个先执行后, 另外一个才能继续执行, 只有一条运行主线
  • 异步 - 多任务, 多个任务之间执行没有想先后顺序, 可以同时运行, 执行时先后顺序不会对程序有什么影响, 存在多条运行主线

2. 阻塞和非阻塞 #

针对运行状态 线程的状态(就绪、运行、阻塞)

  • 阻塞 - 从调用者的角度出发, 如果在调用的时候, 被卡住, 不能再继续往下执行, 需要等待, 就是 阻塞
  • 非阻塞 - 从调用者的角度出发, 如果在调用的时候, 没有被卡住, 能够继续向下执行, 无需等待, 就是 非阻塞

3. 并发和并行 #

  • 并发 - 同时处理任务
  • 并行 - 交替处理任务, 类似线程之间不断切换

并发的关键是你有处理多个任务的能力,不一定要同时。

并行的关键是你有同时处理多个任务的能力,强调的是同时.

下面这篇文章可以参考解释上述概念

https://blog.csdn.net/timemachine119/article/details/54091323

进程和线程使用 #

  • 进程:内存独立, 线程共享同一进程的内存, 一个进程就像是一个应用程序(app)
  • 进程是资源的组合, 线程是执行的单位
  • 进程之间不能直接相互访问, 同一进程中的线程可以相互通讯
  • 创建新的进程很消耗系统资源, 线程非常轻量, 只需要保存线程运行时的必要数据, 如上下文, 程序的堆栈信息
  • 同一进程里的线程可以相互控制, 父进程可以控制子进程

开多进程

  • redis 缓存问题 读写分离, 单独设置缓存服务器来保存缓存, 读写都在该服务器上进行

关于 IO 密集型任务 和 计算密集型任务 #

  • CPU密集型 - 多进程 计算
  • IO密集型 - 多线程 文本操作

如果多线程的进程是CPU密集型的,那多线程并不能有多少效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降,推荐使用多进程;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。所以我们根据实验对比不同场景的效率

线程常用方法 #


t.start() 激活线程  开始
t.getName()  获取线程名称
t.setName()  设置
t.name : 获取或设置线程的名称

t.is_alive()  判断线程是否为激活状态

t.isAlive() 判断线程是否为激活状态

t.setDaemon() 设置为后台线程或前台线程默认False;通过一个布尔值设置线程是否为守护线程必须在执行start()方法之后才可以使用如果是后台线程主线程执行过程中后台线程也在进行主线程执行完毕后后台线程不论成功与否均停止如果是前台线程主线程执行过程中前台线程也在进行主线程执行完毕后等待前台线程也执行完成后程序停止

t.isDaemon()  判断是否为守护线程

t.ident 获取线程的标识符线程标识符是一个非零整数只有在调用了start()方法之后该属性才有效否则它只返回None

t.join() 逐个执行每个线程执行完毕后继续往下执行该方法使得多线程变得无意义

t.run() 线程被cpu调度后自动执行线程对象的run方法

==进程==



import os
import time
import random

from multiprocessing import Process


def coding():
    while True:
        print('AAAAAA, 进程号:%s' % os.getpid())
        time.sleep(random.randint(1, 5))
        print('BBBBBBB, 进程号:%s' % os.getpid())


def play():
    while True:
        print('1111111111, 进程号:%s' % os.getpid())
        time.sleep(random.randint(1, 5))
        print('2222222222, 进程号:%s' % os.getpid())


def main():
    p1 = Process(target=coding)
    p2 = Process(target=play)
    
    p1.start()   # 进程之间在不阻塞的情况下是没有影响的,
    # 阻塞   等执行完才会进行下一个
    # p1.join()
    # p2.join(timeout=3)  # 设置超时时间

    p2.start()


if __name__ == '__main__':
    main()

==线程==




import threading
import time


class Study(threading.Thread):

    def __init__(self, name):
        super(Study, self).__init__()
        self.s_name = name

    def run(self):  # 重构方法
        print('当前线程名称- %s' % threading.current_thread().name)
        print('开始学习!- %s' % self.s_name)
        time.sleep(3)
        print('学习结束')
        print('当前线程名称- %s' % threading.current_thread().name)
        # print('---' * 20)


def main():
    
    s1 = Study('语文')
    s2 = Study('数学')
    
    # 守护线程  在 start 前  主线程结束 子线程会被强制结束
    # s1.daemon = True
    # s2.daemon = True
    
    # s1.start()
    # 阻塞
    # s1.join()  # 阻塞在这里  等 s1 结束 再向下执行程序
    
    # s2.start()
    
    s1.run()   # 都变为主线程  程序会顺序执行, 不存在同时执行
    s2.run()   # 相当于只是在执行函数, 并没有使用 多线程
    
    print('测试结束-----')
    
    # s1.run()


if __name__ == '__main__':
    main() 
  • 线程锁

多线程会共享资源, 需要线程锁, 当多个线程需要对同一资源进行修改时, 需要线程锁来保护资源, 避免操作资源出错

未加锁



import threading


class MyThread(threading.Thread):

    def __init__(self):
        super(MyThread, self).__init__()
        # self.s_name = s_name

    def run(self):
        global n
        print('number: %s,  threading name: %s'% (n, self.name))
        n += 1


def main():
    thread_list = []
    for i in range(20):
        t1 = MyThread()
        thread_list.append(t1)

    for t in thread_list:
        t.start()


if __name__ == '__main__':
    n = 0
    main()

加线程锁

  • Lock() 和 RLock()
  • lock = threading.RLock() 允许加多把锁
  • lock = threading.Lock() 只能加一把锁
  • lock = threading.BoundedSemaphore(3) 同时允许三个线程进入, 同时进入的线程多于 3 时会引发 ValueError

锁的本质是内部有一个计数器,调用 acquire() 会使这个计数器 -1,release() 则是+1.计数器的值永远不会小于 0,当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用release()



import threading


mylock = threading.Lock()  # 添加锁


class MyThread(threading.Thread):

    def __init__(self):
        super(MyThread, self).__init__()
        # self.s_name = s_name

    def run(self):
        if mylock.acquire():  # 锁定
            global n
            print('number: %s,  threading name: %s'% (n, self.name))
            n += 1
            mylock.release()  #释放锁


def main():
    thread_list = []
    for i in range(20):
        t1 = MyThread()
        thread_list.append(t1)

    for t in thread_list:
        t.start()


if __name__ == '__main__':
    n = 0
    main()

事件Event

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
  • Event.isSet() :判断标识位是否为Ture
 1 # 事件 event
 2 lock = threading.Event()
 3 def task(arg):
 4     time.sleep(1)
 5     # 锁住所有的线程
 6     lock.wait()
 7     print(arg)
 8 for i in range(10):
 9     t = threading.Thread(target=task,args=(i,))
10     t.start()
11 while 1:
12     value = input('>>:').strip()
13     if value == '1':
14         lock.set() # 打开锁,执行上面的print
15         # lock.clear() # 再锁上

线程池 #

  • 会让线程 更具线程池设置的个数进行 执行, 每次同时执行的个数是设置的个数

import time

from concurrent.futures import ThreadPoolExecutor

def task(i):

    time.sleep(2)
    print('hello!, 编号:', i)

pool = ThreadPoolExecutor(3)

for i in range(50):
    pool.submit(task, i)   # 每次输出 三个 hello

线程池的回调函数

  • 将前面函数的返回值作为后面的结果进行传递
  • future.add_done_callback(add1000)
  • num = future.result()

import time

from concurrent.futures import ThreadPoolExecutor


def add100(num):
    print('我是 100 ')
    return num + 100


def add1000(future):
    print('我是 + 1000')
    num = future.result()
    time.sleep(5)
    print(num + 1000)


def main():
    pool = ThreadPoolExecutor(3)
    for num in range(1,50):
        print('开始计算数字:%s !' % num)
        future = pool.submit(add100, num)
        future.add_done_callback(add1000)  # 前面的结果返回后进行下个函数的调用


if __name__ == '__main__':
    main()

多进程 #

  • 多进程之间可以数据共享

import time

from multiprocessing import Process


def task():
    time.sleep(1)
    print('hello!')


def main():

    for i in range(10):
        p = Process(target=task)
        # p.daemon = True
        p.start()
        # p.join()
    print('主进程结束!!!')


if __name__ == '__main__':
    main()
    
    
###  数据共享

import time

from multiprocessing import Process, Array


def task(num, li):
    time.sleep(1)
    li[num] = num
    print(list(li))


def main():
    li = Array('i', 10)
    for i in range(10):
        p = Process(target=task, args=(i, li))
        # p.daemon = True
        p.start()
        # p.join()
    print('主进程结束!!!')


if __name__ == '__main__':
    main()

进程池 #

和线程池差不多

from concurrent.futures import ProcessPoolExecutor as PPE
 
 
 #基本用法
 def task(arg):
     time.sleep(1)
     print(arg)
 
 pool = PPE(5)
 for i in range(10):
     pool.submit(task,i)
 
 
 # 进程池回调
 def call(arg):
     data = arg.result()
     print(data)
 
 def task(arg):
     print(arg)
     return arg+100
 
 pool = PPE(5)
 for i in range(10):
     obj = pool.submit(task,i)
     obj.add_done_callback(call)