Node.js 多线程进阶:worker_threads 中 Atomics 与 SharedArrayBuffer 的深度同步实践
为什么需要 Atomics 和 SharedArrayBuffer?
Atomics 对象的核心方法
1. Atomics.load(typedArray, index)
2. Atomics.store(typedArray, index, value)
3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
4. Atomics.wait(typedArray, index, value, timeout)
5. Atomics.notify(typedArray, index, count)
其他原子操作方法
使用 Atomics 实现互斥锁和条件变量
总结与注意事项
你好,我是你们的“老朋友”——“代码挖掘机”。今天咱们不聊那些花里胡哨的框架,来聊点 Node.js 多线程编程中的硬核知识:worker_threads
模块里的 Atomics
对象以及它在 SharedArrayBuffer
线程同步中的作用。如果你已经对 Node.js 的多线程编程有了一定了解,那么相信这篇文章能让你对线程同步的理解更上一层楼。
为什么需要 Atomics 和 SharedArrayBuffer?
在传统的 JavaScript 单线程模型中,我们很少需要考虑并发问题。但是,当 Node.js 引入 worker_threads
后,多线程编程成为可能,并发问题也随之而来。多个线程同时访问和修改同一块内存区域(SharedArrayBuffer
)时,如果没有适当的同步机制,就会出现数据竞争(Data Race),导致程序行为不可预测。
Atomics
对象提供了一组原子操作,可以保证对 SharedArrayBuffer
的读写操作是“原子”的,即不可中断的。这意味着,在一个线程执行原子操作期间,其他线程无法访问或修改同一内存位置的数据。这就像给共享数据加了一把“锁”,确保同一时刻只有一个线程能操作它。
Atomics 对象的核心方法
Atomics
对象提供了一系列静态方法,用于对 SharedArrayBuffer
进行原子操作。下面介绍几个核心方法,并通过代码示例演示其用法。
1. Atomics.load(typedArray, index)
原子地读取 typedArray
中指定 index
位置的值。
const sab = new SharedArrayBuffer(1024); const int32 = new Int32Array(sab); // 初始值 int32[0] = 10; // 在一个 worker 线程中 // const { port } = require('worker_threads'); // port.on('message', () => { // console.log('从主线程读取的值:', Atomics.load(int32, 0)); // 原子地读取值 // port.close(); // }); // 模拟主线程的操作,直接打印 console.log('从主线程读取的值:', Atomics.load(int32, 0));
2. Atomics.store(typedArray, index, value)
原子地将 value
写入 typedArray
中指定 index
的位置,并返回旧值。
const sab = new SharedArrayBuffer(1024); const int32 = new Int32Array(sab); // 在一个 worker 线程中 // const { port } = require('worker_threads'); // port.on('message', () => { // const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值 // console.log('旧值:', oldValue); // 输出旧值 // console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认 // port.close(); // }); // 模拟主线程 const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值 console.log('旧值:', oldValue); // 输出旧值 console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认
3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
原子地比较 typedArray
中 index
位置的值与 expectedValue
。如果相等,则将 index
位置的值替换为 replacementValue
,并返回旧值;否则,不进行任何操作,直接返回旧值。
这个方法是实现互斥锁(Mutex)的基础。
const sab = new SharedArrayBuffer(1024); const int32 = new Int32Array(sab); // 初始值 int32[0] = 0; // 0 表示锁空闲,1 表示锁被占用 // 尝试获取锁 function acquireLock() { while (true) { const oldValue = Atomics.compareExchange(int32, 0, 0, 1); if (oldValue === 0) { // 获取锁成功 return; } // 锁被占用,等待一段时间后重试,或者使用 Atomics.wait Atomics.wait(int32, 0, 1); //等待 } } // 释放锁 function releaseLock() { const oldValue = Atomics.store(int32, 0, 0); if (oldValue !== 1) { throw new Error('释放锁失败'); } Atomics.notify(int32, 0, 1); } // 在一个 worker 线程中 // const { port } = require('worker_threads'); // port.on('message', () => { // acquireLock(); // console.log('Worker 线程获取到锁,开始执行临界区代码...'); // // 模拟临界区代码执行 // for (let i = 0; i < 1000000; i++) { } // console.log('Worker 线程执行完毕,释放锁...'); // releaseLock(); // port.close(); // }); acquireLock(); console.log('主线程获取到锁,开始执行临界区代码...'); // 模拟临界区代码执行 for (let i = 0; i < 1000000; i++) { } console.log('主线程执行完毕,释放锁...'); releaseLock();
4. Atomics.wait(typedArray, index, value, timeout)
原子地检查 typedArray
中 index
位置的值是否等于 value
。如果相等,则当前线程进入休眠状态,直到被 Atomics.notify
唤醒或超时(timeout
,以毫秒为单位)。如果 index
位置的值不等于 value
,则立即返回 'not-equal'
。
Atomics.wait
和 Atomics.notify
结合使用,可以实现条件变量。
5. Atomics.notify(typedArray, index, count)
唤醒在 typedArray
中 index
位置等待的最多 count
个线程。返回实际唤醒的线程数。
// 续上例 compareExchange
其他原子操作方法
除了上述几个核心方法外,Atomics
对象还提供了一些其他原子操作方法,如:
Atomics.add(typedArray, index, value)
:原子地将value
加到typedArray
中index
位置的值上,并返回旧值。Atomics.sub(typedArray, index, value)
:原子地从typedArray
中index
位置的值减去value
,并返回旧值。Atomics.and(typedArray, index, value)
:原子地将typedArray
中index
位置的值与value
进行按位与操作,并返回旧值。Atomics.or(typedArray, index, value)
:原子地将typedArray
中index
位置的值与value
进行按位或操作,并返回旧值。Atomics.xor(typedArray, index, value)
:原子地将typedArray
中index
位置的值与value
进行按位异或操作,并返回旧值。Atomics.exchange(typedArray, index, value)
: 原子地将typedArray
中index
位置的值设置为value
,并返回旧值. 相当于Atomics.store
。
这些方法的用法与 Atomics.load
和 Atomics.store
类似,只是执行的操作不同。你可以根据实际需要选择合适的方法。
使用 Atomics 实现互斥锁和条件变量
上面我们已经通过 Atomics.compareExchange
演示了一个简单的互斥锁实现。下面我们再来看一个更完整的互斥锁和条件变量的实现示例。
// Mutex.js class Mutex { constructor(sharedArrayBuffer, offset = 0) { this.int32 = new Int32Array(sharedArrayBuffer, offset, 1); } acquire() { while (true) { const oldValue = Atomics.compareExchange(this.int32, 0, 0, 1); if (oldValue === 0) { return; } Atomics.wait(this.int32, 0, 1); } } release() { const oldValue = Atomics.store(this.int32, 0, 0); if (oldValue !== 1) { throw new Error('释放锁失败'); } Atomics.notify(this.int32, 0, 1); // 只唤醒一个等待的线程 } } // ConditionVariable.js class ConditionVariable { constructor(sharedArrayBuffer, offset = 0) { this.int32 = new Int32Array(sharedArrayBuffer, offset, 1); this.waiters = 0; // 等待的线程数 } wait(mutex) { this.waiters++; mutex.release(); Atomics.wait(this.int32, 0, 0); // 在条件变量上等待 this.waiters--; mutex.acquire(); } signal() { if (this.waiters > 0) { Atomics.notify(this.int32, 0, 1); // 唤醒一个等待的线程 } } broadcast() { if (this.waiters > 0) { Atomics.notify(this.int32, 0, this.waiters); // 唤醒所有等待的线程 } } } // 使用示例 const sab = new SharedArrayBuffer(1024); const mutex = new Mutex(sab); const cond = new ConditionVariable(sab, 4); // 注意偏移量 let sharedCounter = new Int32Array(sab, 8); //共享计数器 // 在一个 worker 线程中 // const { Worker, isMainThread } = require('worker_threads'); // if (!isMainThread) { // mutex.acquire(); // while (sharedCounter[0] < 10) { // console.log("等待条件满足..."); // cond.wait(mutex); // } // console.log('条件满足,开始执行...'); // sharedCounter[0] = 0; // mutex.release(); // process.exit(); // } // 模拟主线程 mutex.acquire(); console.log("主线程增加计数器..."); for(let i = 0; i< 11; i++){ sharedCounter[0] = i; } cond.signal(); // 发送信号,唤醒一个等待的线程 mutex.release(); // 另一个worker线程 // new Worker(__filename);
这个例子中,我们定义了 Mutex
和 ConditionVariable
两个类,分别用于实现互斥锁和条件变量。Mutex
类使用 Atomics.compareExchange
实现锁的获取和释放,ConditionVariable
类使用 Atomics.wait
和 Atomics.notify
实现线程的等待和唤醒。
在使用示例中,主线程和 worker 线程通过 Mutex
和 ConditionVariable
进行同步。worker 线程首先获取锁,然后检查共享计数器 sharedCounter
的值。如果值小于 10,则调用 cond.wait(mutex)
在条件变量上等待,并释放锁。主线程获取锁后,增加计数器的值,然后调用 cond.signal()
发送信号,唤醒一个等待的 worker 线程。worker 线程被唤醒后,重新获取锁,并继续执行。
总结与注意事项
Atomics
对象和 SharedArrayBuffer
为 Node.js 多线程编程提供了强大的同步机制。通过原子操作,我们可以避免数据竞争,实现线程安全的共享内存访问。但是,在使用 Atomics
和 SharedArrayBuffer
时,需要注意以下几点:
- 性能开销:原子操作虽然可以保证线程安全,但也会带来一定的性能开销。因此,应避免过度使用原子操作,只在必要时使用。
- 死锁:在使用互斥锁时,要注意避免死锁。例如,一个线程持有锁 A,等待锁 B,而另一个线程持有锁 B,等待锁 A,就会导致死锁。避免死锁的常见方法包括:按固定顺序获取锁、避免持有多个锁、使用超时机制等。
- 复杂性:多线程编程本身就比较复杂,使用
Atomics
和SharedArrayBuffer
会进一步增加复杂性。因此,在编写多线程代码时,要格外小心,充分测试,确保代码的正确性。 SharedArrayBuffer
的兼容性:虽然大多数现代浏览器和 Node.js 版本都支持SharedArrayBuffer
,但仍有一些旧版本不支持。在使用SharedArrayBuffer
之前,最好检查一下目标环境的兼容性。Atomics.wait
和Atomics.notify
必须配对使用: 必须在同一个SharedArrayBuffer
的同一个位置上进行wait和notify操作. 否则会出现不可预期的结果。
希望这篇文章能帮助你深入理解 Node.js 中 Atomics
和 SharedArrayBuffer
的用法。如果你有任何问题或想法,欢迎在评论区留言,我们一起探讨!