WEBKT

Node.js 多线程进阶:worker_threads 中 Atomics 与 SharedArrayBuffer 的深度同步实践

8 0 0 0

为什么需要 Atomics 和 SharedArrayBuffer?

Atomics 对象的核心方法

1. Atomics.load(typedArray, index)

2. Atomics.store(typedArray, index, value)

3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)

4. Atomics.wait(typedArray, index, value, timeout)

5. Atomics.notify(typedArray, index, count)

其他原子操作方法

使用 Atomics 实现互斥锁和条件变量

总结与注意事项

你好,我是你们的“老朋友”——“代码挖掘机”。今天咱们不聊那些花里胡哨的框架,来聊点 Node.js 多线程编程中的硬核知识:worker_threads 模块里的 Atomics 对象以及它在 SharedArrayBuffer 线程同步中的作用。如果你已经对 Node.js 的多线程编程有了一定了解,那么相信这篇文章能让你对线程同步的理解更上一层楼。

为什么需要 Atomics 和 SharedArrayBuffer?

在传统的 JavaScript 单线程模型中,我们很少需要考虑并发问题。但是,当 Node.js 引入 worker_threads 后,多线程编程成为可能,并发问题也随之而来。多个线程同时访问和修改同一块内存区域(SharedArrayBuffer)时,如果没有适当的同步机制,就会出现数据竞争(Data Race),导致程序行为不可预测。

Atomics 对象提供了一组原子操作,可以保证对 SharedArrayBuffer 的读写操作是“原子”的,即不可中断的。这意味着,在一个线程执行原子操作期间,其他线程无法访问或修改同一内存位置的数据。这就像给共享数据加了一把“锁”,确保同一时刻只有一个线程能操作它。

Atomics 对象的核心方法

Atomics 对象提供了一系列静态方法,用于对 SharedArrayBuffer 进行原子操作。下面介绍几个核心方法,并通过代码示例演示其用法。

1. Atomics.load(typedArray, index)

原子地读取 typedArray 中指定 index 位置的值。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 初始值
int32[0] = 10;
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// console.log('从主线程读取的值:', Atomics.load(int32, 0)); // 原子地读取值
// port.close();
// });
// 模拟主线程的操作,直接打印
console.log('从主线程读取的值:', Atomics.load(int32, 0));

2. Atomics.store(typedArray, index, value)

原子地将 value 写入 typedArray 中指定 index 的位置,并返回旧值。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
// console.log('旧值:', oldValue); // 输出旧值
// console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认
// port.close();
// });
// 模拟主线程
const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
console.log('旧值:', oldValue); // 输出旧值
console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认

3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)

原子地比较 typedArrayindex 位置的值与 expectedValue。如果相等,则将 index 位置的值替换为 replacementValue,并返回旧值;否则,不进行任何操作,直接返回旧值。

这个方法是实现互斥锁(Mutex)的基础。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 初始值
int32[0] = 0; // 0 表示锁空闲,1 表示锁被占用
// 尝试获取锁
function acquireLock() {
while (true) {
const oldValue = Atomics.compareExchange(int32, 0, 0, 1);
if (oldValue === 0) {
// 获取锁成功
return;
}
// 锁被占用,等待一段时间后重试,或者使用 Atomics.wait
Atomics.wait(int32, 0, 1); //等待
}
}
// 释放锁
function releaseLock() {
const oldValue = Atomics.store(int32, 0, 0);
if (oldValue !== 1) {
throw new Error('释放锁失败');
}
Atomics.notify(int32, 0, 1);
}
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// acquireLock();
// console.log('Worker 线程获取到锁,开始执行临界区代码...');
// // 模拟临界区代码执行
// for (let i = 0; i < 1000000; i++) { }
// console.log('Worker 线程执行完毕,释放锁...');
// releaseLock();
// port.close();
// });
acquireLock();
console.log('主线程获取到锁,开始执行临界区代码...');
// 模拟临界区代码执行
for (let i = 0; i < 1000000; i++) { }
console.log('主线程执行完毕,释放锁...');
releaseLock();

4. Atomics.wait(typedArray, index, value, timeout)

原子地检查 typedArrayindex 位置的值是否等于 value。如果相等,则当前线程进入休眠状态,直到被 Atomics.notify 唤醒或超时(timeout,以毫秒为单位)。如果 index 位置的值不等于 value,则立即返回 'not-equal'

Atomics.waitAtomics.notify 结合使用,可以实现条件变量。

5. Atomics.notify(typedArray, index, count)

唤醒在 typedArrayindex 位置等待的最多 count 个线程。返回实际唤醒的线程数。

// 续上例 compareExchange

其他原子操作方法

