这是我参与8月更文挑战的第5
天,活动详情查看:8月更文挑战
Worker Threads
node中的多线程是在node版本v10.5.0引入的一个的一个新特性,在很长一段时间内Worker Thread
都是实验性质的,到目前为止,node稳定的版本已经到了14.17.4,这个特性已经变成稳定可用了。
worker_thread是什么
先来来看看官网的描述
The `worker_threads` module enables the use of threads that
execute JavaScript in parallel.
意思就是:worker_thread
模块允许使用线程来并行执行JavaScript。
我们以前Javascript都是单线程然后利用一个事件循环队列(event loop
)不断监听执行栈
是否有函数进入。对于worker_thread,其实可以理解为一个event loop
中有多个javascript工作线程,创建一个线程相当于创建一个新的js执行环境。多线程的运行如下图
与child_process的区别
child_process
是可以创建一个新的node进程,worker_thread
与它的最大区别就是:worker_thread可以共享内存,公共的数据可以在线程之间公用,而child_process只能通过JSON去传递数据。
还有就是:因为线程是在一个进程内的,创建一个线程的开销会比创建一个进程要小
worker_thread的适用范围
worker_thread
在CPU密集型的JS操作中非常有用,但是在IO密集型操作中性能不会有太多的改善,反而Node自带的一步IO操作会比工作线程更有用。
使用worker_thread
介绍完worker_thread的概念,现在来介绍一下他的用法
创建工作线程
const { isMainThread, Worker, parentPort } = require('worker_threads');
if (isMainThread) {
createChildThread();
} else {
parentPort.on('message', (msg) => {
console.log('parent thread listen:', msg);
})
parentPort.postMessage('hello child thread')
}
// 创建线程方法
function createChildThread() {
const worker = new Worker(__filename)
worker.on('message', (val) => {
console.log('child thread listen:', val);
worker.postMessage('hello parent thread');
})
}
上面代码在主线程的时候调用创建工作线程的方法,创建一个工作线程并且新增一个,而创建工作线程后调用主线程的端口向工作线程发送消息,工作线程接受到消息后再向主线程回应。大概的流程图如下
执行后返回
创建线程前
创建线程后,线程数量+1
线程间通信
上文提到,worker_threads
可以通过ArrayBuffer
或SharedArrayBuffer
共享内存。接下来看一下,它在代码中是如何实现的.
const { isMainThread, Worker, parentPort } = require('worker_threads');
if (isMainThread) {
createChildThread();
} else {
// 创建一个长度为4byte的SharedArrayBuffer
const shareBuf = new SharedArrayBuffer(4);
// 创建一个8位无符号整型数组
const bufInt = new Uint8Array(shareBuf);
parentPort.on('message', () => {
console.log('parent thread listen:', bufInt);
});
// 发送共享内存创建的整型
parentPort.postMessage({ bufInt });
}
// 创建线程方法
function createChildThread() {
const worker = new Worker(__filename);
worker.on('message', ({ bufInt }) => {
console.log('child thread listen:', bufInt);
bufInt[0] = 11;
worker.postMessage('finished');
});
}
运行后返回,可以看到在子线程创建一个SharedArrayBuffer
,用主线程广播的一个数据,在子线程中接收后赋值,因为是线程间共享的Buffer,所以主线程这边也可以看到在子线程中修改的数据。
如图所示,使用SharedArrayBuffer
创建的值会分配到共享内存中,所有线程都可以共用这块内存。
线程间通信
我们已经学会创建线程和使用共享内存了,从上面代码可以看到,线程都是从主线程中发送消息,然后子线程向主线程回复消息,没有办法让两个子线程直接直接通信,如果我想让两个子线程直接通信,那就需要用到MessageChannel
这个类了,MessageChannel
的具体用法可以点击这里。现在就来实现一下子线程之间的直接通信。
const {
isMainThread,
Worker,
parentPort,
MessageChannel,
threadId,
} = require('worker_threads');
if (isMainThread) {
createChildThread();
} else {
parentPort.on('message', ({ port }) => {
// 主线程接收到端口后配置通信端口方法
port.on('message', (msg) => {
console.log(`port${threadId} listen:`, msg);
});
port.postMessage(`hello, im thread ${threadId}`);
});
parentPort.postMessage('hello');
}
// 创建线程方法
function createChildThread() {
const { port1, port2 } = new MessageChannel(); // 创建一个MessageChannel
const worker1 = new Worker(__filename); // 创建子线程1
const worker2 = new Worker(__filename); // 创建子线程2
worker2.postMessage({ port: port1 }, [port1]); // 向主线程发送Channel的端口1
worker1.postMessage({ port: port2 }, [port2]); // 向主线程发送Channel的端口1
}
运行代码后返回
从代码实现可以看到,最终建立子线程直接通信的步骤还是在主线程的message事件中。建立通信的流程图如下
实战
了解完线程的概念和用法,现在来实战一下:比如在数组中有100万条数据需要md5加密,对比一下使用工作线程和不使用工作线程的实现速度怎么样
const {
isMainThread,
Worker,
parentPort,
threadId,
} = require('worker_threads');
const { createHash } = require('crypto');
const ARR_NUM = 1000000; // 数组长度
const WORKER_NUM = 1; // 线程数
const size = Math.ceil(ARR_NUM / WORKER_NUM); // 每个线程需要处理的数据量
if (isMainThread) {
createChildThread();
} else {
parentPort.on('message', ({ status, index, startTime }) => {
if (index === WORKER_NUM) {
const usedTime = Date.now() - startTime;
console.log(`finish bussiness time: ${usedTime}ms`);
process.exit(threadId);
}
});
const data = addHasCode(threadId, size, (threadId - 1) * size);
// 完成后
parentPort.postMessage({
business: 'finish work',
data,
});
}
// 创建线程方法
function createChildThread() {
let finishNumBuf = new SharedArrayBuffer(4);
let finishNum = new Uint8Array(finishNumBuf);
const startTime = Date.now();
for (let x = 0; x < WORKER_NUM; x++) {
const worker = new Worker(__filename, {});
worker.on('message', ({ business, data }) => {
if (business === 'finish work') {
finishNum[0]++;
worker.postMessage({
status: `finish worker ${x}`,
index: finishNum[0],
startTime: startTime,
data,
});
}
});
}
console.log(`${WORKER_NUM} thread start working`);
}
// 加密方法
function addHasCode(index, size, limit) {
const result = [];
for (let x = limit, num = index * size; x < num; x++) {
result.push(createHash('md5').update('hello world').digest('base64'));
}
return result;
}
使用1
个线程计算,平均需要2700ms
左右
使用5
个线程计算,平均需要2000ms
左右
使用20
个线程计算,平均需要2500ms
左右
使用60
个线程计算,平均需要3800ms
左右
可见,线程不是越多越好,过多的线程可能会增加过多的系统开销,速度也不如单线程时候运行。
小结
本文介绍了nodeJs中的worker_threads
的概念,去多进程的区别,和它的优点。
介绍了worker_threads
是如何使用,共享内存,还有子线程之间的通信。
最后用一个测试子进程的效率的例子说明worker_threads
对比单线程运行。
若文章中有不严谨或出错的地方请在评论区域指出~
参考
- Worker Theread
常见问题FAQ
- 免费下载或者VIP会员专享资源能否直接商用?
- 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
- 提示下载完但解压或打开不了?
- 找不到素材资源介绍文章里的示例图片?
- 模板不会安装或需要功能定制以及二次开发?
发表评论
还没有评论,快来抢沙发吧!