线程安全函数对象
https://github.com/nodejs/node-addon-api/blob/main/doc/threadsafe.md
JavaScript 函数只能在原生模块的主线程上被正常调用,如果原生模块创建了额外的线程,与 Napi 相关的 API 都不能在其他线程中被使用。
解决办法是,如果有需要在子线程执行结束后才能执行的 JavaScript 函数,则子线程必须与主线程通信,在子线程执行完毕后由主线程代为执行 JavaScript 函数。
一个典型的场景就是,JavaScript 调用原生函数并传递一个回调函数,该回调函数必须在原生函数任务执行完成时才能被调用。
NAPI 提供了线程安全函数对象来解决这个问题,具体实现有ThreadSafeFunction
和TypedThreadSafeFunction
。
表面上是在子线程中通过线程安全函数对象执行回调函数,实际上这个回调函数是由主线程完成执行的。
[typed]ThreadSafeFunction
类实现了静态方法New()
创建一个可以在多个线程中被调用的 JavaScript 函数的强引用。
[typed]ThreadSafeFunction
对象的销毁必须等到所有使用它的线程都正确结束通信,这可以分为两种情况:
- 调用其
BlockingCall()
或者NonBlockingCall()
方法时返回napi_closing
,意味着调用异常 - 否则,就要在该线程代码最后调用其
Release()
方法手动结束通信
Release()
一般放在线程代码最后调用,因为如果在中间调用,[typed]ThreadSafeFunction
对象在该线程中可能就不存在了。
同理,调用[Non]BlockingCall()
方法返回napi_closing
时也不应该再执行后续任务。
子线程需要在 Finalizer 中 join 到主线程,与[typed]ThreadSafeFunction
对象关联的数据也可以在 Finalizer 中被释放。
两种类的实现差异
选择ThreadSafeFunction
还是TypedThreadSafeFunction
很大程度上取决于如何在 Node.js 线程中执行你的 C++ 代码(回调)。
ThreadSafeFunction
ThreadSafeFunction
不支持 Node-API 5 中对可选的回调函数的原生支持,也就是说不能给回调函数参数传递一个空指针。
ThreadSafeFunction
存在一些动态特性:
[Non]BlockingCall()
方法提供了一个Function
类型的参数用来接收一个函数,该函数用来处理 C++ 主线程提供的数据并调用JavaScript 回调函数。- 为了与JavaScript 回调函数进行区分,这里将
[Non]BlockingCall()
方法接收的函数称之为 C++ 回调函数,它的内部应该调用 JavaScript 回调函数。 - C++ 回调函数 是作为参数传递的,所以在不同的线程中多次调用时,每次调用传递的函数可以不一样。
- 为了适应不同的 C++ 回调函数,每次调用时就需要传入不同类型的 C++ 数据。
这种动态特性可能会带来额外的开销,甚至内存泄漏:
ThreadSafeFunction
更像是底层napi_threadsafe_function
的“代理”,每次调用[Non]BlockingCall()
方法时在堆上构建一个回调函数的封装对象。- “代理”调用的也是底层的 API,一旦代理对象不可用(例如所有线程都被 Release 掉了)并且任务队列尚未清空时,传递给
[Non]BlockingCall()
的 C++ 回调函数将不会被执行。 - 此时,没有执行 C++ 回调函数就不能执行清理工作,如果期间涉及到堆的动态分配,因为没有执行成功的清理步骤,很可能就造成内存泄漏。
- C++ 回调函数的参数是 JavaScript 运行环境对象(
Env
)、JavaScript 回调函数(Function
)和 C++ 原生类型的数据,并没有接收与线程上下文(即线程安全对象)相关的参数,所以 C++ 回调函数只能通过下面两种方法获取线程上下文:- 直接在子线程中执行,自动获得对应线程的上下文
- 调用线程安全对象的
GetContext()
方法获取线程上下文,但是该方法并不是类型安全的
TypedThreadSafeFunction
顾名思义,在ThreadSafeFunction
的基础上加上了类型安全,解决了ThreadSafeFunction
的一系列缺点。
首先TypedThreadSafeFunction
基于 Node-API 5 对可选回调函数参数的支持开发,在调用New()
方法时允许开发者向 JavaScript 回调函数参数传递一个空指针(nullptr
)而不是Function
对象。
它能自动处理 Node-API 4 和 Node-API 5 中的可选回调函数参数,提供一个统一的 API,使开发者在切换 Node-API 版本时无需更改代码,只需要设置环境变量 NAPI_VERSION 即可。
TypedThreadSafeFunction
移除了ThreadSafeFunction
的动态特性:
- 正确清理所有资源:当 Node.js 释放线程安全函数对象后,如果任务队列不为空,回调将继续在空的 Env 环境中执行,这保证了每个回调都能被正确执行完毕,释放动态分配的资源
- 上下文类型安全:C++ 回调函数默认将上下文作为参数,因此不存在获取上下文的问题,上下文在创建
TypedThreadSafeFunction
对象时由第一个参数(Env)指定 - 声明
TypedThreadSafeFunction
对象类型时需要指定 C++ 回调函数的签名,而 C++ 回调函数本身就包含了上下文对象,所以说TypedThreadSafeFunction
是类型安全的 [Non]BlockingCall()
方法不再接收函数作为参数,C++ 回调函数签名在构建TypedThreadSafeFunction
对象时就已经确定了,消除了函数的动态性,动态性应由开发者自己实现
使用建议
下面情况使用TypedThreadSafeFunction
:
- 想使用可选的 JavaScript 回调函数参数
- 在使用
[Non]BlockingCall()
方法时固定 C++ 回调函数的签名 - 需要回收在调用过程中动态分配的资源
其他简单情况更适合用ThreadSafeFunction
。
class ThreadSafeFunction
https://github.com/nodejs/node-addon-api/blob/main/doc/threadsafe_function.md
创建 tsfn 对象
使用New()
方法在创建新的线程之前创建一个ThreadSafeFunction
对象(以//
开头的是可选参数):
ThreadSafeFunction ThreadSafeFunction::New(
napi_env env,
const Function& callback,
//const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
//ContextType* context,
//Finalizer finalizeCallback,
//FinalizerDataType* data
)
callback
要执行的 JavaScript 回调函数resourceName
资源类型标识符maxQueueSize
任务队列最大长度,0 表示允许无限大initialThreadCount
初始线程数目 ???- (optional)
finalizeCallback
在ThreadSafeFunction
对象被销毁前由主线程调用,通常用来 join 线程 - (optional)
data
调用finalizeCallback
时传递的数据
停止使用 tsfn 对象
在创建的子线程中,执行完所有任务代码后应该调用Release()
方法释放对 tsfn 对象的引用:
napi_status Napi::ThreadSafeFunction::Release() const
napi_ok
: tsfn 对象成功释放napi_invalid_arg
: tsfn 对象关联的线程数为 0napi_generic_failure
: 释放时发生错误
阻塞/非阻塞式调用
BlockingCall()
或者NonBlockingCall()
方法用来执行 JavaScript 回调函数。
napi_status Napi::ThreadSafeFunction::BlockingCall(DataType* data, Callback callback) const
napi_status Napi::ThreadSafeFunction::NonBlockingCall(DataType* data, Callback callback) const
(optional) data
是传递给callback
函数的参数。
(optional) callback
是一个 C++ 函数,它的签名是:
void operator()(Env env, Function jsCallback, DataType* data)
这里的jsCallback
才是 JavaScript 传递过来的回调函数。
返回状态
napi_ok
: 调用被成功添加到任务队列中napi_queue_full
: 以非阻塞式调用时任务对立满了napi_closing
: tsfn 对象异常,不能接受新的调用napi_invalid_arg
napi_generic_failure
回调数据
data
是传递给 C++ 回调函数的数据,传递多个数据时可以用 struct 指针。
一个例子
#include <thread>
#include <chrono>
#include <napi.h>
using namespace Napi;
ThreadSafeFunction tsfn;
std::thread t;
tsfn
对象需要在主线程中创建,子线程中调用,因此声明为全局变量。
使用一个子线程。
struct Person {
std::string name;
int year;
};
定义一个结构体,作为数据传递给 C++ 回调函数。
Boolean Say(const CallbackInfo &info) {
/* ... */
return Boolean::New(env, true);
}
Object Init(Env env, Object exports) {
exports.Set("say", Function::New(env, Say));
return exports;
}
NODE_API_MODULE(test, Init)
待暴露的 C++ 函数架子,这样我们就能在 JavaScript 调用这个函数:
const { say } = require('bindings')('test')
say((name, year) => {
console.log(`${name} will be rich in ${year}`)
}, 2022)
调用该原生函数传入了两个参数:
- 一个回调函数:
(name: string, year: number) => void
- 一个
number
类型的整数
同时我们希望原生函数在调用回调时能够传入两个参数,分别是string
类型和number
类型。
首先我们需要在 C++ 函数中解析调用参数:
Boolean Say(const CallbackInfo &info) {
Env env = info.Env();
Function jsCallback = info[0].As<Function>();
int baseYear = info[1].As<Number>().Int32Value();
/* ... */
return Boolean::New(env, true);
}
jsCallback
和baseYear
对应了我们在 JavaScript 中调用say()
传递的两个参数。
然后我们新开一个线程来执行一些任务,执行完成后再 JavaScript 回调函数。
在此之前,需要先在主线程中创建一个 tsfn 对象,该对象负责子线程与主线程的通信,确保子线程任务执行完成之后由主线程调用回调函数。
Boolean Say(const CallbackInfo &info) {
Env env = info.Env();
Function jsCallback = info[0].As<Function>();
int baseYear = info[1].As<Number>().Int32Value();
tsfn = ThreadSafeFunction::New(
env, jsCallback, "xxx", 0, 1,
[](Env) {
t.join();
}
);
/* Block A */
return Boolean::New(env, true);
}
在创建 tsfn 对象时,我们指定了运行环境、JavaScript 回调函数、资源标识符、任务队列大小和初始化线程数,同时还指定了一个 Finalizer 回调。
在 tsfn 对象被销毁之前,应该等待所有线程(此处只有一个)执行完毕。
然后在 Block A 处我们可以新建一个线程:
t = std::thread([baseYear] () {
/* Block B */
tsfn.Release();
});
新建的线程需要执行一个任务函数,该函数是一个匿名函数,需要捕获上文的baseYear
变量。
在任务执行完毕时应该调用tsfn.Release()
,表示本线程用完你了,不需要再占用你了。
在 Block B 处,我们需要调用tsfn.[Non]BlockingCall()
来执行 JavaScript 回调函数。
为此我们需要先构造一个 C++ 回调函数:
t = std::thread([baseYear] () {
auto cppCallback = [](Env env, Function jsCallback, Person *person){
jsCallback.Call({ String::New(env, person->name), Number::New(env, person->year) });
};
/* Block C */
tsfn.Release();
});
该函数的参数一般是固定的:Env 对象,JavaScript 回调函数和一个 C++ 原生数据。
在该函数内部,我们通过Call()
正式调用 JavaScript 回调函数,并传入两个 JavaScript 类型的参数,分别对应 name 和 year。
在 Block C 处我们执行一些 C++ 代码,可以单次或者多次调用 JavaScript 函数:
for (int i = 0; i < 3; i++) {
Person person = {
name: "person" + std::to_string(i),
year: baseYear + i
};
Person *data = &person;
napi_status st = tsfn.BlockingCall(data, cppCallback);
// 如果想在执行 JavaScript 回调之后还做一些其他事情,必须先检测调用返回状态
if (st != napi_ok) {
break;
}
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}
完整的实例代码如下:
#include <thread>
#include <chrono>
#include <napi.h>
using namespace Napi;
ThreadSafeFunction tsfn;
std::thread t;
struct Person {
std::string name;
int year;
};
Boolean Say(const CallbackInfo &info) {
Env env = info.Env();
Function jsCallback = info[0].As<Function>();
int baseYear = info[1].As<Number>().Int32Value();
tsfn = ThreadSafeFunction::New(
env, jsCallback, "xxx", 0, 1,
[](Env) {
t.join();
}
);
t = std::thread([baseYear] () {
auto cppCallback = [](Env env, Function jsCallback, Person *person){
jsCallback.Call({ String::New(env, person->name), Number::New(env, person->year) });
};
for (int i = 0; i < 3; i++) {
Person person = {
name: "person" + std::to_string(i),
year: baseYear + i
};
Person *data = &person;
napi_status st = tsfn.BlockingCall(data, cppCallback);
if (st != napi_ok) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
tsfn.Release();
});
return Boolean::New(env, true);
}
Object Init(Env env, Object exports) {
exports.Set("say", Function::New(env, Say));
return exports;
}
NODE_API_MODULE(test, Init)
class TypedThreadSafeFunction
https://github.com/nodejs/node-addon-api/blob/main/doc/typed_threadsafe_function.md
基本上与 ThreadSafeFunction 差不多,用起来稍微复杂点。
首先声明 tsfn 对象类型时需要指定 C++ 回调的签名:
using Context = Reference<Value>;
using DataType = int;
void cppCallback(Env env, Function jsCallback, Context *context, DataType *data);
using TSFN = TypedThreadSafeFunction<Context, DataType, cppCallback>;
using FinalizerDataType = void;
TSFN tsfn;
在创建 tsfn 对象时需要传入 context,销毁 tsfn 对象时需要释放 context 数据:
Context *context = new Reference<Value>(Persistent(info.This()));
tsfn = TSFN::New(
env,
jsCallback,
"resource name",
0,
1,
context,
[](Env, FinalizerDataType *, Context *ctx){
t.join();
delete ctx;
}
)
Finalizer 回调中前两个参数都没用到。
实现 C++ 回调:
void cppCallback(Env env, Function jsCallback, Context *context, DataType *data) {
// env, jsCallback, data 都可能是 nullptr
}
调用[Non]BlockingCall()
时不再需要传递 C++ 回调,只需要传递回调需要的数据。
评论区