piscina 是 Node.js 中提供 工作(进程,线程)池 的工具。
特点:
- 线程间的快速通信
- 覆盖固定任务和可变任务场景
- 固定任务:一个线程执行指定的任务
- 可变任务:由调度线程分配任务
- 支持灵活设置工作池大小
- 提供异步追踪
- 运行和等待时长统计
- 支持取消任务
- 支持限制内存资源使用
- 支持 CommonJS、ESM 和 TypeScript
- 自定义任务队列
- 设置 CPU 调度优先级(Linux)
piscina 是用 TypeScript 写的,需要 Node.js 12.x 及以上版本。
example
分配任务并收集任务结果的是 主线程(main),执行任务并返回结果的是 工作线程(worker)。
每一个工作线程都会执行一个在单独文件中导出的函数,并将该函数的返回值返回给主线程。
API
new Piscina
// https://github.com/piscinajs/piscina#constructor-new-piscinaoptions
new Piscina(options?: Options | undefined): Piscina
支持的选项:
filename
: string | null 指定一个文件路径,该文件需要默认导出一个函数(同步或者异步)供工作线程执行name
: string | null 默认为'default'
,表示使用上述文件的默认导出函数,可以改为其他名称,从而使用其他非默认导出的函数minThreads
: number 线程池运行的最小线程数,默认根据可用的 CPU 进行设置maxThreads
: number 线程池运行的最大线程数,默认根据可用的 CPU 进行设置idleTimeout
: number 单位 毫秒,表示线程不执行任务时可以继续等待多长时间,超过这个时间空闲线程将关闭。默认情况下线程执行完任务后会立即关闭,默认设置在执行 任务量多、单个任务耗时短 的任务集合时,会损失一定的性能,因为会频繁的关闭和开启线程。这种情况下,显式设置该参数,避免线程频繁被关闭和启动。maxQueue
: number | string 设置 可能 运行的最大任务数量(并不等于实际同时运行的任务数量,因为线程可能不够用)。默认情况下,这个数量不做限制。可以指定为'auto'
,此时 piscina 将其设置为maxThreads
参数值的平方,这个值可以通过options.maxQueue
获取到concurrentTasksPerWorker
: number 默认值为 1,表示一个工作线程可以同时被多少个任务共享,这在任务中包含异步步骤时可能有用,虽然工作线程可以执行异步任务,然而我们需要时刻牢记:工作线程不是为处理 I/O 而设计的useAtomics
: boolean 默认开启,使用 原子 API 实现线程间的快速通信,通过设置环境变量PISCINA_DISABLE_ATOmICS=1
可以禁用该选项resourceLimits
: object (cautious)限制线程使用的资源,达到资源上限 JS 引擎会关闭工作线程。需要注意的是,即时限制了线程的资源上限,主进程仍然可能因为整体资源超过内存而崩溃。该配置项与 Node.js new Worker options 一致,主要有下面这四个东西:maxOldGenerationSizeMb
: number 每个工作线程 主堆 的最大值maxYoungGenerationSizeMb
: number 最近创建的对象占用的 堆 的最大值codeRangeSizeMb
: number 为生成代码预分配的内存范围stackSizeMb
: number 默认为 4,线程栈的最大值,值太小可能导致 Worker 实例不可用
env
: object 参考 new Worker 的 options.env 部分,默认值是process.env
,用来初始化工作线程的 process.env。设置为require('worker_threads').SHARE_ENV
表示工作线程与主线程共享 process.env,当一个线程中 process.env 变化时,另一个线程中的 process.env 也会跟着变化argv
: any[] 参考 new Worker 的 options.argv 部分。指定一个参数列表,它将以命令行参数的形式传递给工作线程,因此会被复制到工作线程的process.argv
上。它与workData
有些类似,都是向工作进程中传递数据。execArgv
: string[] 参考 new Worker 的 options.execArgv 部分。将 node CLI 选项(node 启动主线程的参数)传递给工作线程(一些能影响 主进程 的参数会被忽略),这些参数在工作线程中能通过process.execArgv
获取到。默认情况下,该值会自动从父线程继承workerData
: any 参考 new Worker 的 options.workData 部分。argv
只能传递可以被字符串化的参数,而这个workerData
则可以传递任意一个可以被克隆的 JavaScript 对象taskQueue
: TaskQueue 默认情况下,piscina 使用 先进先出 的队列提交任务,这个参数允许修改为其他自定义类型的任务队列niceIncrement
: number (optional, Linux) 管理工作线程的优先级,需要配合 nice-napi 模块使用trackUnmanagedEds
: boolean 默认为开启,允许工作线程追踪通过fs.open()
和fs.close()
管理的 文件描述符,工作线程在退出时会自动关闭它们。
piscina.run(task, [options])
将一个任务分配给一个工作线程,即执行对应的函数,task
参数就是传递给那个函数的参数。
选项:
transferList
: (optional) 指定一个对象列表,这些数据会被 转交 给工作线程,而不是 克隆filename
: (optional) 任务函数所在的文件,会覆盖默认的文件(new Piscina 时指定的文件)name
: (optional) 任务函数,使用默认导出的函数值为default
,其他非默认函数为函数名abortSignal
: AbortSignalp[] 用来取消任务、停止任务
此函数是 异步 的。
piscina.destroy()
停止所有工作线程,被挂起的任务的 Promise 将被 reject
如果所有线程停下了,此函数返回的 Promise 将变为 fullfilled
它也是异步函数。
Event
'error'
由工作线程抛出的未捕获的异常,或者工作线程发送的未知的消息'drain'
任务队列中没有任务时'message'
从工作线程中收到消息时
Property
completed
readonly 当前完成的任务数duration
readonly 距离 Piscina 实例创建的时间,单位毫秒options
readonly 构造 Piscina 实例时传入的选项对象的拷贝runTime
readonly 统计已完成任务的直方图数据,可以用来绘图threads
readonly 线程池对应的 Worker 实例列表queueSize
readonly 任务队列中的任务数量utilization
readonlywaitTime
readonly
Static Property
isWorkerThread
readonly 代码是否在工作线程中执行versin
readonly
Static Method
move
import { move } from 'piscina'
默认情况下,工作线程执行完任务函数的返回值会被 克隆 到主线程,即时这个返回值是 可转移的。
这个函数可以包装 可转移的 返回值,是它们 转移到 主线程,而不是 克隆 回去。
move 函数包装的对象必须是 Node.js 支持的 可转移 对象(实现了Transferable
接口),例如ArrayBuffer
, TypedArray
or MessagePort
),如果封装对象是不可转移的,move 函数将抛出错误。
可转移对象应直接传递给 move 函数,而不是使用嵌套的方式。
评论区