侧边栏壁纸
  • 累计撰写 218 篇文章
  • 累计创建 59 个标签
  • 累计收到 5 条评论

Piscina 基础知识:Base API

barwe
2022-08-18 / 0 评论 / 0 点赞 / 1,750 阅读 / 3,340 字
温馨提示:
本文最后更新于 2022-08-18,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

piscina 是 Node.js 中提供 工作(进程,线程)池 的工具。

github: https://github.com/piscinajs/piscina

特点:

  • 线程间的快速通信
  • 覆盖固定任务和可变任务场景
    • 固定任务:一个线程执行指定的任务
    • 可变任务:由调度线程分配任务
  • 支持灵活设置工作池大小
  • 提供异步追踪
  • 运行和等待时长统计
  • 支持取消任务
  • 支持限制内存资源使用
  • 支持 CommonJS、ESM 和 TypeScript
  • 自定义任务队列
  • 设置 CPU 调度优先级(Linux)

piscina 是用 TypeScript 写的,需要 Node.js 12.x 及以上版本。

example

分配任务并收集任务结果的是 主线程(main),执行任务并返回结果的是 工作线程(worker)。

每一个工作线程都会执行一个在单独文件中导出的函数,并将该函数的返回值返回给主线程。

API

new Piscina

// https://github.com/piscinajs/piscina#constructor-new-piscinaoptions
new Piscina(options?: Options | undefined): Piscina

支持的选项:

  • filename: string | null 指定一个文件路径,该文件需要默认导出一个函数(同步或者异步)供工作线程执行
  • name: string | null 默认为'default',表示使用上述文件的默认导出函数,可以改为其他名称,从而使用其他非默认导出的函数
  • minThreads: number 线程池运行的最小线程数,默认根据可用的 CPU 进行设置
  • maxThreads: number 线程池运行的最大线程数,默认根据可用的 CPU 进行设置
  • idleTimeout: number 单位 毫秒,表示线程不执行任务时可以继续等待多长时间,超过这个时间空闲线程将关闭。默认情况下线程执行完任务后会立即关闭,默认设置在执行 任务量多、单个任务耗时短 的任务集合时,会损失一定的性能,因为会频繁的关闭和开启线程。这种情况下,显式设置该参数,避免线程频繁被关闭和启动。
  • maxQueue: number | string 设置 可能 运行的最大任务数量(并不等于实际同时运行的任务数量,因为线程可能不够用)。默认情况下,这个数量不做限制。可以指定为'auto',此时 piscina 将其设置为maxThreads参数值的平方,这个值可以通过options.maxQueue获取到
  • concurrentTasksPerWorker: number 默认值为 1,表示一个工作线程可以同时被多少个任务共享,这在任务中包含异步步骤时可能有用,虽然工作线程可以执行异步任务,然而我们需要时刻牢记:工作线程不是为处理 I/O 而设计的
  • useAtomics: boolean 默认开启,使用 原子 API 实现线程间的快速通信,通过设置环境变量PISCINA_DISABLE_ATOmICS=1可以禁用该选项
  • resourceLimits: object (cautious)限制线程使用的资源,达到资源上限 JS 引擎会关闭工作线程。需要注意的是,即时限制了线程的资源上限,主进程仍然可能因为整体资源超过内存而崩溃。该配置项与 Node.js new Worker options 一致,主要有下面这四个东西:
    • maxOldGenerationSizeMb: number 每个工作线程 主堆 的最大值
    • maxYoungGenerationSizeMb: number 最近创建的对象占用的 的最大值
    • codeRangeSizeMb: number 为生成代码预分配的内存范围
    • stackSizeMb: number 默认为 4,线程栈的最大值,值太小可能导致 Worker 实例不可用
  • env: object 参考 new Worker 的 options.env 部分,默认值是process.env,用来初始化工作线程的 process.env。设置为require('worker_threads').SHARE_ENV表示工作线程与主线程共享 process.env,当一个线程中 process.env 变化时,另一个线程中的 process.env 也会跟着变化
  • argv: any[] 参考 new Worker 的 options.argv 部分。指定一个参数列表,它将以命令行参数的形式传递给工作线程,因此会被复制到工作线程的process.argv上。它与workData有些类似,都是向工作进程中传递数据。
  • execArgv: string[] 参考 new Worker 的 options.execArgv 部分。将 node CLI 选项(node 启动主线程的参数)传递给工作线程(一些能影响 主进程 的参数会被忽略),这些参数在工作线程中能通过process.execArgv获取到。默认情况下,该值会自动从父线程继承
  • workerData: any 参考 new Worker 的 options.workData 部分。argv只能传递可以被字符串化的参数,而这个workerData则可以传递任意一个可以被克隆的 JavaScript 对象
  • taskQueue: TaskQueue 默认情况下,piscina 使用 先进先出 的队列提交任务,这个参数允许修改为其他自定义类型的任务队列
  • niceIncrement: number (optional, Linux) 管理工作线程的优先级,需要配合 nice-napi 模块使用
  • trackUnmanagedEds: boolean 默认为开启,允许工作线程追踪通过fs.open()fs.close()管理的 文件描述符,工作线程在退出时会自动关闭它们。

piscina.run(task, [options])

https://github.com/piscinajs/piscina#method-runtask-options

将一个任务分配给一个工作线程,即执行对应的函数,task参数就是传递给那个函数的参数。

选项:

  • transferList: (optional) 指定一个对象列表,这些数据会被 转交 给工作线程,而不是 克隆
  • filename: (optional) 任务函数所在的文件,会覆盖默认的文件(new Piscina 时指定的文件)
  • name: (optional) 任务函数,使用默认导出的函数值为default,其他非默认函数为函数名
  • abortSignal: AbortSignalp[] 用来取消任务、停止任务

此函数是 异步 的。

piscina.destroy()

停止所有工作线程,被挂起的任务的 Promise 将被 reject

如果所有线程停下了,此函数返回的 Promise 将变为 fullfilled

它也是异步函数。

Event

  • 'error' 由工作线程抛出的未捕获的异常,或者工作线程发送的未知的消息
  • 'drain' 任务队列中没有任务时
  • 'message' 从工作线程中收到消息时

Property

  • completed readonly 当前完成的任务数
  • duration readonly 距离 Piscina 实例创建的时间,单位毫秒
  • options readonly 构造 Piscina 实例时传入的选项对象的拷贝
  • runTime readonly 统计已完成任务的直方图数据,可以用来绘图
  • threads readonly 线程池对应的 Worker 实例列表
  • queueSize readonly 任务队列中的任务数量
  • utilization readonly
  • waitTime readonly

Static Property

  • isWorkerThread readonly 代码是否在工作线程中执行
  • versin readonly

Static Method

move

import { move } from 'piscina'

默认情况下,工作线程执行完任务函数的返回值会被 克隆 到主线程,即时这个返回值是 可转移的

这个函数可以包装 可转移的 返回值,是它们 转移到 主线程,而不是 克隆 回去。

move 函数包装的对象必须是 Node.js 支持的 可转移 对象(实现了Transferable接口),例如ArrayBuffer, TypedArray or MessagePort),如果封装对象是不可转移的,move 函数将抛出错误。

可转移对象应直接传递给 move 函数,而不是使用嵌套的方式。

0

评论区