第3.1章:StarRocks数据导入--Insert into

2年前 (2022) 程序员胖胖胖虎阿
410 0 0

Insert Into是我们在MySQL中常用的导入方式,StarRocks同样也支持使用Insert into的方式进行数据导入,并且每次insert into操作都是一次完整的导入事务。

在StarRocks中,Insert的语法和MySQL等数据库的语法类似,具体可以参考官网文档:

Insert Into 导入 @ InsertInto @ StarRocks Docs第3.1章:StarRocks数据导入--Insert intohttps://docs.starrocks.com/zh-cn/main/loading/InsertInto

INSERT @ insert @ StarRocks Docs第3.1章:StarRocks数据导入--Insert intohttps://docs.starrocks.com/zh-cn/main/sql-reference/sql-statements/data-manipulation/insert

在StarRocks中,Insert into的语法虽然简单,但是使用不当就很容易有问题,尤其是高频率小数据量的insert,非常非常的不推荐。社区不时有同学反馈使用JDBC或者直接insert into导入时,插入1000条左右时很快有类似报错:close index channel failed,然后StarRocks的同学就会答疑说这是导入太频繁了,要降频率攒批导入。这里咱们就浅层次的解释一下为什么会报错、为什么要降频率以及如何恰当的使用insert。

1、高频insert为什么报错

首先,先看一下StarRocks存储引擎中的数据组织图:

第3.1章:StarRocks数据导入--Insert into

 在2.4章,我们已经介绍了Table->Partition->Tablet的关系,这里简单再复述一遍。

Table即为我们用Create语句创建的表,在表中我们通常会按照日期进行分区,即为一个个的Partition,分区是逻辑概念,只记录在表的元数据中。分区后,每个分区的数据又会按照建表语句中的分桶键进行Hash分桶,分桶就可以认为是物理概念了,表中的数据在经过分区分桶后,就会形成一个个的Tablet,尽量均匀的分布在集群的所有BE中。

在BE的storage/data中,我们可以看到很多数字的文件夹,文件夹中就是数据表经过分区分桶后形成的一个个Tablet,这也是为什么我说分桶可以简单的认为是物理概念。Tablet是StarRocks中数据均衡的最小单位,包括咱们常说的三副本,也是说同一个Tablet会在集群中共保留三份。各个Tablet之间的数据没有交集,在物理上独立存储。集群中不管是副本修复还是磁盘均衡,也都是以Tablet为单位移动或者克隆的。同样,我们每次的数据导入、删除或者更新,本质上也是对一个个Tablet中的数据进行操作。

复述完这些基本概念,我们再说说之前没提到过的Rowset和Segment。一个Tablet中包含若干个连续的RowsetRowset是逻辑概念,代表Tablet中一次数据变更的数据集合(数据变更包括了数据新增、删除、更新等),Rowset按版本信息进行记录,每次变更会生成一个版本。一个Rowset可能包含多个Segment,Segment可以认为是Rowset中的数据分段,在导入时,每完成写入一个Segment就会增加一个文件块对应。

Segment的概念比较底层,我们先不需要深入了解,这里咱们主要关注Rowset的概念。上面加粗的部分我们可以转换为更直观的语句来帮助理解:我们每次的导入操作,本质上就是对该表中涉及的到的那部分Tablet进行数据新增的变更操作。而对于某个被涉及到Tablet来说,每次新增操作都会在Tablet中生成一个新版本的Rowset。

说到这里,终于和前面的insert报错有关联了。开篇我们就提到,每次insert into对StarRocks来说都是一次完整的导入事务,那么对于导入涉及到的Tablet来说,每次insert into实际上都会在Tablet内部生成一个个连续版本号的Rowset,比如[0-1]、[2-2]、[3-3]……[9999-9999],每个版本的版本号不会重叠,而一旦某张表的某个Tablet中的Rowset个数达到1000(注意这里说的是同时存在的Rowset个数,而不是Rowset版本号),就会达到阈值,触发报错:close index channel failed。

2、为什么建议降低导入频率

