侧边栏壁纸
  • 累计撰写 218 篇文章
  • 累计创建 59 个标签
  • 累计收到 5 条评论

NAPI 笔记 11:指定 JS 回调时的线程安全

barwe
2022-09-06 / 0 评论 / 0 点赞 / 4,890 阅读 / 8,141 字
温馨提示:
本文最后更新于 2023-06-07,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

线程安全函数对象

https://github.com/nodejs/node-addon-api/blob/main/doc/threadsafe.md

JavaScript 函数只能在原生模块的主线程上被正常调用,如果原生模块创建了额外的线程,与 Napi 相关的 API 都不能在其他线程中被使用。

解决办法是,如果有需要在子线程执行结束后才能执行的 JavaScript 函数,则子线程必须与主线程通信,在子线程执行完毕后由主线程代为执行 JavaScript 函数。

一个典型的场景就是,JavaScript 调用原生函数并传递一个回调函数,该回调函数必须在原生函数任务执行完成时才能被调用。

NAPI 提供了线程安全函数对象来解决这个问题,具体实现有ThreadSafeFunctionTypedThreadSafeFunction

表面上是在子线程中通过线程安全函数对象执行回调函数,实际上这个回调函数是由主线程完成执行的。

[typed]ThreadSafeFunction类实现了静态方法New()创建一个可以在多个线程中被调用的 JavaScript 函数的强引用。

[typed]ThreadSafeFunction对象的销毁必须等到所有使用它的线程都正确结束通信,这可以分为两种情况:

  • 调用其BlockingCall()或者NonBlockingCall()方法时返回napi_closing,意味着调用异常
  • 否则,就要在该线程代码最后调用其Release()方法手动结束通信

Release()一般放在线程代码最后调用,因为如果在中间调用,[typed]ThreadSafeFunction对象在该线程中可能就不存在了。

同理,调用[Non]BlockingCall()方法返回napi_closing时也不应该再执行后续任务。

子线程需要在 Finalizer 中 join 到主线程,与[typed]ThreadSafeFunction对象关联的数据也可以在 Finalizer 中被释放。

两种类的实现差异

选择ThreadSafeFunction还是TypedThreadSafeFunction很大程度上取决于如何在 Node.js 线程中执行你的 C++ 代码(回调)。

ThreadSafeFunction

ThreadSafeFunction不支持 Node-API 5 中对可选的回调函数的原生支持,也就是说不能给回调函数参数传递一个空指针

ThreadSafeFunction存在一些动态特性:

  • [Non]BlockingCall()方法提供了一个Function类型的参数用来接收一个函数,该函数用来处理 C++ 主线程提供的数据并调用JavaScript 回调函数
  • 为了与JavaScript 回调函数进行区分,这里将[Non]BlockingCall()方法接收的函数称之为 C++ 回调函数,它的内部应该调用 JavaScript 回调函数
  • C++ 回调函数 是作为参数传递的,所以在不同的线程中多次调用时,每次调用传递的函数可以不一样。
  • 为了适应不同的 C++ 回调函数,每次调用时就需要传入不同类型的 C++ 数据。

这种动态特性可能会带来额外的开销,甚至内存泄漏:

  • ThreadSafeFunction更像是底层napi_threadsafe_function的“代理”,每次调用[Non]BlockingCall()方法时在上构建一个回调函数的封装对象。
  • “代理”调用的也是底层的 API,一旦代理对象不可用(例如所有线程都被 Release 掉了)并且任务队列尚未清空时,传递给[Non]BlockingCall()的 C++ 回调函数将不会被执行
  • 此时,没有执行 C++ 回调函数就不能执行清理工作,如果期间涉及到堆的动态分配,因为没有执行成功的清理步骤,很可能就造成内存泄漏
  • C++ 回调函数的参数是 JavaScript 运行环境对象Env)、JavaScript 回调函数Function)和 C++ 原生类型的数据,并没有接收与线程上下文(即线程安全对象)相关的参数,所以 C++ 回调函数只能通过下面两种方法获取线程上下文:
    • 直接在子线程中执行,自动获得对应线程的上下文
    • 调用线程安全对象GetContext()方法获取线程上下文,但是该方法并不是类型安全

TypedThreadSafeFunction

顾名思义,在ThreadSafeFunction的基础上加上了类型安全,解决了ThreadSafeFunction的一系列缺点。

首先TypedThreadSafeFunction基于 Node-API 5 对可选回调函数参数的支持开发,在调用New()方法时允许开发者向 JavaScript 回调函数参数传递一个空指针(nullptr)而不是Function对象。

它能自动处理 Node-API 4 和 Node-API 5 中的可选回调函数参数,提供一个统一的 API,使开发者在切换 Node-API 版本时无需更改代码,只需要设置环境变量 NAPI_VERSION 即可。

