最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • JS 请求调度器

    正文概述 掘金(孟陬)   2021-03-08   664

    为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:

    const ids = [1001, 1002, 1003, 1004, 1005];
    const urlPrefix = 'http://opensearch.example.com/api/apps';
    
    // fetch 函数发送 HTTP 请求,返回 Promise
    const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);
    
    Promise.all(appPromises)
      // 通过 reduce 做累加
      .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
      .catch((error) => console.log(error));
    

    上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:

    <html>
    <head><title>502 Bad Gateway</title></head>
    <body bgcolor="white">
    <center><h1>502 Bad Gateway</h1></center>
    <hr><center>nginx/1.10.1</center>
    </body>
    </html>
    

    如何解决呢?

    一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunkchunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合

    难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。

    // task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求
    const task1 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(1);
        console.log('task1 executed');
      }, 1000);
    });
    
    const task2 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(2);
        console.log('task2 executed');
      }, 1000);
    });
    
    const task3 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(3);
        console.log('task3 executed');
      }, 1000);
    });
    
    // 聚合结果
    let result = 0;
    
    const resultPromise = [task1, task2, task3].reduce((current, next) => 	  
      current.then((number) => {
        console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve
        result += number;
    
        return next();
      }),
      
      Promise.resolve(0)) // 聚合初始值
    
      .then(function(last) {
        console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve
    
        result += last;
    
        console.log('all executed with result', result);
    
        return Promise.resolve(result);
      });
    

    运行结果如图 1:

    JS 请求调度器

    代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。

    难点解决了,我们看看最终代码:

    /**
     * 模拟 HTTP 请求
     * @param  {String} url 
     * @return {Promise}
     */
    function fetch(url) {
      console.log(`Fetching ${url}`);
      return new Promise((resolve) => {
        setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
      });
    }
    
    const urlPrefix = 'http://opensearch.example.com/api/apps';
    
    const aggregator = {
      /**
       * 入口方法,开启定时任务
       * 
       * @return {Promise}
       */
      start() {
        return this.fetchAppIds()
          .then(ids => this.fetchAppsSerially(ids, 2))
          .then(apps => this.sumPv(apps))
          .catch(error => console.error(error));
      },
      
      /**
       * 获取所有应用的 ID
       *
       * @private
       * 
       * @return {Promise}
       */
      fetchAppIds() {
        return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
      },
    
      promiseFactory(ids) {
        return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
      },
      
      /**
       * 获取所有应用的详情
       * 
       * 一次并发请求 `concurrency` 个应用,称为一个 chunk
       * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕
       *
       * @private
       *
       * @param  {[Number]} ids
       * @param  {Number} concurrency 一次并发的请求数量
       * @return {[Object]}         所有应用的信息
       */
      fetchAppsSerially(ids, concurrency = 100) {
        // 分块
        let chunkOfIds = ids.splice(0, concurrency);
        const tasks = [];
        
        while (chunkOfIds.length !== 0) {
          tasks.push(this.promiseFactory(chunkOfIds));
          chunkOfIds = ids.splice(0, concurrency);
        }
        
        // 按块顺序执行
        const result = [];
        return tasks.reduce((current, next) => current.then((chunkOfApps) => {
          console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
          result.push(...chunkOfApps); // 拍扁数组
          return next();
        }), Promise.resolve([]))
        .then((lastchunkOfApps) => {
          console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
    
          result.push(...lastchunkOfApps); // 再次拍扁它
          console.info('All chunks has been executed with result', result);
          return result;
        });
      },
      
      /**
       * 聚合所有应用的 PV
       * 
       * @private
       * 
       * @param  {[]} apps 
       * @return {[type]}      [description]
       */
      sumPv(apps) {
        const initial = { pv: 0 };
    
        return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
      }
    };
    
    // 开始运行
    aggregator.start().then(console.log);
    

    运行结果如图 2:

    JS 请求调度器

    抽象和复用

    目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。

    串行

    先模拟一个 http get 请求。

    /**
     * mocked http get.
     * @param {string} url
     * @returns {{ url: string; delay: number; }}
     */
    function httpGet(url) {
      const delay = Math.random() * 1000;
    
      console.info('GET', url);
    
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve({
            url,
            delay,
            at: Date.now()
          })
        }, delay);
      })
    }
    

    串行执行一批请求。

    const ids = [1, 2, 3, 4, 5, 6, 7];
    
    // 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的
    const httpGetters = ids.map(id => 
      () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
    );
    
    // 串行执行之
    const tasks = await httpGetters.reduce((acc, cur) => {
      return acc.then(cur);
      
      // 简写,等价于
      // return acc.then(() => cur());
    }, Promise.resolve());
    
    tasks.then(() => {
      console.log('done');
    });
    

    注意观察控制台输出,应该串行输出以下内容:

    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    GET https://jsonplaceholder.typicode.com/posts/7
    

    分段串行,段中并行

    重点来了。本文的请求调度器实现:

    /**
     * Schedule promises.
     * @param {Array<(...arg: any[]) => Promise<any>>} factories 
     * @param {number} concurrency 
     */
    function schedulePromises(factories, concurrency) {
      /**
       * chunk
       * @param {any[]} arr 
       * @param {number} size 
       * @returns {Array<any[]>}
       */
      const chunk = (arr, size = 1) => {
        return arr.reduce((acc, cur, idx) => {
          const modulo = idx % size;
    
          if (modulo === 0) {
            acc[acc.length] = [cur];
          } else {
            acc[acc.length - 1].push(cur);
          }
    
          return acc;
        }, [])
      };
    
      const chunks = chunk(factories, concurrency);
    
      let resps = [];
    
      return chunks.reduce(
        (acc, cur) => {
          return acc
            .then(() => {
              console.log('---');
              return Promise.all(cur.map(f => f()));
            })
            .then((intermediateResponses) => {
              resps.push(...intermediateResponses);
    
              return resps;
            })
        },
    
        Promise.resolve()
      );
    }
    

    测试下,执行调度器:

    // 分段串行,段中并行
    schedulePromises(httpGetters, 3).then((resps) => {
      console.log('resps:', resps);
    });
    

    控制台输出:

    ---
    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    ---
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    ---
    GET https://jsonplaceholder.typicode.com/posts/7
    
    resps: [
      {
        "url": "https://jsonplaceholder.typicode.com/posts/1",
        "delay": 733.010980640727,
        "at": 1615131322163
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/2",
        "delay": 594.5056229848931,
        "at": 1615131322024
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/3",
        "delay": 738.8230109146299,
        "at": 1615131322168
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/4",
        "delay": 525.4604386109747,
        "at": 1615131322698
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/5",
        "delay": 29.086379722201183,
        "at": 1615131322201
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/6",
        "delay": 592.2345027398272,
        "at": 1615131322765
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/7",
        "delay": 513.0684467560949,
        "at": 1615131323284
      }
    ]
    

    验证通过,确实是逐三个请求并发,等待并发请求完毕才继续下三个请求发起,且结果聚合也正确。

    总结

    1. 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
    2. 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
    3. 本文的精髓在于使用 reduce 作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce 精通见上篇 你终于用 Reduce 了 ?。

    起源地下载网 » JS 请求调度器

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元