许多实际应用场景涉及到大规模数据的处理,这往往离不开并行计算。Python 的 multiprocessing 模块是一个强大而实用的工具,利用多核处理器的优势,将任务分配给多个进程并同时执行,从而提高程序的性能和效率。本文将详细总结如何使用 multiprocessing 模块来实现多进程编程

# 概述

现代操作系统如 MacOS,UNIX,Linux,Windows 等,都是支持 “多任务” 的操作系统,即可以同时运行多个任务。

  • 在单核 CPU 环境中,多任务执行是轮流让各个任务占用 CPU 交替执行。由于 CPU 调度执行速度非常快,从而有多个任务同时执行的效果。
  • 在多核 CPU 环境中,操作系统通过合适的调度算法将多个任务调度到各个 CPU 核心上执行,真正实现了并行计算。

实现多任务的方式可以分成:多进程模式、多线程模式、以及多进程 + 多线程等。

  • 进程是计算机中正在运行的程序的实例。每个进程都有自己的地址空间、数据栈和控制信息,可以独立执行任务。
  • 线程是进程中的一个执行单元,可以看作是轻量级的进程。多个线程共享同一进程的资源,包括内存空间、文件描述符等。

本文主要介绍 Python 的多进程编程。多进程编程具有以下优缺点:

优点

  1. 多进程可以同时执行多个任务,充分利用多核处理器的能力,实现并行处理。这可以显著提高程序的性能和效率,特别是在处理密集型任务或需要大量计算的场景中。
  2. 每个进程都有自己的独立地址空间和执行环境,进程之间互不干扰。这意味着每个进程都可以独立地执行任务,不会受到其他进程的影响。这种独立性使得多进程编程更加健壮和可靠。

缺点

  1. 每个进程都需要独立的内存空间和系统资源,包括打开的文件、网络连接等。多进程编程可能会增加系统的资源消耗,尤其是在创建大量进程时。
  2. 在多进程编程中,进程之间的切换需要保存和恢复进程的执行环境,这涉及到上下文切换的开销。频繁的进程切换可能会导致额外的开销,影响程序的性能。
  3. 由于多进程之间的数据是相互隔离的,需要通过特定的机制进行数据共享和同步。这可能涉及到进程间通信(IPC)的复杂性,如队列、管道、共享内存等。正确处理数据共享和同步是多进程编程中的挑战之一。

多进程编程在并行处理和资源隔离方面具有明显的优势,但也涉及到资源消耗、上下文切换开销、数据共享和同步等问题。在实际应用中,开发者应权衡利弊,根据具体场景选择适合的编程模型和工具。

在 Windows 系统中,多进程代码需包裹在 if __name__ == '__main__': 块内,否则可能引发递归创建子进程的错误


# 进程

在 Python 中,可以使用 multiprocessing 模块来创建和管理进程。该模块提供了丰富的类和函数,用于创建、启动和管理进程。

import multiprocessing

multiprocessing.Process 为进程类,用于创建进程对象,可以通过继承该类来自定义进程类。

Process([group [, target [, name [, args [, kwargs]]]]])

参数介绍:

  • group 参数未使用,值始终为 None ,可以省略不写。
  • target 表示目标函数,即子进程要执行的任务。
  • args 表示调用对象的位置参数元组,比如 args=(1,2,'egon',) 。如果只有一个参数,需要在后面加逗号。
  • kwargs 表示目标函数的字典参数,如 kwargs={'name':'egon','age':18}
  • name 为子进程的名称(可选,默认值为 Process-N ,其中 N 是进程的编号)。

创建进程对象时,推荐使用关键字参数的方式指定参数。

def run(name):
    while True:
        print(f"{name} is a good man")
        time.sleep(1)
if __name__ == "__main__":
    p = multiprocessing.Process(target=run, args=("lucky",))

