概述
本文是“搭建自己的以图搜图系统”系列的第二篇,在第一篇内容中我们了解了如何利用“机器学习框架 Towhee ¹”和“向量数据库 Milvus ²”快速搭建一个以图搜图的服务原型。那么,如何搭建一个生产级别的服务呢?在真实的业务场景中,我们常常碰到这些技术难题:
- 海量数据的情况下系统延迟高,硬件资源成为瓶颈。
- 系统在一种数据集下召回效果不错,但在另一种数据集下召回效果却很差,想要尝试多种模型或者自己训练,但上手成本都很高。
- 系统容错率低,当我们批量处理图片时,如果存在坏数据系统很容易崩溃。
接下来本文会从性能、模型和业务流程方面讨论如何解决这些痛点,从而优化我们的以图搜图系统,最后会介绍如何使用 FastAPI 实现简单高效的 Web 服务。
本文中的相关代码已上传到 GitHub,欢迎大家参考和使用:https://github.com/towhee-io/...
性能优化
要想提升性能,“堆机器”无疑是最便捷的方式,但在有限的资源下,我们如何充分发挥算力优势呢?一般情况下,我们会采取下面几种方案:并行处理,充分发挥资源性能;数据降维,降低计算复杂度;向量索引,使用近邻搜索的算法加速向量检索。
下文中的代码基本上都来自于以图搜图系列的第一篇内容,如果你想了解更详细的内容,可以移步:《搭建自己的以图搜图系统(一):10 行代码搞定以图搜图》。
并行处理
基于 Towhee 的以图搜图 AI 流水线支持使用并行执行的方式,来提升性能,在下面的代码中,我们只需要简单地调用 set_parallel
方法,就可以并发处理数据。下面的例子演示了如何并行处理图片数据:
import towhee
dc = (
towhee.read_csv('reverse_image_search.csv')
.runas_op['id', 'id'](func=lambda x: int(x))
.set_parallel(3) #3并发处理数据
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name='resnet50')
.tensor_normalize['vec', 'vec']()
.to_milvus['id', 'vec'](collection=collection, batch=100)
)
在上面的例子中,我们设置了 3 个并发来处理数据,我们将会得到 2~3 倍的性能提升。
数据降维
计算高维向量的硬件成本很高,举个例子, ResNet50 模型生成的 Embedding 向量维度是 2048,如果图像数据规模为一亿,那么内存占用大约是 768 GB(4 Bytes 2048 100000000),我们也遇到了不少企业,他们的数据量级都在百亿级别,那么就需要约 76800 GB(75 TB) 内存,那么针对向量数据进行降维就很有必要了,因为数据的计算量降低,性能会有提升。降维的方法有很多种,如 PCA、SVD 和 UMAP 等,我们以最简单的随机投影为例,在图片入库之前将向量维度从 2048 维降低到 512 维:
import numpy as np
projection_matrix = np.random.normal(scale=1.0, size=(2048, 512))
def dim_reduce(vec):
return np.dot(vec, projection_matrix)
dc = (
towhee.read_csv('reverse_image_search.csv')
.runas_op['id', 'id'](func=lambda x: int(x))
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name='resnet50')
.runas_op['vec','vec'](func=dim_reduce) #向量数据降维
.tensor_normalize['vec', 'vec']()
.to_milvus['id', 'vec'](collection=collection, batch=100)
)
随机投影是欧几里得空间中向量的降维方法,这种方法速度快且无需训练,示例中 dim_reduce
函数实现了该方法,并通过 runas_op
算子将其应用到 ResNet50 模型提取的向量数据中。
- 向量索引
在图片检索阶段,我们可以使用面向 AI 的向量数据库 Milvus 对大规模的向量数据进行相似度检索,Milvus 支持多种 ANN 索引³用于加速,包括基于量化的索引, 基于图的索引和基于树的索引等。我们可以创建合适的索引用于近邻搜索,值得一提的是 IVF_SQ8 索引,它不光通过聚类支持快速查找,还可以压缩数据,减少 70-75% 的内存。 - 使用 GPU 加速
不得不说模型加速最好的方法就是指定 GPU(前提是要有),当我们使用 Towhee 算子中的 AI 模型进行特征向量提取,Towhee 框架将会自动根据 GPU 是否可用,来启用 GPU 进行数据处理,也就是说当cuda.is_aviliable=True
就会使用 GPU 。
模型优化
随着 AI 技术的不断发展,CV(Computer Vision) 领域出现了越来越多的算法模型,从 VGG 到 ResNet 再到 Transformer,模型不断升级,那么哪一个模型才是最适合我们的呢?实践是检验真理的唯一标准,最简单的方式是在自己的数据集下试用各种预训练好的模型选最优,除此之外,我们也可以自己训练模型。
模型选型
Towhee 的 image-embedding ⁴算子涵盖了市面上主流的各种模型,通过修改算子参数,可以轻松调用任何模型,而无需额外的折腾。此外 Towhee 还提供关于 Recall、HR 和 mAP 等指标的计算和报告,我们可以基于自己的数据集来对比不同模型的指标结果,从而帮助选择最优的模型。
例如我们指定三个模型 (VGG16, resnet50 和efficientnet-b2) 进行测试并对比,先将图像数据集入库,然后搜索测试图片并返回搜索结果的准确率报告:
model_dim = { #模型与生成向量维度的字典
'vgg16': 4096,
'resnet50': 2048,
'tf_efficientnet_b2': 1408
}
for model in model_dim:
collection = create_milvus_collection(model, model_dim[model])
dc = (towhee.read_csv('reverse_image_search.csv')
.runas_op['id', 'id'](func=lambda x: int(x))
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name=model)
.tensor_normalize['vec', 'vec']()
.to_milvus['id', 'vec'](collection=collection, batch=100)
) #图像数据入库
(towhee.glob['path']('./test/*/*.JPEG')
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name=model)
.tensor_normalize['vec', 'vec']()
.milvus_search['vec', 'result'](collection=collection, limit=10)
.runas_op['path', 'ground_truth'](func=ground_truth) #获取测试数据的 ground truth
.runas_op['result', 'result'](func=lambda res: [x.id for x in res]) #获取搜索结果的 id
.with_metrics(['mean_hit_ratio', 'mean_average_precision']) #指定 HR 和 mAP 两个指标
.evaluate['ground_truth', 'result'](model) #将结果 id 和 ground truth 比较
.report()
) #检索图像并返回指标报告
当然,也可以用使用自己的模型而不是 Towhee 的内置算子,下面以使用 Transformer 模型为例,首先定义 vit_embedding
函数用于提取特征向量,然后通过 runas_op
应用此函数,最后和上面的代码类似,用于计算指定的指标。
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-large-patch32-384')
model = ViTModel.from_pretrained('google/vit-large-patch32-384')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
def vit_embedding(img):
img = to_image_color(img, 'RGB')
inputs = feature_extractor(img, return_tensors="pt")
outputs = model(inputs['pixel_values'].to(device))
return outputs.pooler_output.detach().cpu().numpy().flatten()
collection = create_milvus_collection('huggingface_vit', 1024)
dc = (towhee.read_csv('reverse_image_search.csv')
.runas_op['id', 'id'](func=lambda x: int(x))
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name=model)
.tensor_normalize['vec', 'vec']()
.to_milvus['id', 'vec'](collection=collection, batch=100)
)
(towhee.glob['path']('./test/*/*.JPEG')
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name=model)
.tensor_normalize['vec', 'vec']()
.milvus_search['vec', 'result'](collection=collection, limit=10)
.runas_op['path', 'ground_truth'](func=ground_truth)
.runas_op['result', 'result'](func=lambda res: [x.id for x in res])
.with_metrics(['mean_hit_ratio', 'mean_average_precision'])
.evaluate['ground_truth', 'result'](model)
.report()
)
以上四个模型的准确率情况如下图所示,可见在本文数据集中 ViT-large 模型的准确度更高。
模型训练
除了使用这些预训练好的模型,我们还可以基于自己的数据集进行模型训练,Towhee 也提供了模型训练接口,你可以尝试训练任意的模型算子。
流程优化
在稳定性方面,我们希望系统既健壮又可靠,不至于碰到异常就崩溃;在业务方面,我们希望可以定制化流水线,不同的流水线用于处理不同的业务。
异常处理
当以图搜图系统处理大规模数据时,如果其中存在损坏的图像或格式错误的图像,这很可能导致程序中断,但我们又很难清理掉所有出现的坏数据,该怎么办呢? Towhee 提供 exception_safe
接口能够确保出现异常时继续执行。
(towhee.glob['path']('./exception/*.JPEG')
.exception_safe() #异常安全
.image_decode['path', 'img']()
.image_embedding.timm['img', 'vec'](model_name='resnet50')
.tensor_normalize['vec', 'vec']()
.milvus_search['vec', 'result'](collection=resnet_collection, limit=5)
.runas_op['result', 'result_img'](func=read_images)
.drop_empty() #清除坏数据
.select['img', 'result_img']()
.show()
)
上面的例子中,我们使用了四张图片作为输入数据,其中包含一张损坏的图片,由于我们在流水线中加入了 exception_safe
,所以程序不会发生中断,仅仅是打印了错误信息。
增加目标检测
根据不同的业务场景我们可以定制不同的流水线,比如在商品推荐场景中,我们更关注图像包含的商品,那么可以在流水线中加上 Towhee 的目标检测⁵算子,用于检测图像中的商品。代码如下所示,具体原理是先利用 get_object
返回图像所有目标中面积最大的物体(没有目标将返回图像本身),然后针对找到的目标物体进行特征提取。
def get_object(img, boxes):
if len(boxes) == 0:
return img
max_area = 0
for box in boxes:
x1, y1, x2, y2 = box
area = (x2-x1)*(y2-y1)
if area > max_area:
max_area = area
max_img = img[y1:y2,x1:x2,:]
return max_img
(towhee.glob['path']('./object/*.jpg')
.image_decode['path', 'img']()
.object_detection.yolov5['img', ('boxes', 'class', 'score')]() #目标检测算子
.runas_op[('img', 'boxes'), 'object'](func=get_object)
.image_embedding.timm['object', 'object_vec'](model_name='resnet50')
.tensor_normalize['object_vec', 'object_vec']()
.milvus_search['object_vec', 'object_result'](collection=yolo_collection, limit=3)
.runas_op['object_result', 'object_result_img'](func=read_images)
.select['img', 'result_img', 'object_result_img']()
.show()
)
在进行有无目标检测的结果对比之后,我们可以发现一个有意思的事情:在某些情况下,目标检测后的结果更优。以下图中每一行的结果为例,从左至右是我们输入的检索图片,和三张图搜的结果、进行目标检测后搜索的结果。第一行在加上目标检测后才能找到车相关的图片,否则都是蜘蛛,我们可以推测 ResNet 把待检索图片中的树枝理解成了蜘蛛,但是在 Towhee 流水线中应用上目标检测就可以解决这个问题。
FastAPI 部署
上一篇文章中我们介绍了利用 Gradio 部署图像搜索的服务,这次将展示如何利用 FastAPI ⁶来提供 Web 服务,FastAPI 是目前主流的基于 Python 的 Web 框架之一。我们先创建一个 FastAPI 实例 app
,然后将这个实例 app
绑定到 Towhee 的流水线中,app_search
提供了基于 FastAPI 的图像检索服务,最后我们设置好服务端口和地址,就能够得到一个在线服务了。
from fastapi import FastAPI
import uvicorn
import nest_asyncio
app = FastAPI()
with towhee.api['file']() as api:
app_search = (
api.image_load['file', 'img']()
.image_embedding.timm['img', 'vec'](model_name='resnet50')
.tensor_normalize['vec', 'vec']()
.milvus_search['vec', 'result'](collection=milvus_collection)
.runas_op['result', 'res_file'](func=lambda res: str([id_img[x.id] for x in res]))
.select['res_file']()
.serve('/search', app) #绑定流水线到 app,接口为 /search
)
nest_asyncio.apply()
uvicorn.run(app=app, host='0.0.0.0', port=8000)
类似的,参考 towhee-io/examples 我们可以完成 /load
和 /count
两个接口的创建,在所有工作就绪之后,我们就能够在浏览器中打开 http://0.0.0.0:8000/docs 来体验拥有更高性能的以图搜图服务应用了。
总结
相信在跟随本文耐心实践之后,你一定可以得到一个性能颇高、生产可用的“以图搜图系统”。当然,如果你愿意的话,也可以结合本文的例子,对其他的非结构化数据和项目(音视频)进行分析优化,原理是相通的。欢迎留言讨论,或者给我们的项目提出改进建议(https://github.com/towhee-io/...)。
下一篇文章中,我们将分析如何“打包 AI 流水线”,通过使用 Docker 来完成系统的快速搭建和完整数据迁移。
相关资料:
[1] https://towhee.io/
[2] https://milvus.io/
[3] https://milvus.io/docs/v2.1.x...
[4] https://towhee.io/tasks/detai...
[5] https://towhee.io/tasks/detai...
[6] https://fastapi.tiangolo.com/
如果你觉得我们分享的内容还不错,请不要吝啬给我们一些鼓励:点赞、喜欢或者分享给你的小伙伴!
活动信息、技术分享和招聘速递请关注:https://zilliz.gitee.io/welcome/
如果你对我们的项目感兴趣请关注:
用于存储向量并创建索引的数据库 Milvus
用于构建模型推理流水线的框架 Towhee