场景
假设有 10
个请求,但是最大的并发数目是 5
个,并且要求拿到请求结果,这样就是一个简单的并发请求控制
模拟
利用 setTimeout
实行简单模仿一个请求
const timeout = (timeout: number, ret: number) => {
return (idx?: any) =>
new Promise((resolve) => {
setTimeout(() => {
const compare = Date.now() - startTime;
console.log(`At ${Math.floor(compare / 100)}00 return`, ret);
resolve(idx);
}, timeout);
});
};
const timeout1 = timeout(1000, 1);
const timeout2 = timeout(300, 2);
const timeout3 = timeout(400, 3);
const timeout4 = timeout(500, 4);
const timeout5 = timeout(200, 5);
通过这样来模拟请求,本质就是 Promise
没有并发控制的时候
const run = async () => {
await Promise.all([
timeout1(),
timeout2(),
timeout3(),
timeout4(),
timeout5(),
]);
};
run();
At 200 return 5
At 300 return 2
At 400 return 3
At 500 return 4
At 1000 return 1
可以看到输出是 5 2 3 4 1
,按 timeout
的时间输出了
并发条件
假设同时间最大并发数目是 2
,创建一个类
class Concurrent {
private maxConcurrent: number = 2;
constructor(count: number = 2) {
this.maxConcurrent = count;
}
}
第一种并发控制
想一下,按最大并发数拆分 Promise
数组,如果有 Promise
被 fulfilled
的时候,就移除掉,然后把 pending
状态的 Promise
,加进来。Promise.race
可以帮我们满足这个需求
class Concurrent {
private maxConcurrent: number = 2;
constructor(count: number = 2) {
this.maxConcurrent = count;
}
public async useRace(fns: Function[]) {
const runing: any[] = [];
// 按并发数,把 Promise 加进去
// Promise 会回调一个索引,方便我们知道哪个 Promise 已经 resolve 了
for (let i = 0; i < this.maxConcurrent; i++) {
if (fns.length) {
const fn = fns.shift()!;
runing.push(fn(i));
}
}
const handle = async () => {
if (fns.length) {
const idx = await Promise.race<number>(runing);
const nextFn = fns.shift()!;
// 移除已经完成的 Promise,把新的进去
runing.splice(idx, 1, nextFn(idx));
handle();
} else {
// 如果数组已经被清空了,表面已经没有需要执行的 Promise 了,可以改成 Promise.all
await Promise.all(runing);
}
};
handle();
}
}
const run = async () => {
const concurrent = new Concurrent();
startTime = Date.now();
await concurrent.useRace([timeout1, timeout2, timeout3, timeout4, timeout5]);
};
At 300 return 2
At 700 return 3
At 1000 return 1
At 1200 return 5
At 1200 return 4
可以看到输出已经变了,为什么会这样呢,分析一下,最大并发数 2
// 首先执行的是 1 2
1 需要 1000 MS 才执行完
2 需要 300 MS
2 执行完,时间线变成 300 移除 2 加入 3 开始执行 3
3 需要 400MS 执行完时间变成 700 移除 3 加入 4 开始执行 4
4 需要 500MS
时间线来到 1000MS,1 执行完 移除 1 加入 5 开始执行 5
时间线来到 1200MS,4 和 5 刚好同时执行完
第二种方案
可以利用 await
的机制,其实也是一个小技巧
如果当前的并发数已经超过最大的并发数目了,可以设置一个新的 Promise
,并且 await
,等待其他的请求完成的时候,resolve
,移除等待,所以需要新增两个状态,当前的并发数目,还有用来存储 resolve
这个回调函数的数组
class Concurrent {
private maxConcurrent: number = 2;
private list: Function[] = [];
private currentCount: number = 0;
constructor(count: number = 2) {
this.maxConcurrent = count;
}
public async add(fn: Function) {
this.currentCount += 1;
// 如果最大已经超过最大并发数
if (this.currentCount > this.maxConcurrent) {
// wait 是一个 Promise,只要调用 resolve 就会变成 fulfilled 状态
const wait = new Promise((resolve) => {
this.list.push(resolve);
});
// 在没有调用 resolve 的时候,这里会一直阻塞
await wait;
}
// 执行函数
await fn();
this.currentCount -= 1;
if (this.list.length) {
// 把 resolve 拿出来,调用,这样 wait 就完成了,可以往下面执行了
const resolveHandler = this.list.shift()!;
resolveHandler();
}
}
}
const run = async () => {
const concurrent = new Concurrent();
startTime = Date.now();
concurrent.add(timeout1);
concurrent.add(timeout2);
concurrent.add(timeout3);
concurrent.add(timeout4);
concurrent.add(timeout5);
};
run();
At 300 return 2
At 700 return 3
At 1000 return 1
At 1200 return 5
At 1200 return 4
总结
这两种方式都可以实现并发控制,只不过实现的方式不太一样,主要都是靠 Promise
实现,另外实现方式里面没有考虑异常的情况,这个可以自己加上
常见问题FAQ
- 免费下载或者VIP会员专享资源能否直接商用?
- 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
- 提示下载完但解压或打开不了?
- 找不到素材资源介绍文章里的示例图片?
- 模板不会安装或需要功能定制以及二次开发?
发表评论
还没有评论,快来抢沙发吧!