Process 对象有以下几个主要属性:

  • p.daemon :默认值为 False ,如果设为 True ,代表 p 为后台运行的守护进程,当 p 的父进程终止时, p 也随之终止。注意, daemon 属性必须在 p.start() 之前设置,且守护进程不能创建自己的子进程。
  • p.name :进程的名称。
  • p.pid :进程的 pid ,在调用 p.start() 之前, pidNone

Process 对象有以下几个主要方法:

  • p.run() :进程启动时运行的方法,正是它去调用 target 指定的函数。当继承 Process 类自定义类时,通常需要重写该方法。
  • p.start() :启动进程,并调用该子进程中的 p.run()start() 方法只能调用一次,否则会抛出异常。
  • p.terminate() :强制终止进程,不会进行任何清理操作。如果该进程创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果该进程还保存了一个锁那么也将不会被释放,进而导致死锁。
  • p.is_alive() :如果进程仍然运行,返回 True ,否则返回 False
  • p.join([timeout]) :主线程等待 p 终止。 timeout 是可选的超时时间。需要注意, p.join 只能用于 start 方法开启的进程,而不能用于 run 方法开启的进程。

调用 Process 对象方法开启进程的进程称为主进程,对于 Process 对象为子进程。默认情况下,父进程的结束不会影响子进程,父进程会等待子进程结束再结束。如果希望父进程结束时子进程也随之结束,可以将子进程设置为守护进程( daemon=True )。


# 进程间通信

进程间通信(Inter-Process Communication,IPC)是指不同进程之间进行数据交换和共享信息的机制。子进程在创建时会对全局变量做一个备份,因此不能简单地使用全局变量等方式在进程间进行通信。

Python 的 multiprocessing 模块提供了多种 IPC 方式,包括队列、管道、共享内存等,需根据场景选择合适机制。

# 队列

队列是一种常用的进程间通信方式,通过队列可以实现进程之间的数据传输。

Python 的 multiprocessing 模块提供了 Queue 类来实现多进程之间的队列通信。进程可以通过 put() 方法将数据放入队列,其他进程则可以通过 get() 方法从队列中获取数据。

from multiprocessing import Process, Queue
import time
def producer(q):
    for i in range(3):
        q.put(f"Message {i}")
        time.sleep(0.5)
def consumer(q):
    while True:
        item = q.get()  # 阻塞操作
        if item is None: break  # 终止信号
        print(f"Received: {item}")
if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)  # 发送终止信号
    p2.join()

队列是线程和进程安全的,支持任意可序列化的 Python 对象。默认情况下, put()get() 是阻塞操作,可以通过设置 block=Falsetimeout 参数实现非阻塞操作。

队列需显式传递给子进程(通过参数传递)。队列在大规模数据传输时性能较低,建议用于小规模控制指令。

# 管道

管道是另一种常用的进程间通信方式,通过管道可以实现进程之间的双向通信

Python 的 multiprocessing 模块提供了 Pipe 类来创建管道对象。 Pipe() 方法返回两个连接的管道端,一个用于发送数据,另一个用于接收数据。

from multiprocessing import Process, Pipe
def worker(conn):
    conn.send("Ready")         # 发送给父进程
    cmd = conn.recv()          # 接收父进程指令
    if cmd == "EXECUTE":
        conn.send("Task Done")
if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # 输出: Ready
    parent_conn.send("EXECUTE")
    print(parent_conn.recv())  # 输出: Task Done
    p.join()

recv() 方法默认是阻塞的。这意味着当调用 recv() 时,如果管道中没有可用的数据,调用方会阻塞,直到管道中有数据可读。若两端同时调用 recv() 可能产生死锁。

管道适合少量进程间的直接通信。

# 共享内存

# 基础类型共享

共享内存是一种在多进程之间共享数据的高效方式,直接操作共享内存,性能最高。Python 的 multiprocessing 模块提供了 ValueArray 类来实现进程间共享数据。 Value 用于共享单个值,而 Array 用于共享数组。