解释完为什么报错,新的问题又来了,为什么Tablet中Rowset的版本个数太多了StarRocks就要主动报错,是有什么影响吗?确实是有,影响主要有两方面,首先是内存的占用,当Rowset的版本数过多,BE节点table_meta部分(主要是其中的RowSet元数据部分)占用的内存就可能非常多,同时Compaction消耗内存也会比较大(这个概念后面马上介绍),容易引起OOM,影响集群稳定性。其次就是查询会变慢,因为查询的过程中是需要对Tablet中的数据进行解压的,当Rowset版本很多,解压会变慢,导致查询Scan的耗时增加。综上考虑,StarRocks设置了单表中每个Tablet最大阈值为1000的限制。

既然导入的过程中会不断增加Rowset版本,版本数过多还会出现问题,那么StarRocks是怎么解决的呢?这就又得引出Compaction的概念,简单来说,Compaction可以认为是一个常驻线程,来不断的对Tablet中的Rowset版本进行合并,合并后的Rowset就会像这样:

第3.1章:StarRocks数据导入--Insert into

 Compaction也分为两类,一类是cumulative compaction,是将多个最新导入的Rowset合并成较大的Rowset,例如上图中的右半部分。另一类是base compaction,是将cumulative compaction产生的rowset合入到start version为0的基线数据版本中,例如上图左侧的部分,base compaction的开销会比较大。

在StarRocks集群运行时,对表的各类数据变更操作不断产生新版本的Rowset,后台Compaction进程来不断合并Rowset,进而保证集群整体稳定高效。

到此,终于,我们可以解释为什么快速insert into会产生报错,那就是因为生成Rowset版本的速度太快,而合并速度跟不上,导致累积版本超过了1000,进而触发阈值报错。为了保证集群的稳定运行以及查询效率,我们就要保证整体的Compaction效率要大于Rowset生成速率。

首先,我们能不能通过加快Compaction的速度来解决问题呢?一定程度上也可以,部分场景下我们也会通过调整Compaction的几个参数来加速Compaction,例如在be.conf中配置以下参数(配置后需重启BE):

cumulative_compaction_num_threads_per_disk = 4

base_compaction_num_threads_per_disk = 2

cumulative_compaction_check_interval_seconds = 2

但是通常也不推荐,因为加速Compaction的同时内存占用和磁盘IO也可能会大大增加,而且有时也是治标不治本,所以还是得从数据导入频率这个根源上入手。

理论上,我们的每次导入操作,不论是只导入一条还是十万、百万条,对于StarRocks来说,都是只生成一个新的Rowset版本。那么在Compaction效率有限的情况下,我们完全可以通过“攒微批+降频率”来规避Rowset版本过多的问题。实际上,若业务实时性要求不高,在机器内存充足的情况下,攒批越大、导入频率越低,对StarRocks集群的稳定性及查询性能的影响就越小。

3、Insert推荐用法

那么Insert into要怎么使用才合适呢?我们可以概括总结为以下几点:

1、高频率小数据的insert into,或者JDBC的executeUpdate()方法,就完全不要用了!

2、低频率小数据的insert into,像偶尔导入几条测试数据什么的,可以用,但注意频率;

3、低频率较大数据的insert into values(data1),(data2)……(data10086)或者类似意义的JDBC executeBatch()方法,可以用,但不推荐,因为有更快的实现方式;

4、insert into select,推荐使用,可以用来在StarRocks内部进行ETL,或者利用外部表的方式来便捷的导入数据(需留意拖库的情况)。

前面提到,对比JDBC的executeBatch(),在StarRocks中有更快的攒批导入方式,就是Stream Load,官网也给出了Java方式实现Stream Load的Demo链接:

https://github.com/StarRocks/demo/tree/master/MiscDemo/stream_load第3.1章:StarRocks数据导入--Insert intohttps://github.com/StarRocks/demo/tree/master/MiscDemo/stream_load

Stream Load可以说是StarRocks最核心的导入实现方式,后面咱们介绍的Routine Load、DataX StarRocksWriter以及Flink Connector,其底层实现都可以说是基于Stream Load的攒微批导入,在本章节的后续介绍中,咱们也会对StarRocks的主要导入方式逐个进行介绍。

4、Insert使用与调优

4.1严格模式

Insert into是一种同步的导入方式,导入成功会直接显示导入结果,我们简单举几个例子,使用mysql client访问StarRocks:

[root@node01 ~]# mysql –h192.168.110.101 -P9030 -uroot –proot

创建演示用的数据库和数据表:

mysql> CREATE DATABASE IF NOT EXISTS starrocks;

mysql> USE starrocks;