TypedThreadSafeFunction移除了ThreadSafeFunction的动态特性:

  • 正确清理所有资源:当 Node.js 释放线程安全函数对象后,如果任务队列不为空,回调将继续在空的 Env 环境中执行,这保证了每个回调都能被正确执行完毕,释放动态分配的资源
  • 上下文类型安全:C++ 回调函数默认将上下文作为参数,因此不存在获取上下文的问题,上下文在创建TypedThreadSafeFunction对象时由第一个参数(Env)指定
  • 声明TypedThreadSafeFunction对象类型时需要指定 C++ 回调函数的签名,而 C++ 回调函数本身就包含了上下文对象,所以说TypedThreadSafeFunction是类型安全的
  • [Non]BlockingCall()方法不再接收函数作为参数,C++ 回调函数签名在构建TypedThreadSafeFunction对象时就已经确定了,消除了函数的动态性,动态性应由开发者自己实现

使用建议

下面情况使用TypedThreadSafeFunction

  • 想使用可选的 JavaScript 回调函数参数
  • 在使用[Non]BlockingCall()方法时固定 C++ 回调函数的签名
  • 需要回收在调用过程中动态分配的资源

其他简单情况更适合用ThreadSafeFunction

class ThreadSafeFunction

https://github.com/nodejs/node-addon-api/blob/main/doc/threadsafe_function.md

创建 tsfn 对象

