之前做过一个 Node.js实现分片上传 的功能。当时前端采用文件切片后并发上传,大大提高了上传的速度,使用Promise.race()管理并发池,有效避免浏览器内存耗尽。
现在的问题:Node.js服务端合并大文件分片 内存耗尽导致服务重启
服务端代码
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
// 2. 不会阻塞EventLoop
bufferList.forEach((hash, index) => {
fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
ws.write(data);
});
});
服务器配置:RAM:1024MB 1vCPU (运行了一些其他服务)
测试发现只要上传的文件超过300M,在合并分片时就会导致内存耗尽服务重启,根本无法完成分片合并。
解决方案:能不能在循环中控制读取文件的并发数呢?这样就不会有大量文件同时读取到内存中导致服务崩溃了。
尝试像前端那样使用 Promise.race 控制:
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
// Promise.race 并发控制
const pool = [];
let finish = 0; // 已经写入完成的分片数量
// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
return new Promise((resolve) => {
fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
ws.write(data)
resolve({});
});
});
};
(async function easyRead() {
for (let i = 0; i < bufferList.length; i ++) {
const task = PR(i).then(val => {
finish+=1
const index = pool.findIndex(t => t === task);
pool.splice(index);
if (finish === bufferList.length) {
ws.close();
}
});
pool.push(task);
if (pool.length === 1) { // 这里并发数量只能是1 否则分片写入是乱序的 格式会被损坏
await Promise.race(pool);
}
}
})()
这时神奇的事情就发生了,我们在for循环中使用了Promise.race()控制了同一时间读入到内存中的文件数量。
再次测试,服务已经不会在合并分片时崩溃,即使1个G的文件分片也可以在3秒左右合并完成。
async、await
相信这个是大家更加常用的处理异步的方法,因为它写起来更加优雅。我们上边的Promise.race也可以用它来替换,使代码看起来更加简洁明了:
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
let finish = 0; // 已经写入完成的分片数量
// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
return new Promise((resolve) => {
fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
ws.write(data)
resolve({});
});
});
};
(async function easyRead() {
for (let i = 0; i < bufferList.length; i ++) {
await PR(i)
finish +=1
if (finish === bufferList.length) {
ws.close();
}
}
})()
可以达到相同的效果,看起来是不是清晰了很多?
总结
知道Promise.race()的人很多,这样的面试题也很多,但是能运用到实践中解决实际的问题却不是很多。
希望本文可以帮到你。
文章首发于 IICOOM-技术博客 《使用Promise.race()实现控制并发》
相关文章
暂无评论...