mysql> CREATE TABLE IF NOT EXISTS starrocks.`user_status` (

  `uid` int(11) NOT NULL COMMENT "",

  `act_time` date NOT NULL COMMENT "",

  `status` varchar(50) NULL COMMENT ""

)

DISTRIBUTED BY HASH(`uid`) BUCKETS 3

PROPERTIES (

"replication_num" = "1"

);

导入几条演示数据:

mysql> insert into user_status values(1001,'2021-03-11','shopping'),(1002,'2021-03-12','star');

Query OK, 2 rows affected (0.04 sec)

{'label':'insert_e62d70c1-7b3a-11ec-96f6-02427399a38c', 'status':'VISIBLE', 'txnId':'13015'}

导入成功后会显示导入的行数、任务自动生成的本数据库内的唯一label、数据可见状态以及事务ID,根据show语句查看label我们还可以查看到insert任务的具体信息:

mysql> SHOW LOAD WHERE label="insert_e62d70c1-7b3a-11ec-96f6-02427399a38c"\G

*************************** 1. row ***************************

         JobId: 24022

         Label: insert_e62d70c1-7b3a-11ec-96f6-02427399a38c

         State: FINISHED

      Progress: ETL:100%; LOAD:100%

          Type: INSERT

       EtlInfo: NULL

      TaskInfo: cluster:N/A; timeout(s):3600; max_filter_ratio:0.0

      ErrorMsg: NULL

    CreateTime: 2022-01-22 12:22:24

  EtlStartTime: 2022-01-22 12:22:24

 EtlFinishTime: 2022-01-22 12:22:24

 LoadStartTime: 2022-01-22 12:22:24

LoadFinishTime: 2022-01-22 12:22:24

           URL:

    JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"All backends":{},"FileNumber":0,"FileSize":0}

当然,我们也可以在insert中自定义label,例如:

mysql> insert into user_status with label my_label_0001 values(1003,'2021-03-13','pay'),(1004,'2021-03-14','star');

Query OK, 2 rows affected (0.14 sec)

{'label':'my_label_0001', 'status':'VISIBLE', 'txnId':'13016'}

如果导入失败,insert也会返回错误信息,例如我们导入错误时间格式的数据(数据漏加引号):

mysql> insert into user_status values(1005,'2021-03-11','shopping'),(1006,2021-03-12,'star');

ERROR 1064 (HY000): Insert has filtered data in strict mode, tracking_url=http://192.168.110.101:8040/api/_load_error_log?file=__shard_2/error_log_insert_stmt_89ff158a-7b3d-11ec-96f6-02427399a38d_89ff158a7b3d11ec_96f602427399a38d

这里有两处需要我们注意,一个是strict mode,另一处是tracking_url。

先说tracking_url,使用Web或者curl命令访问tracking_url,我们可以看到更详细的错误信息:

[root@node01 ~]# curl http://192.168.110.101:8040/api/_load_error_log?file=__shard_2/error_log_insert_stmt_89ff158a-7b3d-11ec-96f6-02427399a38d_89ff158a7b3d11ec_96f602427399a38d

Reason: null value for not null column, column=act_time. src line: [];

能看出是因为格式不对,强转为null引起的问题,由此我们就可以去排查数据格式。

接下来就是strict mode,在开篇我们提到过,每次insert into操作都是一次导入事务,事务的定义大家并不陌生,简单说就是这次导入操作要么整批成功,要么整批失败,不会说存在一次导入操作有的数据导入成功有的不成功的情况。实际业务中,我们的数据可能不是那么的规整,那么通常我们在导入时就会设置一个容错率来忽视一定比例的脏数据,StarRocks的主要导入方式中都有容错类的参数,但insert方式是没法处理容错率这个逻辑的,所以StarRocks设计了严格模式参数enable_insert_strict

enable_insert_strict:当该参数为false时,表示一次insert任务只要有一条或以上数据被正确导入,就返回成功。当该参数设置为true时,表示但凡有一条数据错误,则任务整体失败。该参数默认为true。可通过SET enable_insert_strict = false; 来设置。

例如这里我们设置为false:

mysql> SET enable_insert_strict = false;

Query OK, 0 rows affected (0.01 sec)

再次执行上面失败的insert语句:

mysql> insert into user_status values(1005,'2021-03-11','shopping'),(1006,2021-03-12,'star');

