使用Promise.race()实现控制并发

2年前 (2022) 程序员胖胖胖虎阿
227 0 0

之前做过一个 Node.js实现分片上传 的功能。当时前端采用文件切片后并发上传,大大提高了上传的速度,使用Promise.race()管理并发池,有效避免浏览器内存耗尽。

现在的问题:Node.js服务端合并大文件分片 内存耗尽导致服务重启

服务端代码

const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小

// 2. 不会阻塞EventLoop
bufferList.forEach((hash, index) => {
  fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
       ws.write(data);
  });
});

服务器配置:RAM:1024MB 1vCPU (运行了一些其他服务)

测试发现只要上传的文件超过300M,在合并分片时就会导致内存耗尽服务重启,根本无法完成分片合并。

解决方案:能不能在循环中控制读取文件的并发数呢?这样就不会有大量文件同时读取到内存中导致服务崩溃了。

尝试像前端那样使用 Promise.race 控制:

const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小

// Promise.race 并发控制
const pool = [];
let finish = 0; // 已经写入完成的分片数量

// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
  return new Promise((resolve) => {
    fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
      ws.write(data)
      resolve({});
    });
  });
};

(async function easyRead() {
  for (let i = 0; i < bufferList.length; i ++) {
    const task = PR(i).then(val => {
      finish+=1
      const index = pool.findIndex(t => t === task);
      pool.splice(index);
      if (finish === bufferList.length) {
        ws.close();
      }
    });
    pool.push(task);
    if (pool.length === 1) { // 这里并发数量只能是1 否则分片写入是乱序的 格式会被损坏
      await Promise.race(pool);
    }
  }
})()

这时神奇的事情就发生了,我们在for循环中使用了Promise.race()控制了同一时间读入到内存中的文件数量。

再次测试,服务已经不会在合并分片时崩溃,即使1个G的文件分片也可以在3秒左右合并完成。

async、await

相信这个是大家更加常用的处理异步的方法,因为它写起来更加优雅。我们上边的Promise.race也可以用它来替换,使代码看起来更加简洁明了:

const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小

let finish = 0; // 已经写入完成的分片数量

// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
  return new Promise((resolve) => {
    fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
      ws.write(data)
      resolve({});
    });
  });
};

(async function easyRead() {
  for (let i = 0; i < bufferList.length; i ++) {
    await PR(i)
    finish +=1
    if (finish === bufferList.length) {
        ws.close();
    }
  }
})()

可以达到相同的效果,看起来是不是清晰了很多?

总结

知道Promise.race()的人很多,这样的面试题也很多,但是能运用到实践中解决实际的问题却不是很多。

希望本文可以帮到你。

文章首发于 IICOOM-技术博客 《使用Promise.race()实现控制并发》

版权声明:程序员胖胖胖虎阿 发表于 2022年10月5日 上午8:32。
转载请注明:使用Promise.race()实现控制并发 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...