from multiprocessing import Process, Value, Array, Lock
def increment(shared_value, lock):
    with lock:
        shared_value.value += 1
if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()
    procs = [Process(target=increment, args=(counter, lock)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)  # 输出: 4

在创建共享值和共享数组时,需要指定数据类型(如整数、浮点数)和初始值。进程可以通过读写共享值和共享数组来进行进程间的数据共享。但要注意,Python 并没有在底层对共享内存实施额外的保护措施,需在编程中自行配合锁来保证操作原子性。

常见数据类型映射:

类型码 C 类型 Python 类型
'i' int int
'd' double float
'c' char bytes (长度 1)

# shared_memory 模块

multiprocessing.shared_memory 模块从 Python 3.8 开始提供,主要用于在多个进程之间实现真正的共享内存。它通过底层操作系统的共享内存机制,让不同进程能够直接访问同一片物理内存。相比传统的 ArrayValue 对象, shared_memory 模块在大数据量传输时往往更高效

通过以下方式创建或连接到一个 SharedMemory 实例,表示一块共享内存。

from multiprocessing import shared_memory
# 创建一块新的共享内存
shm = shared_memory.SharedMemory(create=True, size=1024)
# 或者指定名称
# shm = shared_memory.SharedMemory(name="my_shm", create=True, size=1024)
  • name :共享内存的标识符(字符串)。若不指定,系统会自动生成一个唯一名称。
  • create :布尔值,表示是否新建共享内存。如果设置为 False ,则表示连接到已经存在的共享内存(这时必须提供有效的 name )。
  • size :以字节为单位的共享内存大小,仅在 create=True 时才生效。

如果要在其他进程中访问同一块共享内存,只需使用相同的 name 并确保 create=False ,同时知道其大小(通常不需要显式指定,若成功连接会自动匹配原有大小)。

SharedMemory 实例对象有如下几个主要属性:

  • name :实例的名称,若创建时没有指定,会是系统自动生成的随机字符串。
  • size :该共享内存的大小(字节数)。只有在创建时才可以指定,后续读取该属性即可得知实际大小。
  • buf :这是一个 memoryview 对象,指向共享内存底层的字节缓冲区。可通过切片、索引等操作来直接读写字节数据。
shm.buf[0:5] = b"Hello"
data = bytes(shm.buf[0:5])  # b'Hello'

SharedMemory 实例对象有如下几个主要方法:

  • close() :关闭当前进程对这块共享内存的引用。调用后并不会立即销毁共享内存本身,只是表示当前进程不再使用。
  • unlink() :从系统中彻底移除这块共享内存。一旦调用 unlink() ,其他尚未关闭的进程依然可以继续访问,但在所有进程都关闭之后,这块共享内存将被系统回收。因此通常在主进程使用完毕后才会调用 unlink()

# Manager

multiprocessing.Manager 通过代理对象共享复杂数据结构(列表、字典等),自动处理进程间同步,线程安全,但性能较低,适合非高频操作。

Manager 对象的创建分成两种方式:

  1. 直接创建,使用完后需要调用 manger.shutdown() 手动关闭。

    manager = Manager()
    shared_list = manager.list()
    # ... 使用后需调用 manager.shutdown ()
  2. 上下文管理器(推荐的方法),无需手动关闭。

    with Manager() as mgr:
        shared_dict = mgr.dict()

Manager 支持的共享数据类型如总结成下表:

类型 创建方法 线程安全 备注
List manager.list() 类似 Python 的列表,支持动态增删改查
Dict manager.dict() 类似 Python 的字典,支持键值对操作
Namespace manager.Namespace() 可以通过 “点” 属性方式访问 / 修改数据
Value manager.Value(typecode, value) 存放单个 C 类型变量(如 intdouble 等)
Array manager.Array(typecode, iterable) 存放一组同类型元素,类似定长数组
Queue manager.Queue([maxsize]) 先进先出队列,用于跨进程通信
JoinableQueue manager.JoinableQueue([maxsize]) 可阻塞队列,支持 task_done()join()
Lock manager.Lock() 互斥锁,保证同一时刻只有一个任务访问资源
RLock manager.RLock() 可重入锁,同一线程可多次获取锁
Semaphore manager.Semaphore([value]) 信号量,控制同时访问共享资源的任务数量
BoundedSemaphore manager.BoundedSemaphore([value]) 有上限的信号量,初始值即最大可用信号量
Condition manager.Condition([lock]) 条件变量,常配合 Lock / RLock 使用
Event manager.Event() 事件同步机制,可通过 set() / wait() 控制

下面是一个使用示例:

def worker(m_list, m_dict):
    m_list.append("new_item")  # 操作与普通列表一致
    m_dict["pid"] = os.getpid()
if __name__ == "__main__":
    with Manager() as mgr:
        data_list = mgr.list(["init_data"])
        data_dict = mgr.dict({"version": 3.9})
        
        p = Process(target=worker, args=(data_list, data_dict))
        p.start(); p.join()
        
        print(data_list)  # ['init_data', 'new_item']
        print(data_dict)  # {'version': 3.9, 'pid': 12345}

# 多种通信方式对比

下面给出几种常见的多进程通信与共享方式的比较与建议,供参考。

方式 性能 数据类型限制 同步机制 适用场景
Value/Array 仅限于基础数值类型(int、float 等) 需手动加锁 对少量数值进行频繁读写,追求更高效率的场合
Manager 中等 可以存储任意可序列化的 Python 对象 自动同步 需要共享复杂数据结构(如列表、字典)或追求易用性、多进程灵活访问
Queue 中等 任意可序列化对象 内部已实现同步 典型的生产者 - 消费者模型,流式任务分发与结果收集
Pipe 较高 任意可序列化对象 无自动同步 双向或单向的点对点通信,适合简单的命令或数据在两个进程间快速传递
shared_memory 较高 底层字节缓冲,可结合 NumPy 数组等进行操作 需自行管理同步 大数据量、零拷贝共享,如图像处理、科学计算等,需要手动控制并发安全

使用建议:

  1. 优先使用 Queue 进行生产者 - 消费者式的任务队列,写法简洁且自带同步机制。
  2. 如果需要频繁读写大规模数值数据(如矩阵或图像),可优先考虑 shared_memory Value/Array + Lock 的方式来减少拷贝、提高效率。
  3. 当共享数据结构较为复杂且希望自动化处理同步时,可使用 Manager 提供的 list() , dict() 等方法。
  4. Pipe 更适合简单、点对点的通信,不太适合复杂的并发场景。
  5. 在高并发或需要更灵活控制同步的场合,可结合 LockSemaphoreEvent 等原语进行更细粒度的并发管理。

需要特殊说明的是,自动同步并不代表不需要加锁,这里谈到的线程安全指的是调用数据单个方法(如列表的 append()__setitem__() )是原子操作,而对于复杂的复合操作,依然可能会有共享数据的竞争,需要手动加锁。


# 进程间同步

进程间同步是确保多个进程按照特定顺序执行或在共享资源上进行互斥访问的一种机制。进程间同步的目的是避免竞态条件和数据不一致的问题。

Python 提供了多种机制来实现进程间的同步,包括锁、信号量、事件、条件变量等。

#

锁是一种最基本的同步机制,用于保护共享资源的互斥访问,确保在任意时刻只有一个进程可以访问共享资源。在 Python 中,可以使用 multiprocessing 模块的 Lock 类来实现锁。

from multiprocessing import Lock
lock = Lock()

进程通过调用 acquire()release() 函数来显式控制锁的持有与释放。

lock.acquire()  # 获取锁
try:
    # 需要同步的代码块(如修改共享资源)
finally:
    lock.release()  # 确保释放锁

其中 lock.acquire(blocking=True, timeout=None)blocking=True (默认)表示阻塞直到锁可用,如果为 False ,则立即返回是否成功获取锁。 timeout 参数指定阻塞等待的最长时间,单位是秒。

为了避免忘记释放锁,推荐使用上下文管理器模式,使用 with 语句自动管理锁的获取与释放。

with lock:
    # 需要同步的代码块

# 信号量

信号量是一种用于控制对共享资源的访问的机制。在多进程编程中,信号量可以用于限制同时访问某个共享资源的进程数量。

Python 的 multiprocessing 模块提供了 Semaphore 类来实现进程间的信号量机制。

Semaphore 对象初始化时需要传入参数指定资源数量,是对锁( Lock )的拓展。

from multiprocessing import Semaphore
sem = Semaphore(3)

sem = Semaphore(3) 表示最多允许 3 个进程同时访问资源。

同样的,信号量也有 acquire()release() 两个方法进行控制:

  • acquire(blocking=True, timeout=None) :获取信号量,若计数器为 0 则阻塞等待。
    • blocking=False 表示非阻塞模式,立即返回是否成功获取信号量。
    • timeout 参数用于设置阻塞等待的最长时间,单位为秒。
  • release() :释放信号量,计数器加 1。需注意释放次数不应超过获取次数,否则可能引发逻辑错误。

此外,信号量也可以使用 with 语句自动管理许可证的获取和释放,避免因异常或忘记释放导致的死锁。

import multiprocessing
import time
def worker(sem, id):
    with sem:
        print(f"进程 {id} 开始执行")
        time.sleep(2)
        print(f"进程 {id} 执行结束")
if __name__ == "__main__":
    sem = multiprocessing.Semaphore(2)  # 允许同时 2 个进程运行
    processes = [multiprocessing.Process(target=worker, args=(sem, i)) for i in range(5)]
    
    for p in processes:
        p.start()
    for p in processes:
        p.join()

在上述例子中,创建了一个信号量,初始值为 2。然后创建了 5 个进程,每个进程在执行前会尝试获取信号量,由此实现每次最多两个进程同时执行,其余进程需等待信号量的释放。

# 事件

事件是一种用于多进程间通信的同步机制,它允许一个或多个进程等待某个事件的发生,然后再继续执行。

Python 的 multiprocessing 模块提供了 Event 类来实现进程间的事件同步机制,通过一个布尔标志位(初始为 False )协调多个进程的执行。当标志为 False 时,调用事件对象的 wait() 方法会阻塞进程;当标志被设为 True 时,所有阻塞的进程会被唤醒继续执行。

from multiprocessing import Event
event = Event()  # 初始标志为 False

Event 对象有如下几种关键方法:

  • set() :将标志设为 True ,唤醒所有阻塞的进程。
  • clear() :将标志重置为 False ,后续调用 wait() 的进程会再次阻塞。
  • is_set() :返回当前标志状态( TrueFalse )。
  • wait(timeout=None) :若标志为 True :立即返回,不阻塞。若标志为 False :阻塞当前进程,直到标志被设为 True 或超时( timeout 参数,单位为秒)。

下面的红绿灯模型进程展示了 Event 的使用。

from multiprocessing import Process, Event
import time
def traffic_light(event):
    while True:
        print("红灯亮")
        event.clear()  # 标志设为 False,行人等待
        time.sleep(3)
        print("绿灯亮")
        event.set()    # 标志设为 True,行人通行
        time.sleep(3)
def pedestrian(event, id):
    event.wait()  # 等待绿灯
    print(f"行人 {id} 通过马路")
if __name__ == "__main__":
    event = Event()
    # 启动交通灯进程
    light_process = Process(target=traffic_light, args=(event,))
    light_process.daemon = True  # 设为守护进程,主进程退出时自动终止
    light_process.start()
    
    # 模拟多个行人进程
    for i in range(5):
        p = Process(target=pedestrian, args=(event, i))
        p.start()
        time.sleep(0.5)  # 行人依次到达
    
    time.sleep(15)

# 条件变量

条件变量是一种用于多进程间协调和同步的机制,它可以用于控制多个进程之间的执行顺序。

Python 的 multiprocessing 模块提供了 Condition 类来实现条件变量。它与 LockRLock 结合使用,通过 wait()notify() 等操作实现进程间通信。

from multiprocessing import Condition
cond = Condition() # 默认关联一个 RLock,也可手动传入其他锁对象

主要方法包括:

  • acquire() :获取关联的锁,进入临界区。
  • release() :释放锁,退出临界区。
  • wait(timeout=None) :释放锁并挂起当前进程,直到收到通知或超时。
  • notify(n=1) :唤醒至多 n 个等待此条件的进程。
  • notify_all() :唤醒所有等待此条件的进程。
from multiprocessing import Manager, Condition, Process
def producer(shared_list, cond):
    with cond:
        for i in range(3):
            shared_list.append(i)
            print(f"Produced {i}")
            cond.notify()
def consumer(shared_list, cond):
    with cond:
        while len(shared_list) == 0:
            cond.wait()
        item = shared_list.pop(0)
        print(f"Consumed {item}")
if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        cond = Condition()
        p1 = Process(target=producer, args=(shared_list, cond))
        p2 = Process(target=consumer, args=(shared_list, cond))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

# 进程池

进程池是一种用于管理和调度多个进程的机制,它可以有效地处理并行任务和提高程序的性能。进程池在 Python 中通常使用 multiprocessing 模块提供的 Pool 类来实现。

进程池的工作原理如下:

  1. 创建进程池时,会启动指定数量的进程,并将它们放入池中。
  2. 池中的进程会等待主进程提交任务。
  3. 主进程通过提交任务给进程池,将任务分配给空闲的进程。
  4. 进程池中的进程执行任务,并将结果返回给主进程。
  5. 主进程获取任务的结果,继续执行其他操作。
  6. 当所有任务完成后,主进程关闭进程池

# 创建进程池

使用 multiprocessing.Pool 创建进程池时,需指定进程数量。若不指定,默认使用当前 CPU 的核心数:

Pool(processes=None, initializer=None, initargs=())

参数:

  • processes :同时运行的进程数,默认为 CPU 核心数。
  • initializer :初始化函数,每个进程启动时会调用。
  • initargs :初始化函数的参数元组。

multiprocessing.cpu_count() 函数可以用来获取当前系统的 CPU 核心数,然后根据需要来指定进程池的大小。

通过 with 语句管理进程池,可自动处理资源释放问题:

with multiprocessing.Pool(4) as pool:
    # 在此范围内使用池

# 提交任务

multiprocessing.Pool 对象有以下几种常用方法用来提交任务:

  • apply(func, args=(), kwds={}) :同步执行单个任务,阻塞主进程直到任务完成并返回结果。

    • func :要执行的函数;
    • args :函数的位置参数(元组形式);
    • kwds :函数的关键字参数(字典形式)。
    result = pool.apply(function, args=(arg1, arg2))
  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None) :异步执行单个任务,非阻塞主进程,立即返回 AsyncResult 对象,需通过 .get() 阻塞获取结果。

    • callback :任务成功后的回调函数(接收结果作为参数);
    • error_callback :任务异常时的回调函数(接收异常对象作为参数)。
    result = pool.apply_async(square, (5,), callback=print_result)
    print(result.get())  # 阻塞直到结果返回
  • map(func, iterable, chunksize=None) :将函数批量映射到可迭代对象的每个元素,返回结果列表(按输入顺序排列)。同步阻塞主进程,直到所有任务完成。

    • chunksize :将任务分块提交,默认根据进程数和数据长度自动计算。
    results = pool.map(square, [1, 2, 3])  # 返回 [1, 4, 9]
  • map_async(func, iterable, chunksize=None, callback=None, error_callback=None) :异步版本的 map ,返回 AsyncResult 对象,需通过 .get() 获取结果列表。该方法非阻塞主进程,支持回调处理结果或异常,结果顺序与输入一致。

    result = pool.map_async(square, [1, 2, 3])
    print(result.get())  # 输出 [1, 4, 9]
  • imap(func, iterable, chunksize=None) :是 map 方法的惰性求值版本,用于批量处理可迭代数据。与 map 不同,它不会一次性将所有结果加载到内存,而是通过迭代器按需生成结果,适合处理大规模数据集。返回的迭代器按输入顺序逐个产出结果

    from multiprocessing import Pool
    def square(x):
        return x * x
    if __name__ == "__main__":
        with Pool(4) as pool:
            results = pool.imap(square, [1, 2, 3, 4, 5], chunksize=2)
            for res in results:
                print(res)  # 输出顺序:1, 4, 9, 16, 25
  • imap_unordered(func, iterable, chunksize=None) :功能类似于 imap 方法,但不保证结果顺序,哪个任务先完成就先返回结果。

  • close() :关闭进程池,不再接受新任务,但已提交的任务会继续执行。该方法在所有任务提交后调用,通常与 join() 配合使用。

  • terminate()立即终止所有子进程,无论任务是否完成。

  • join() :阻塞主进程,等待所有子进程执行完毕,必须在 close()terminate() 之后调用。