Query OK, 1 row affected, 1 warning (0.03 sec)

{'label':'insert_346cbbcd-7b3f-11ec-96f6-02427399a38c', 'status':'VISIBLE', 'txnId':'13019'}

导入成功,不同的是返回值中显示有1 warning,这里的rows affected表示总共有多少行数据被导入,而warnings表示被过滤的行数。我们还可以通过show语句和label查看任务详情:

mysql> SHOW LOAD WHERE label="insert_346cbbcd-7b3f-11ec-96f6-02427399a38c"\G

*************************** 1. row ***************************

         JobId: 24024

         Label: insert_346cbbcd-7b3f-11ec-96f6-02427399a38c

         State: FINISHED

      Progress: ETL:100%; LOAD:100%

          Type: INSERT

       EtlInfo: NULL

      TaskInfo: cluster:N/A; timeout(s):3600; max_filter_ratio:0.0

      ErrorMsg: NULL

    CreateTime: 2022-01-22 12:53:13

  EtlStartTime: 2022-01-22 12:53:13

 EtlFinishTime: 2022-01-22 12:53:13

 LoadStartTime: 2022-01-22 12:53:13

LoadFinishTime: 2022-01-22 12:53:13

           URL: http://192.168.110.101:8040/api/_load_error_log?file=__shard_3/error_log_insert_stmt_346cbbcd-7b3f-11ec-96f6-02427399a38d_346cbbcd7b3f11ec_96f602427399a38d

    JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"All backends":{},"FileNumber":0,"FileSize":0}

可以看到任务整体成功,但也会打印错误信息,打开URL:

[root@node01 ~]# curl http://192.168.110.101:8040/api/_load_error_log?file=__shard_3/error_log_insert_stmt_346cbbcd-7b3f-11ec-96f6-02427399a38d_346cbbcd7b3f11ec_96f602427399a38d

Reason: null value for not null column, column=act_time. src line: [];

报错和前面的相同,也就是说,关闭严格模式后,insert时虽然有错误数据,但只要有数据是正常可用的,就会忽视脏数据,保证可用数据的正常导入。enable_insert_strict参数是session参数,执行SET enable_insert_strict = false\true;后会立刻生效,但断开当前连接后就会失效,若需要全局修改,可以加上global,例如全局关闭:

mysql> SET global enable_insert_strict = false;

全局关闭后是从下一个session生效,当前session是不生效的,所以通常我们也会两者搭配使用。

4.2并行度

Insert导入方式也可以通过简单的调优来加速,最直接相关的参数就是并行度。insert导入语句本质上还是一句sql,所以我们可以通过设置合适的并行度来进行加速。在一般场景中,我们通常建议设置全局并行度为单个BE节点CPU合数的一半,例如我部署BE的服务器是16C的CPU,那么全局并行度我可以设为8:

mysql> set global parallel_fragment_exec_instance_num = 8;

全局变量还是会在下个session才生效,为了立刻生效,我们可以再执行:

mysql> set parallel_fragment_exec_instance_num = 8;

有些场景下,使用insert into并不是说越快越好,例如insert into select语句进行内部ETL或者通过外部表拉取数据,当速度过快时,就可能导致源库压力过大,影响源库中的业务,也或者,会导致StarRocks BE的load内存和ColumnPool内存占用较高,影响集群稳定性,所以一定要结合实际情况,来设置合适的并行度控制导入速率。

4.3超时时间

与insert相关的超时参数有两个:

  • insert_load_default_timeout_second

fe.conf中的参数,表示导入任务的超时时间(以秒为单位),默认为3600秒(1小时)。导入任务在设定的超时时间内未完成则会被系统取消,变成CANCELLED。目前Insert Into并不支持自定义导入的timeout时间,所有Insert Into导入的超时时间是统一的,如果导入任务无法在规定时间内完成,则可以调大该参数。

  • query_timeout

Session变量,默认为300秒,可以通过show variables like '%query_timeout%';查看。Insert Into本身也是一个SQL命令,因此Insert Into语句也受到Session变量query_timeout的限制。可以通过SET query_timeout = xxx;来增加超时时间,单位是秒,同样,也可以通过增加global关键词让设置全局生效。

版权声明:程序员胖胖胖虎阿 发表于 2022年9月3日 上午3:00。
转载请注明:第3.1章:StarRocks数据导入--Insert into | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...