使用New()方法在创建新的线程之前创建一个ThreadSafeFunction对象(以//开头的是可选参数):

ThreadSafeFunction ThreadSafeFunction::New(
    napi_env env,
    const Function& callback,
    //const Object& resource,
    ResourceString resourceName,
    size_t maxQueueSize,
    size_t initialThreadCount,
    //ContextType* context,
    //Finalizer finalizeCallback,
    //FinalizerDataType* data
)
  • callback 要执行的 JavaScript 回调函数
  • resourceName 资源类型标识符
  • maxQueueSize 任务队列最大长度,0 表示允许无限大
  • initialThreadCount 初始线程数目 ???
  • (optional) finalizeCallbackThreadSafeFunction对象被销毁前主线程调用,通常用来 join 线程
  • (optional) data 调用finalizeCallback时传递的数据

停止使用 tsfn 对象

在创建的子线程中,执行完所有任务代码后应该调用Release()方法释放对 tsfn 对象的引用:

napi_status Napi::ThreadSafeFunction::Release() const
  • napi_ok: tsfn 对象成功释放
  • napi_invalid_arg: tsfn 对象关联的线程数为 0
  • napi_generic_failure: 释放时发生错误

阻塞/非阻塞式调用

BlockingCall()或者NonBlockingCall()方法用来执行 JavaScript 回调函数。

napi_status Napi::ThreadSafeFunction::BlockingCall(DataType* data, Callback callback) const
napi_status Napi::ThreadSafeFunction::NonBlockingCall(DataType* data, Callback callback) const

(optional) data是传递给callback函数的参数。

(optional) callback是一个 C++ 函数,它的签名是:

void operator()(Env env, Function jsCallback, DataType* data)

这里的jsCallback才是 JavaScript 传递过来的回调函数。

返回状态

  • napi_ok: 调用被成功添加到任务队列中
  • napi_queue_full: 以非阻塞式调用时任务对立满了
  • napi_closing: tsfn 对象异常,不能接受新的调用
  • napi_invalid_arg
  • napi_generic_failure

回调数据

data是传递给 C++ 回调函数的数据,传递多个数据时可以用 struct 指针。

一个例子

#include <thread>
#include <chrono>
#include <napi.h>

using namespace Napi;

ThreadSafeFunction tsfn;
std::thread t;

tsfn对象需要在主线程中创建,子线程中调用,因此声明为全局变量。

使用一个子线程。

struct Person {
    std::string name;
    int year;
};

定义一个结构体,作为数据传递给 C++ 回调函数

Boolean Say(const CallbackInfo &info) {
    /* ... */
    return Boolean::New(env, true);
}

Object Init(Env env, Object exports) {
    exports.Set("say", Function::New(env, Say));
    return exports;
}

NODE_API_MODULE(test, Init)

待暴露的 C++ 函数架子,这样我们就能在 JavaScript 调用这个函数:

const { say } = require('bindings')('test')

say((name, year) => {
  console.log(`${name} will be rich in ${year}`)
}, 2022)

调用该原生函数传入了两个参数:

  • 一个回调函数:(name: string, year: number) => void
  • 一个number类型的整数

同时我们希望原生函数在调用回调时能够传入两个参数,分别是string类型和number类型。

首先我们需要在 C++ 函数中解析调用参数:

Boolean Say(const CallbackInfo &info) {
    Env env = info.Env();
    Function jsCallback = info[0].As<Function>();
    int baseYear = info[1].As<Number>().Int32Value();

    /* ... */

    return Boolean::New(env, true);
}

jsCallbackbaseYear对应了我们在 JavaScript 中调用say()传递的两个参数。

然后我们新开一个线程来执行一些任务,执行完成后再 JavaScript 回调函数。

在此之前,需要先在主线程中创建一个 tsfn 对象,该对象负责子线程与主线程的通信,确保子线程任务执行完成之后由主线程调用回调函数。

Boolean Say(const CallbackInfo &info) {
    Env env = info.Env();
    Function jsCallback = info[0].As<Function>();
    int baseYear = info[1].As<Number>().Int32Value();

    tsfn = ThreadSafeFunction::New(
        env, jsCallback, "xxx", 0, 1,
        [](Env) {
            t.join();
        }    
    );

    /* Block A */

    return Boolean::New(env, true);
}

在创建 tsfn 对象时,我们指定了运行环境、JavaScript 回调函数、资源标识符、任务队列大小和初始化线程数,同时还指定了一个 Finalizer 回调。

在 tsfn 对象被销毁之前,应该等待所有线程(此处只有一个)执行完毕。

然后在 Block A 处我们可以新建一个线程:

t = std::thread([baseYear] () {
    /* Block B */
    tsfn.Release();
});

新建的线程需要执行一个任务函数,该函数是一个匿名函数,需要捕获上文的baseYear变量。

在任务执行完毕时应该调用tsfn.Release(),表示本线程用完你了,不需要再占用你了。

在 Block B 处,我们需要调用tsfn.[Non]BlockingCall()来执行 JavaScript 回调函数

为此我们需要先构造一个 C++ 回调函数

t = std::thread([baseYear] () {
    auto cppCallback = [](Env env, Function jsCallback, Person *person){
        jsCallback.Call({ String::New(env, person->name), Number::New(env, person->year) });
    };
    /* Block C */
    tsfn.Release();
});

该函数的参数一般是固定的:Env 对象,JavaScript 回调函数和一个 C++ 原生数据。

在该函数内部,我们通过Call()正式调用 JavaScript 回调函数,并传入两个 JavaScript 类型的参数,分别对应 name 和 year。

在 Block C 处我们执行一些 C++ 代码,可以单次或者多次调用 JavaScript 函数:

for (int i = 0; i < 3; i++) {
    Person person = {
        name: "person" + std::to_string(i),
        year: baseYear + i
    };
    Person *data = &person;
    
    napi_status st = tsfn.BlockingCall(data, cppCallback);
    
    // 如果想在执行 JavaScript 回调之后还做一些其他事情,必须先检测调用返回状态
    if (st != napi_ok) {
        break;
    }

    // 模拟耗时操作
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

完整的实例代码如下:

#include <thread>
#include <chrono>
#include <napi.h>

using namespace Napi;

ThreadSafeFunction tsfn;
std::thread t;

struct Person {
    std::string name;
    int year;
};

Boolean Say(const CallbackInfo &info) {
    Env env = info.Env();
    Function jsCallback = info[0].As<Function>();
    int baseYear = info[1].As<Number>().Int32Value();

    tsfn = ThreadSafeFunction::New(
        env, jsCallback, "xxx", 0, 1,
        [](Env) {
            t.join();
        }    
    );

    t = std::thread([baseYear] () {
        auto cppCallback = [](Env env, Function jsCallback, Person *person){
            jsCallback.Call({ String::New(env, person->name), Number::New(env, person->year) });
        };

        for (int i = 0; i < 3; i++) {
            Person person = {
                name: "person" + std::to_string(i),
                year: baseYear + i
            };
            Person *data = &person;
            napi_status st = tsfn.BlockingCall(data, cppCallback);
            if (st != napi_ok) {
                break;
            }

            std::this_thread::sleep_for(std::chrono::seconds(1));
        }

        tsfn.Release();
    });

    return Boolean::New(env, true);
}

Object Init(Env env, Object exports) {
    exports.Set("say", Function::New(env, Say));
    return exports;
}

NODE_API_MODULE(test, Init)

class TypedThreadSafeFunction

https://github.com/nodejs/node-addon-api/blob/main/doc/typed_threadsafe_function.md

基本上与 ThreadSafeFunction 差不多,用起来稍微复杂点。

首先声明 tsfn 对象类型时需要指定 C++ 回调的签名:

using Context = Reference<Value>;
using DataType = int;
void cppCallback(Env env, Function jsCallback, Context *context, DataType *data);
using TSFN = TypedThreadSafeFunction<Context, DataType, cppCallback>;
using FinalizerDataType = void;

TSFN tsfn;

在创建 tsfn 对象时需要传入 context,销毁 tsfn 对象时需要释放 context 数据:

Context *context = new Reference<Value>(Persistent(info.This()));

tsfn = TSFN::New(
	env,
    jsCallback,
    "resource name",
    0,
    1,
    context,
    [](Env, FinalizerDataType *, Context *ctx){
        t.join();
        delete ctx;
    }
)

Finalizer 回调中前两个参数都没用到。

实现 C++ 回调:

void cppCallback(Env env, Function jsCallback, Context *context, DataType *data) {
    // env, jsCallback, data 都可能是 nullptr
}

调用[Non]BlockingCall()时不再需要传递 C++ 回调,只需要传递回调需要的数据。

0

评论区