文章目录
-
- 定义分片大小
- 急速上传
-
- 文件MD5
- 分片上传
-
- 创建分片上传任务
-
- 前端计算文件分片数量
- 自定义MINIO CLIENT
- 调用后台接口创建上传任务
- 创建单文件上传任务
- 创建分块上传任务
- 校验桶是否存在(不存在则创建)
- 分片文件合并
-
- 获取指定 uploadId 下已上传的分块信息
- MINIO 文件合并
- 过期任务清理
-
- 按照任务ID清理
- MINIO 过时上传任务清理配置(MINIO 最新版本有效)
- 调用接口批量清理超过一定时间的任务(不能实现)
- 设置桶生命周期策略(不能实现)
记录一下自己在实现大文件上传时的简单思路和核心代码。
大体思路如下:
1、数据库中存放文件路径,所有文件保存在 MINIO 中,文件名即是文件的 MD5。
2、当用户上传文件时,首先判断该文件信息是否存在在数据库中,如果存在则直接显示上传成功(急速上传),若不存在则执行上传操作。
3、文件在真正上传之前先判断文件大小,太小的不需要创建分片上传任务,一次性上传即可。
4、后台调用 MINIO 的 API 创建分片上传任务(得到一个任务 ID ),并为该任务生成分片上传链接(上传地址列表)后返回给前端,前端将对应分片按照到对应的连接传递到 MINIO 中。
5、分片上传成功后更新进度信息。
6、所有分片上传结束后,调用 MINIO 的 API 将当前任务的分片全部合并形成整个文件。
定义分片大小
const chunkSize = 5 * 1024 * 1024; // 切片大小为5M
急速上传
文件MD5
前端使用 SparkMD5 获取文件的 MD5 信息,当该 MD5 信息已经存在在数据库中时,即上传完成(急速上传)
下面是获取文件 MD5 的方法
import SparkMD5 from 'spark-md5';
//获取文件的MD5信息 分片获取
const ReadFileMD5 = (param) => {
return new Promise((resolve, reject) => {
const file = param.file;
const fileReader = new FileReader();
const md5 = new SparkMD5();
let index = 0;
const loadFile = () => {
const slice = file.slice(index, index + chunkSize);
fileReader.readAsBinaryString(slice);
};
loadFile();
fileReader.onload = (e) => {
md5.appendBinary(e.target.result);
if (index < file.size) {
index += chunkSize;
loadFile();
} else {
// md5.end() 就是文件md5码
var md5Str = md5.end();
return resolve(md5Str);
}
};
fileReader.onerror = () => {
reject('文件MD5获取失败');
};
});
};
当确认该文件的 MD5 在数据库中不存在时,开始触发我们的上传操作
分片上传
MINIO 依赖版本 8.4.3
创建分片上传任务
前端计算文件分片数量
let chunks = Math.ceil(file.file.size / chunkSize);
自定义MINIO CLIENT
我们需要重新写一个 MINIO 客户端来实现我们的分片上传。
/**
* MINIO 遵循 AmazonS3 规则,S3 有的方法他都有实现
* 关于其他方法
* 参考 MINIO 网站
* https://minio-java.min.io/
* 结合 亚马逊官方文档
* https://docs.aws.amazon.com/AmazonS3/latest/API
* 查看方法使用和效果
*/
package com.dm.cloud.utils.minio;
import com.google.common.collect.Multimap;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Part;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CustomMinioClient extends MinioAsyncClient {
/**
* 需要清理的文件时间范围
* 一天
*/
private static int sevenDays = 1000 * 60 * 24;
protected CustomMinioClient(MinioAsyncClient client) {
super(client);
}
//初始化分块上传任务
public String initMultiPartUpload(String bucket, String region, String object, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException, ExecutionException, InterruptedException {
CompletableFuture<CreateMultipartUploadResponse> response = this.createMultipartUploadAsync(bucket, region, object, headers, extraQueryParams);
return response.get().result().uploadId();
}
//中止分块上传任务
public String removeMultipartUpload(String bucket, String region, String object,String uploadId, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException, ExecutionException, InterruptedException {
CompletableFuture<AbortMultipartUploadResponse> response = this.abortMultipartUploadAsync(bucket, region, object, uploadId,headers, extraQueryParams);
return response.get().uploadId();
}
//TODO 批量清理过期上传任务
//获取不到桶内的上传任务 ListMultipartUploadsResponse result为空 待更新
public void clearMultipartUpload(String bucket, String region, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException {
//通过MINIO接口获取桶未完成的上传任务(获取不到上传任务)
CompletableFuture<ListMultipartUploadsResponse> multiUploads = this.listMultipartUploadsAsync(bucket, region,null,null,null,1000,null,null,headers, extraQueryParams);
// System.out.println(multiUploads);
//直接调用 AWS接口 清理过期上传任务(获取不到上传任务)
// Date oneWeekAgo = new Date(System.currentTimeMillis() - sevenDays);
// Credentials creds = this.provider == null ? null : this.provider.fetch();
//
// AWSCredentials awsCredentials = new AWSCredentials() {
// @Override
// public String getAWSAccessKeyId() {
// return creds.accessKey();
// }
//
// @Override
// public String getAWSSecretKey() {
// return creds.secretKey();
// }
// };
//
// AmazonS3Client s3 = new AmazonS3Client(awsCredentials);
// s3.setEndpoint("http://127.0.0.1:9000");
// TransferManager tm = new TransferManager(s3);
// try {
// tm.abortMultipartUploads(bucket, oneWeekAgo);
// } catch (Exception ex) {
// throw new RuntimeException(ex.getMessage());
// }
}
//合并分块文件
public ObjectWriteResponse mergeMultipartUpload(String bucketName, String region, String objectName, String uploadId, Part[] parts, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException, ExecutionException, InterruptedException {
return this.completeMultipartUploadAsync(bucketName, region, objectName, uploadId, parts, extraHeaders, extraQueryParams).get();
}
//列出全部分块文件
public ListPartsResponse listMultipart(String bucketName, String region, String objectName, Integer maxParts, Integer partNumberMarker, String uploadId, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException, ExecutionException, InterruptedException {
return this.listPartsAsync(bucketName, region, objectName, maxParts, partNumberMarker, uploadId, extraHeaders, extraQueryParams).get();
}
}
调用后台接口创建上传任务
...
if (partCount == 1) {
//只有一个分片的情况下 直接返回上传地址
String uploadObjectUrl = MinioUtils.getUploadObjectUrl(MinioUtils.FILE_WAREHOUSE,objectName);
result.setUploadUrl(new ArrayList<String>(){{add(uploadObjectUrl);}});
}else {
Map<String, Object> initRsl = MinioUtils.initMultiPartUpload(MinioUtils.FILE_WAREHOUSE, objectName, partCount, contentType);
result.setFinished(false);
result.setUploadId(initRsl.get("uploadId").toString());
result.setUploadUrl((List<String>)initRsl.get("uploadUrls"));
}
...
其中比较重要的创建 MINIO 上传任务的方法如下
创建单文件上传任务
/**
* 单文件上传
*
* @param objectName 文件全路径名称
* @return /
*/
public static String getUploadObjectUrl(String bucketName, String objectName) {
try {
//创建 MINIO 连接
CustomMinioClient customMinioClient = new CustomMinioClient(MinioAsyncClient.builder()
.endpoint(properties.getUrl())//MINIO 服务地址
.credentials(properties.getAccessKey(), properties.getSecureKey())//用户名和密码
.build());
return customMinioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.PUT)//GET方式请求
.bucket(bucketName)//存储桶的名字
.object(objectName)//文件的名字
.expiry(1, TimeUnit.DAYS)//上传地址有效时长
.build()
);
} catch (Exception e) {
return null;
}
}
创建分块上传任务
/**
* 创建分块任务
*
* @param bucketName 存储桶名称
* @param objectName 文件全路径名称
* @param partCount 分片数量
* @return /
*/
public static Map<String, Object> initMultiPartUpload(String bucketName,String objectName, int partCount,String contentType) {
Map<String, Object> result = new HashMap<>();
try {
//如果类型使用默认流会导致无法预览
contentType = "application/octet-stream";
HashMultimap<String, String> headers = HashMultimap.create();
headers.put("Content-Type", contentType);
customMinioClient = new CustomMinioClient(MinioAsyncClient.builder()
.endpoint(properties.getUrl())
.credentials(properties.getAccessKey(), properties.getSecureKey())
.build());
checkAsyncBucket(customMinioClient,false,bucketName);
String uploadId = customMinioClient.initMultiPartUpload(bucketName, null, objectName, headers, null);
result.put("uploadId", uploadId);
List<String> partList = new ArrayList<>();
Map<String, String> reqParams = new HashMap<>();
reqParams.put("uploadId", uploadId);
for (int i = 1; i <= partCount; i++) {
reqParams.put("partNumber", String.valueOf(i));
String uploadUrl = customMinioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.PUT)
.bucket(bucketName)
.object(objectName)
.expiry(1, TimeUnit.DAYS)
.extraQueryParams(reqParams)
.build());
partList.add(uploadUrl);
}
result.put("uploadUrls", partList);
} catch (Exception e) {
return null;
}
return result;
}
校验桶是否存在(不存在则创建)
/**
* 检查是否存在指定桶 不存在则先创建
* @param minioClient
* @param versioning
* @param bucket
* @throws Exception
*/
private static void checkAsyncBucket(MinioAsyncClient minioClient ,boolean versioning, String bucket) throws Exception {
CompletableFuture<Boolean> exists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucket).build());
if (exists.isDone() && !exists.get()) {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucket).build());
//设置Procy属性 默认所有请求都能读取
String config = "{ " +
" \"Id\": \"Policy1\", " +
" \"Version\": \"2012-10-17\", " +
" \"Statement\": [ " +
" { " +
" \"Sid\": \"Statement1\", " +
" \"Effect\": \"Allow\", " +
" \"Action\": [ " +
" \"s3:ListBucket\", " +
" \"s3:GetObject\" " +
" ], " +
" \"Resource\": [ " +
" \"arn:aws:s3:::"+bucket+"\", " +
" \"arn:aws:s3:::"+bucket+"/*\" " +
" ]," +
" \"Principal\": \"*\"" +
" } " +
" ] " +
"}";
minioClient.setBucketPolicy(
SetBucketPolicyArgs.builder().bucket(bucket).config(config).build());
}
// 版本控制
CompletableFuture<VersioningConfiguration> configuration = minioClient.getBucketVersioning(GetBucketVersioningArgs.builder().bucket(bucket).build());
if(configuration.isDone()) {
boolean enabled = configuration.get().status() == VersioningConfiguration.Status.ENABLED;
if (versioning && !enabled) {
minioClient.setBucketVersioning(SetBucketVersioningArgs.builder()
.config(new VersioningConfiguration(VersioningConfiguration.Status.ENABLED, null)).bucket(bucket).build());
} else if (!versioning && enabled) {
minioClient.setBucketVersioning(SetBucketVersioningArgs.builder()
.config(new VersioningConfiguration(VersioningConfiguration.Status.SUSPENDED, null)).bucket(bucket).build());
}
}
}
分片文件合并
前端全部上传完毕之后,通知后台进行文件合并操作
...
//先判断文件列表是否完整
List<String> partList = MinioUtils.getExsitParts(MinioUtils.FILE_WAREHOUSE, md5, uploadId);
if (CollectionUtils.isNotEmpty(partList)) {
//上传列表不是空 判断上传列表是否完整
if (chuncks.compareTo(partList.size()) < 0) {
//缺少分片
return R.failure("文件分片缺失,请重新上传");
} else {
//分片完整 整合并返回
boolean success = MinioUtils.mergeMultipartUpload(MinioUtils.FILE_WAREHOUSE, md5, uploadId);
if (!success) {
//合并失败
return R.failure("合并文件异常");
}
}
} else {
return R.failure("文件分片缺失,请重新上传");
}
...
获取指定 uploadId 下已上传的分块信息
public static List<String> getExsitParts(String bucketName, String objectName, String uploadId) {
List<String> parts = new ArrayList<>();
try {
/**
* 最大分片1000
*/
customMinioClient = new CustomMinioClient(MinioAsyncClient.builder()
.endpoint(properties.getUrl())
.credentials(properties.getAccessKey(), properties.getSecureKey())
.build());
ListPartsResponse partResult = customMinioClient.listMultipart(bucketName, null, objectName, 1024, 0, uploadId, null, null);
for (Part part : partResult.result().partList()) {
parts.add(part.etag());
}
//合并分片
} catch (Exception e) {
//
log.error("查询任务分片错误");
}
return parts;
}
MINIO 文件合并
/**
* 文件合并
* @param bucketName
* @param objectName
* @param uploadId
* @return
*/
public static boolean mergeMultipartUpload(String bucketName, String objectName, String uploadId) {
try {
Part[] parts = new Part[1000];
/**
* 最大分片1000
*/
customMinioClient = new CustomMinioClient(MinioAsyncClient.builder()
.endpoint(properties.getUrl())
.credentials(properties.getAccessKey(), properties.getSecureKey())
.build());
ListPartsResponse partResult = customMinioClient.listMultipart(bucketName, null, objectName, 1000, 0, uploadId, null, null);
int partNumber = 1;
for (Part part : partResult.result().partList()) {
parts[partNumber - 1] = new Part(partNumber, part.etag());
partNumber++;
}
//合并分片
customMinioClient.mergeMultipartUpload(bucketName, null, objectName, uploadId, parts, null, null);
} catch (Exception e) {
return false;
}
return true;
}
过期任务清理
对于一些异常终止的任务,我们需要清理掉任务及其已经上传的文件,可以使用以下方式
按照任务ID清理
如果知道是哪个任务需要清理,我们可以清指定的上传任务
每个上传任务的 ID 都存在数据库中,对于一些长时间没完成的任务,我们通过 uploadId 去删除
工具类 MinioUtils 中添加
/**
* 删除指定分片上传任务
* @param bucketName
* @param objectName
* @param uploadId
* @return
*/
public static boolean removeMultipartUpload(String bucketName, String objectName, String uploadId) {
try {
/**
* 最大分片1000
*/
customMinioClient = new CustomMinioClient(MinioAsyncClient.builder()
.endpoint(properties.getUrl())
.credentials(properties.getAccessKey(), properties.getSecureKey())
.build());
customMinioClient.removeMultipartUpload(bucketName,null,objectName,uploadId,null,null);
} catch (Exception e) {
return false;
}
return true;
}
MINIO 过时上传任务清理配置(MINIO 最新版本有效)
有一点比较奇怪,我在修改配置并重启程序之前创建的上传任务,不会被清除掉
MINIO 提供了过时上传任务清理功能 ,参数如下:
修改 config.json配置文件 或者 环境变量 均可生效
- 设置文件超时时长(默认是 24h):stale_uploads_expiry(文件) MINIO_API_STALE_UPLOADS_EXPIRY (变量)
- 清理间隔时长(默认是 6h):stale_uploads_cleanup_interval(文件) MINIO_API_DELETE_CLEANUP_INTERVAL (变量)
- 从垃圾箱中永久删除文件的时间间隔(默认是5m): delete_cleanup_interval(文件) MINIO_API_DELETE_CLEANUP_INTERVAL (变量),这两个配置都没有的时候,他会读 MINIO_DELETE_CLEANUP_INTERVAL 这个环境变量的值
我在 windows 系统下修改配置文件如下:
未完成的上传任务会在 5 - 10 分钟内被清理
调用接口批量清理超过一定时间的任务(不能实现)
MINIO 作者表示虽然添加了 ListMultipartUploads 相关接口,但是其返回的是空的内容。
因为在 MINIO 看来这个接口是不需要的,绝大部分想要获取到当前上传任务列表的需求是用来清理一些过期的任务,但是 MINIO 已经提供了自动清理和按照uploadId清理的策略,因此他们不会实现一个完成的 ListMultipartUploads 功能。
相关链接:
https://github.com/minio/minio/issues/13246
https://github.com/minio/minio/issues/11686
设置桶生命周期策略(不能实现)
很遗憾,这样并不能实现,本来以为他和 AWS 一样可以通过设置桶的生命周期中的 AbortIncompleteMultiUpload 属性去实现自动清理过期未完成的任务,但是在尝试了多次之后发现并未能设置该项配置。
-
MINIO API 类中提供了 AbortIncompleteMultiUpload 的配置类,但是好像并不能生效。
-
使用 java-aws-sdk-s3 包提供的接口在代码中直接对桶的生命周期进行设置或者使用 aws-cli 使用命令行对其生命周期进行求改也不能生效,
设置完之后的规则理想情况下通过 getBucketLifecycleConfiguration(bucketName); 返回的结构应该是这样的
{
"Rules": [
{
"Expiration": {
"Days": 3650
},
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 1
},
"ID": "Archive and then delete rule 2",
"Prefix": "",
"Filter": {},
"Status": "Enabled"
}
]
}
实际上返回的结构是这样的,在配置时它自动忽略了 AbortIncompleteMultipartUpload 的配置
{
"Rules": [
{
"Expiration": {
"Days": 3650
},
"ID": "Archive and then delete rule 2",
"Prefix": "",
"Filter": {},
"Status": "Enabled"
}
]
}
在下载了 MINIO 最新的代码后所有发现确实不支持