51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 678|回复: 0
打印 上一主题 下一主题

[原创] 数据量大的时候,Promise.all没做控制并发如何处理?

[复制链接]

该用户从未签到

跳转到指定楼层
1#
发表于 2022-10-13 15:28:59 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
需求  



我最近在做一个需求:批量去往数据库里存储一些东西,数量可能一次性达到几百个,也就意味着我需要一次性往数据库里存储几百次,我是这么做的:

  const save = (data) => {

    // 数据库操作(Promise)


    return insert(data)


  }


  const datas = [...几百个数据]


  // 进行存储


  Promise.all(datas.map(save))




  被呵斥

  正当我觉得这个需求很轻松的时候。。在 Code Review 上,我被团队的大佬们给呵斥了一顿,理由是:存储的操作发生在服务器,服务器是很脆弱的,你一次性存储几百个,服务器崩了怎么办?

  随后大佬们提出解决方案:控制并发,大佬们是真的强,感觉这种东西已经是大佬们的常规操作了。

  控制Promise.all并发

  意思就是,比如我有几百个存储操作,我不能一次性去全部执行,而是要控制一次性只能执行10个操作,10个中有一个执行完了,就拿还没执行的操作补上去,就这样一直到这几百个操作全部执行完为


止。。

  其实很简单,可以直接用库,比如async-pool、es6-promise-pool、p-limit,只要是能用库的,我建议不要自己去写,因为不定因素很多,你自己写的肯定没有库写的好,你说呢~

  简单实现

  看到一位兄弟实现的挺不错的,链接:https://segmentfault.com/a/1190000016389127

  这是async-pool这个库的核心源码:

  function asyncPool(poolLimit, array, iteratorFn) {

      let i = 0;


      const ret = [];


      const executing = [];


      const enqueue = function () {


          // 边界处理,array为空数组


          if (i === array.length) {


              return Promise.resolve();


          }


          // 每调一次enqueue,初始化一个promise


          const item = array[i++];


          const p = Promise.resolve().then(() => iteratorFn(item, array));


          // 放入promises数组


          ret.push(p);


          // promise执行完毕,从executing数组中删除


          const e = p.then(() => executing.splice(executing.indexOf(e), 1));


          // 插入executing数字,表示正在执行的promise


          executing.push(e);


          // 使用Promise.rece,每当executing数组中promise数量低于poolLimit,就实例化新的promise并执行


          let r = Promise.resolve();


          if (executing.length >= poolLimit) {


              r = Promise.race(executing);


          }


          // 递归,直到遍历完array


          return r.then(() => enqueue());


      };


      return enqueue().then(() => Promise.all(ret));


  }




  大概的逻辑可以总结为:

  ·从array第1个元素开始,初始化promise对象,同时用一个executing数组保存正在执行的promise

  · 不断初始化promise,直到达到poolLimt

  · 使用Promise.race,获得executing中promise的执行情况,当有一个promise执行完毕,继续初始化promise并放入executing中

  · 所有promise都执行完了,调用Promise.all返回

  使用方式:

  const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));

  return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {


      ...


  });








分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏
回复

使用道具 举报

本版积分规则

关闭

站长推荐上一条 /1 下一条

小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

GMT+8, 2024-9-29 11:23 , Processed in 0.095918 second(s), 24 queries .

Powered by Discuz! X3.2

© 2001-2024 Comsenz Inc.

快速回复 返回顶部 返回列表