务必调用 pool.close()pool.join() 确保所有进程正常终止。

# 进程池示例

下面通过几个示例代码展示进程池的使用。

批量文件处理:使用进程池高效处理大量图片:

from PIL import Image
import os
import multiprocessing
def process_image(path):
    try:
        with Image.open(path) as img:
            img.resize((800, 600)).save(f"resized_{path}")
            return f"{path}处理成功"
    except Exception as e:
        return f"{path}处理失败:{str(e)}"
if __name__ == '__main__':
    image_files = [f for f in os.listdir() if f.lower().endswith(('.jpg', '.jpeg'))]
    with multiprocessing.Pool(4) as pool:
        results = pool.map(process_image, image_files)
        pool.close()
        pool.join()
    
    print(results)

这段代码使用多进程并行处理当前目录下的所有 JPG/JPEG 图片,将每张图片调整为 800x600 的尺寸并保存为前缀为 resized_ 的新文件,同时捕获并返回处理过程中的成功或失败信息,最后打印所有处理结果。

异步文件下载:异步下载多个文件并实时显示进度。

import multiprocessing
import random
import time
from tqdm import tqdm
def download_file(url):
    try:
        time.sleep(random.uniform(0.5, 2.5)) # 模拟下载延迟
        if random.random() < 0.1:
            raise ConnectionError("连接超时")
        return {"status": "success", "url": url, "message": f"已下载 {url}"}
    except Exception as e:
        return {"status": "error", "url": url, "message": str(e)}
if __name__ == '__main__':
	urls = [f"https://file{i}.zip" for i in range(20)]
    with multiprocessing.Pool(4) as pool:
        with tqdm(total=len(urls), desc="下载进度", unit="file") as pbar:
            results = []
            for result in pool.imap_unordered(download_file, urls, chunksize=3):
                pbar.update(1)
                if result["status"] == "success":
                    pbar.set_postfix_str(f"最新成功: {result['url']}")
                else:
                    pbar.set_postfix_str(f"最新失败: {result['url']}", refresh=False)
                results.append(result)
    
    # 输出统计信息
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"\n下载完成: 成功 {success_count}/{len(urls)}")

这段代码使用多进程并行下载 20 个模拟文件,通过 tqdm 实时显示下载进度,并捕获下载过程中的成功或失败信息,最后统计并输出成功下载的文件数量。


# 参考资料