除了上述几个核心方法外,Atomics 对象还提供了一些其他原子操作方法,如:

  • Atomics.add(typedArray, index, value):原子地将 value 加到 typedArrayindex 位置的值上,并返回旧值。
  • Atomics.sub(typedArray, index, value):原子地从 typedArrayindex 位置的值减去 value,并返回旧值。
  • Atomics.and(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位与操作,并返回旧值。
  • Atomics.or(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位或操作,并返回旧值。
  • Atomics.xor(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位异或操作,并返回旧值。
  • Atomics.exchange(typedArray, index, value): 原子地将typedArrayindex位置的值设置为value,并返回旧值. 相当于 Atomics.store

这些方法的用法与 Atomics.loadAtomics.store 类似,只是执行的操作不同。你可以根据实际需要选择合适的方法。

使用 Atomics 实现互斥锁和条件变量

上面我们已经通过 Atomics.compareExchange 演示了一个简单的互斥锁实现。下面我们再来看一个更完整的互斥锁和条件变量的实现示例。

// Mutex.js
class Mutex {
constructor(sharedArrayBuffer, offset = 0) {
this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
}
acquire() {
while (true) {
const oldValue = Atomics.compareExchange(this.int32, 0, 0, 1);
if (oldValue === 0) {
return;
}
Atomics.wait(this.int32, 0, 1);
}
}
release() {
const oldValue = Atomics.store(this.int32, 0, 0);
if (oldValue !== 1) {
throw new Error('释放锁失败');
}
Atomics.notify(this.int32, 0, 1); // 只唤醒一个等待的线程
}
}
// ConditionVariable.js
class ConditionVariable {
constructor(sharedArrayBuffer, offset = 0) {
this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
this.waiters = 0; // 等待的线程数
}
wait(mutex) {
this.waiters++;
mutex.release();
Atomics.wait(this.int32, 0, 0); // 在条件变量上等待
this.waiters--;
mutex.acquire();
}
signal() {
if (this.waiters > 0) {
Atomics.notify(this.int32, 0, 1); // 唤醒一个等待的线程
}
}
broadcast() {
if (this.waiters > 0) {
Atomics.notify(this.int32, 0, this.waiters); // 唤醒所有等待的线程
}
}
}
// 使用示例
const sab = new SharedArrayBuffer(1024);
const mutex = new Mutex(sab);
const cond = new ConditionVariable(sab, 4); // 注意偏移量
let sharedCounter = new Int32Array(sab, 8); //共享计数器
// 在一个 worker 线程中
// const { Worker, isMainThread } = require('worker_threads');
// if (!isMainThread) {
// mutex.acquire();
// while (sharedCounter[0] < 10) {
// console.log("等待条件满足...");
// cond.wait(mutex);
// }
// console.log('条件满足,开始执行...');
// sharedCounter[0] = 0;
// mutex.release();
// process.exit();
// }
// 模拟主线程
mutex.acquire();
console.log("主线程增加计数器...");
for(let i = 0; i< 11; i++){
sharedCounter[0] = i;
}
cond.signal(); // 发送信号,唤醒一个等待的线程
mutex.release();
// 另一个worker线程
// new Worker(__filename);

这个例子中,我们定义了 MutexConditionVariable 两个类,分别用于实现互斥锁和条件变量。Mutex 类使用 Atomics.compareExchange 实现锁的获取和释放,ConditionVariable 类使用 Atomics.waitAtomics.notify 实现线程的等待和唤醒。

在使用示例中,主线程和 worker 线程通过 MutexConditionVariable 进行同步。worker 线程首先获取锁,然后检查共享计数器 sharedCounter 的值。如果值小于 10,则调用 cond.wait(mutex) 在条件变量上等待,并释放锁。主线程获取锁后,增加计数器的值,然后调用 cond.signal() 发送信号,唤醒一个等待的 worker 线程。worker 线程被唤醒后,重新获取锁,并继续执行。

总结与注意事项

Atomics 对象和 SharedArrayBuffer 为 Node.js 多线程编程提供了强大的同步机制。通过原子操作,我们可以避免数据竞争,实现线程安全的共享内存访问。但是,在使用 AtomicsSharedArrayBuffer 时,需要注意以下几点:

  1. 性能开销:原子操作虽然可以保证线程安全,但也会带来一定的性能开销。因此,应避免过度使用原子操作,只在必要时使用。
  2. 死锁:在使用互斥锁时,要注意避免死锁。例如,一个线程持有锁 A,等待锁 B,而另一个线程持有锁 B,等待锁 A,就会导致死锁。避免死锁的常见方法包括:按固定顺序获取锁、避免持有多个锁、使用超时机制等。
  3. 复杂性:多线程编程本身就比较复杂,使用 AtomicsSharedArrayBuffer 会进一步增加复杂性。因此,在编写多线程代码时,要格外小心,充分测试,确保代码的正确性。
  4. SharedArrayBuffer 的兼容性:虽然大多数现代浏览器和 Node.js 版本都支持 SharedArrayBuffer,但仍有一些旧版本不支持。在使用 SharedArrayBuffer 之前,最好检查一下目标环境的兼容性。
  5. Atomics.waitAtomics.notify 必须配对使用: 必须在同一个SharedArrayBuffer的同一个位置上进行wait和notify操作. 否则会出现不可预期的结果。

希望这篇文章能帮助你深入理解 Node.js 中 AtomicsSharedArrayBuffer 的用法。如果你有任何问题或想法,欢迎在评论区留言,我们一起探讨!

代码挖掘机 Node.js多线程Atomics

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/7939