openGauss数据库源码解析系列文章——openGauss简介(二)

2年前 (2022) 程序员胖胖胖虎阿
294 0 0

上一篇openGauss数据库源码解析系列文章--openGauss简介(上)中,从openGauss概述、应用场景、系统结构、代码结构四个方面对openGauss进行了初步介绍。其中,openGauss的代码结构介绍了数据库系统通信管理、SQL引擎两方面内容,本篇接着从代码结构第三方面的内容——存储引擎,以及openGauss的价值特性方面展开介绍。

(三)存储引擎

openGauss存储引擎是可插拔、自组装的,支持多个存储引擎来满足不同场景的业务诉求,目前支持行存储引擎、列存储引擎和内存引擎。

早期计算机程序通过文件系统管理数据,到了20世纪60年代这种方式就开始不能满足数据管理要求了,用户逐渐对数据并发写入的完整性、高效检索提出更高的要求。由于机械磁盘的随机读写性能问题,从20世纪80年代开始,大多数数据库一直在围绕着减少随机读写磁盘进行设计。主要思路是把对数据页面的随机写盘转化为对WAL(Write Ahead Log,预写式日志)日志的顺序写盘,WAL日志持久化完成,事务就算提交成功,数据页面异步刷盘。但是随着内存容量变大、保电内存、非易失性内存的发展,以及SSD技术逐渐的成熟,IO性能极大提高,经历了几十年发展的存储引擎需要调整架构来发挥SSD的性能和充分利用大内存计算的优势。随着互联网、移动互联网的发展,数据量剧增,业务场景多样化,一套固定不变的存储引擎不可能满足所有应用场景的诉求。因此现在的DBMS需要设计支持多种存储引擎,根据业务场景来选择合适的存储模型。

1. 数据库存储引擎要解决的问题

  • 存储的数据必须要保证ACID:原子性、一致性、隔离性、持久性。
  • 高并发读写,高性能。
  • 数据高效存储和检索能力。

2. openGauss存储引擎概述

openGauss整个系统设计支持多个存储引擎来满足不同场景的业务诉求。当前openGauss存储引擎有以下3种:

  • 行存储引擎。主要面向OLTP(Online Transaction Processing,在线交易处理)场景设计,例如订货发货,银行交易系统。
  • 列存储引擎。主要面向OLAP(Online Analytical Processing,联机分析处理)场景设计,例如数据统计报表分析。
  • 内存引擎。主要面向极致性能场景设计,例如银行风控场景。

创建表的时候可以指定行存储引擎、列存引擎的表、内存引擎的表,支持一个事务里包含对3种引擎表的DML(Data Manipulation Language,数据操作语言)操作,可以保证事务ACID性质。

1) storage源码组织

storage源码目录为:/src/gausskernel/storage。storage源码文件如表1所示。

表1 storage源码文件 |storage |storage源码文件 | |--|--| |access | 基础行存储引擎方法cbtree、hash、heap、index、... | | buffer | 缓冲区| | freespace |空闲空间管理 | |ipc | 进程内交互 | | large_object |大对象处理 | | remote |远程读 | | replication | 复制备份| |smgr | 存储管理 | | cmgr | 公共缓存方法 | |cstore | 列存储引擎 | |dfs | 分布式文件系统| | file | 文件类 | | lmgr | 锁管理 | | mot | 内存引擎 | | page | 数据页 |

2) storage主流程

storage主流程代码如下。

/* smgr/smgr.cpp, 存储管理 */
...
/* 文件管理函数列表,包含磁盘初始化、开关、同步等操作函数 */
static const f_smgr g_smgrsw[] = {   
 /* 磁盘*/    
 {mdinit, 
        NULL, 
       mdclose, 
       mdcreate, 
       mdexists, 
       mdunlink, 
       mdextend, 
       mdprefetch, 
       mdread, 
       mdwrite, 
       mdwriteback, 
       mdnblocks, 
       mdtruncate,  
       mdimmedsync, 
       mdpreckpt,  
       mdsync, 
       mdpostckpt,
       mdasyncread, 
       mdasyncwrite}};
 /* 
 *  存储管理初始化 * 
 * 当服务器后端启动时调用 */
 * void smgrinit(void)
 * {    int i;
 * /* 初始化所有存储相关管理器 */    
 * for (i = 0; i < SMGRSW_LENGTH; i++) {
 if (g_smgrsw[i].smgr_init) {            (*(g_smgrsw[i].smgr_init))();        }    }    
  /* 登记存储管理终止程序 */  
    if (!IS_THREAD_POOL_SESSION) {        
    on_proc_exit(smgrshutdown, 0);   
     }}/* 
     * 当后端服务关闭时,执行存储管理关闭代码 */
     * static void smgrshutdown(int code, Datum arg){    int i;
     * /* 关闭所有存储关联服务 */
     *     for (i = 0; i < SMGRSW_LENGTH; i++) { 
      if (g_smgrsw[i].smgr_shutdown) {  
     (*(g_smgrsw[i].smgr_shutdown))(); 
     } 
      }
      }

3. 行存储引擎

openGauss的行存储引擎设计上支持MVCC(Multi-Version Concurrency Control,多版本并发控制),采用集中式垃圾版本回收机制,可以提供OLTP业务系统的高并发读写要求。支持存储计算分离架构,存储层异步回放日志。如图1所示。

openGauss数据库源码解析系列文章——openGauss简介(二)

|图1 行存储架构|

行存储引擎的关键技术有:

  • 基于CSN(Commit Sequence Number,待提交事务的序列号,它是一个64位递增无符号数)的MVCC并发控制机制,集中式垃圾数据清理。
  • 并行刷日志,并行恢复。传统数据库一般都采用串行刷日志的设计,因为日志有顺序依赖关系,例如一个是事务产生的redo/undo log是有前后依赖关系的。openGauss的日志系统采用多个logwriter线程并行写的机制,充分发挥SSD的多通道IO能力。
  • 基于大内存设计的Buffer manager。

行存储buffer主流程代码如下。

/* buffer/bufmgr.cpp, 基础行存储管理 */
...
/* 查找或创建一个缓冲区 */
Buffer ReadBufferExtended(
    Relation reln, ForkNumber fork_num, BlockNumber block_num, ReadBufferMode mode, BufferAccessStrategy strategy)
{
    bool hit = false;
    Buffer buf;

    if (block_num == P_NEW) {
        STORAGE_SPACE_OPERATION(reln, BLCKSZ);
    }

    /* 以smgr(存储管理器)级别打开一个缓冲区 */
    RelationOpenSmgr(reln);

    /* 拒绝读取非局部临时关系的请求,因为可能会获得监控不到的错误数据 */
    if (RELATION_IS_OTHER_TEMP(reln) && fork_num <= INIT_FORKNUM)
        ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions")));

    /* 读取缓冲区,更新pgstat 数量反馈cache 命中与否情况 */
    pgstat_count_buffer_read(reln);
    pgstatCountBlocksFetched4SessionLevel();
    buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence, fork_num, block_num, mode, strategy, &hit);
    if (hit) {
        pgstat_count_buffer_hit(reln);
    }
    return buf;
}

/* 释放一个缓冲区 */
void ReleaseBuffer(Buffer buffer)
{
    BufferDesc* buf_desc = NULL;
    PrivateRefCountEntry* ref = NULL;
    /* 错误释放处理 */
    if (!BufferIsValid(buffer)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("bad buffer ID: %d", buffer))));
    }

    ResourceOwnerForgetBuffer(t_thrd.utils_cxt.CurrentResourceOwner, buffer);

    if (BufferIsLocal(buffer)) {
        Assert(u_sess->storage_cxt.LocalRefCount[-buffer - 1] > 0);
        u_sess->storage_cxt.LocalRefCount[-buffer - 1]--;
        return;
    }
    /* 释放当前缓冲区 */
    buf_desc = GetBufferDescriptor(buffer - 1);

    PrivateRefCountEntry *free_entry = NULL;
    ref = GetPrivateRefCountEntryFast(buffer, free_entry);
    if (ref == NULL) {
        ref = GetPrivateRefCountEntrySlow(buffer, false, false, free_entry);}
    Assert(ref != NULL);
    Assert(ref->refcount > 0);

    if (ref->refcount > 1) {
        ref->refcount--;
    } else {
        UnpinBuffer(buf_desc, false);
    }
}
/* 标记写脏缓冲区 */
void MarkBufferDirty(Buffer buffer)
{
    BufferDesc* buf_desc = NULL;
    uint32 buf_state;
    uint32 old_buf_state;

    if (!BufferIsValid(buffer)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("bad buffer ID: %d", buffer))));}

    if (BufferIsLocal(buffer)) {
        MarkLocalBufferDirty(buffer);
        return;
    }

    buf_desc = GetBufferDescriptor(buffer - 1);

    Assert(BufferIsPinned(buffer));
    Assert(LWLockHeldByMe(buf_desc->content_lock));

    old_buf_state = LockBufHdr(buf_desc);

    buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED);

    /* 将未入队的脏页入队 */
    if (g_instance.attr.attr_storage.enableIncrementalCheckpoint) {
        for (;;) {
            buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED);
            if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf_desc->rec_lsn))) {
                break;
            }

            if (!is_dirty_page_queue_full(buf_desc) && push_pending_flush_queue(buffer)) {
                break;
            }

            UnlockBufHdr(buf_desc, old_buf_state);
            pg_usleep(TEN_MICROSECOND);
            old_buf_state = LockBufHdr(buf_desc);
        }
    }

    UnlockBufHdr(buf_desc, buf_state);

    /* 如果缓冲区不是“脏”状态,则更新相关计数 */
    if (!(old_buf_state & BM_DIRTY)) {
        t_thrd.vacuum_cxt.VacuumPageDirty++;
        u_sess->instr_cxt.pg_buffer_usage->shared_blks_dirtied++;

        pgstatCountSharedBlocksDirtied4SessionLevel();

        if (t_thrd.vacuum_cxt.VacuumCostActive) {
            t_thrd.vacuum_cxt.VacuumCostBalance += u_sess->attr.attr_storage.VacuumCostPageDirty;
        }
    }
}

4. 列存储引擎

传统行存储数据压缩率低,必须按行读取,即使读取一列也必须读取整行。openGauss创建表的时候,可以指定行存储还是列存储。列存储表也支持DML操作,也支持MVCC。列存储架构如图2所示。

openGauss数据库源码解析系列文章——openGauss简介(二)

|图2 列存储架构|

列存储引擎有以下优势:

  • 列的数据特征比较相似,适合压缩,压缩比很高。
  • 当表列的个数比较多,但是访问的列个数比较少时,列存可以按需读取列数据,大大减少不必要的读IO,提高查询性能。
  • 基于列批量数据Vector(向量)的运算,CPU的cache命中率比较高,性能比较好。列存储引擎更适合OLAP大数据统计分析的场景。

1) 列存储源码组织

列存储源码目录为:/src/gausskernel/storage/cstore。列存储源码文件如表2所示。

表2 列存储源码文件

cstore 列存储源码文件
compression 数据压缩与解压
cstore_allocspace 空间分配
cstore_am 列存储公共API
cstore_***_func 支持函数
cstore_psort 列内排序
cu 数据压缩单元
cucache_mgr 缓存管理器
custorage 持久化存储
cstore_delete 删除方法
cstore_update 更新方法
cstore_vector 缓冲区实现
cstore_rewrite SQL重写
cstore_insert 插入方法
cstore_mem_alloc 内存分配

2) 列存储主要API

列存储主要API代码如下。

/*  cstore_am.cpp */
...
/*  扫描 APIs */
    void InitScan(CStoreScanState *state, Snapshot snapshot = NULL);
    void InitReScan();
    void InitPartReScan(Relation rel);
    bool IsEndScan() const;

    /*  延迟读取APIs */
    bool IsLateRead(int id) const;
    void ResetLateRead();

    /*  更新列存储扫描计时标记*/
    void SetTiming(CStoreScanState *state);

    /*  列存储扫描*/
    void ScanByTids(_in_ CStoreIndexScanState *state, _in_ VectorBatch *idxOut, _out_ VectorBatch *vbout);
    void CStoreScanWithCU(_in_ CStoreScanState *state, BatchCUData *tmpCUData, _in_ bool isVerify = false);

    /*  加载数据压缩单元描述信息  */
    bool LoadCUDesc(_in_ int col, __inout LoadCUDescCtl *loadInfoPtr, _in_ bool prefetch_control, _in_ Snapshot snapShot = NULL);

    /*  从描述表中获取数据压缩单元描述*/
    bool GetCUDesc(_in_ int col, _in_ uint32 cuid, _out_ CUDesc *cuDescPtr, _in_ Snapshot snapShot = NULL);

    /*  获取元组删除信息*/
    void GetCUDeleteMaskIfNeed(_in_ uint32 cuid, _in_ Snapshot snapShot);

    bool GetCURowCount(_in_ int col, __inout LoadCUDescCtl *loadCUDescInfoPtr, _in_ Snapshot snapShot);
    /* 获取实时行号。 */
    int64 GetLivedRowNumbers(int64 *deadrows);

    /*  获得数据压缩单元*/
    CU *GetCUData(_in_ CUDesc *cuDescPtr, _in_ int colIdx, _in_ int valSize, _out_ int &slotId);

    CU *GetUnCompressCUData(Relation rel, int col, uint32 cuid, _out_ int &slotId, ForkNumber forkNum = MAIN_FORKNUM,
                            bool enterCache = true) const;

    /*  缓冲向量填充 APIs */
    int FillVecBatch(_out_ VectorBatch *vecBatchOut);

    /*  填充列向量*/
    template <bool hasDeadRow, int attlen>
    int FillVector(_in_ int colIdx, _in_ CUDesc *cu_desc_ptr, _out_ ScalarVector *vec);

    template <int attlen>
    void FillVectorByTids(_in_ int colIdx, _in_ ScalarVector *tids, _out_ ScalarVector *vec);

    template <int attlen>
    void FillVectorLateRead(_in_ int seq, _in_ ScalarVector *tids, _in_ CUDesc *cuDescPtr, _out_ ScalarVector *vec);

    void FillVectorByIndex(_in_ int colIdx, _in_ ScalarVector *tids, _in_ ScalarVector *srcVec, _out_ ScalarVector *destVec);

    /*  填充系统列*/
    int FillSysColVector(_in_ int colIdx, _in_ CUDesc *cu_desc_ptr, _out_ ScalarVector *vec);

    template <int sysColOid>
    void FillSysVecByTid(_in_ ScalarVector *tids, _out_ ScalarVector *destVec);

    template <bool hasDeadRow>
    int FillTidForLateRead(_in_ CUDesc *cuDescPtr, _out_ ScalarVector *vec);

    void FillScanBatchLateIfNeed(__inout VectorBatch *vecBatch);

    /* 设置数据压缩单元范围以支持索引扫描 */
    void SetScanRange();

    /*  判断行是否可用*/
    bool IsDeadRow(uint32 cuid, uint32 row) const;

    void CUListPrefetch();
    void CUPrefetch(CUDesc *cudesc, int col, AioDispatchCUDesc_t **dList, int &count, File *vfdList);

    /* 扫描函数 */
    typedef void (CStore::*ScanFuncPtr)(_in_ CStoreScanState *state, _out_ VectorBatch *vecBatchOut);
    void RunScan(_in_ CStoreScanState *state, _out_ VectorBatch *vecBatchOut);
    int GetLateReadCtid() const;
    void IncLoadCuDescCursor();

5. 内存引擎

openGauss引入了MOT(Memory-Optimized Table,内存优化表)存储引擎,它是一种事务性行存储,针对多核和大内存服务器进行了优化。MOT是openGauss数据库出色的生产级特性(Beta版本),它为事务性工作负载提供更高的性能。MOT完全支持ACID特性,并包括严格的持久性和高可用性支持。企业可以在关键任务、性能敏感的在线事务处理(OLTP)中使用MOT,以实现高性能、高吞吐、可预测低延迟以及多核服务器的高利用率。MOT尤其适合在多路和多核处理器的现代服务器上运行,例如基于ARM(Advanced RISC Machine,高级精简指令集计算机器)/鲲鹏处理器的华为TaiShan服务器,以及基于x86的戴尔或类似服务器。MOT存储引擎如图3所示。

openGauss数据库源码解析系列文章——openGauss简介(二)

|图3 openGauss内存引擎|

MOT与基于磁盘的普通表并排创建。MOT的有效设计实现了几乎完全的SQL覆盖,并且支持完整的数据库功能集,如存储过程和自定义函数。通过完全存储在内存中的数据和索引、非统一内存访问感知(NUMA-aware)设计、消除锁和锁存争用的算法以及查询原生编译,MOT可提供更快的数据访问和更高效的事务执行。MOT有效的几乎无锁的设计和高度调优的实现,使其在多核服务器上实现了卓越的近线性吞吐量扩展。

MOT的高性能(查询和事务延迟)、高可扩展性(吞吐量和并发量)等特点,在某些情况下低成本(高资源利用率)方面拥有显著优势。

  • 低延迟(Low Latency):提供快速的查询和事务响应时间。
  • 高吞吐量(High Throughput):支持峰值和持续高用户并发。
  • 高资源利用率(High Resource Utilization):充分利用硬件。

MOT的关键技术如下:

  • 内存优化数据结构:以实现高并发吞吐量和可预测的低延迟为目标,所有数据和索引都在内存中,不使用中间页缓冲区,并使用持续时间最短的锁。数据结构和所有算法都是专门为内存设计而优化的。
  • 免锁事务管理:MOT在保证严格一致性和数据完整性的前提下,采用乐观的策略实现高并发和高吞吐。在事务过程中,MOT不会对正在更新的数据行的任何版本加锁,从而大大降低了一些大内存系统中的争用。
  • 免锁索引:由于内存表的数据和索引完全存储在内存中,因此拥有一个高效的索引数据结构和算法非常重要。MOT索引机制基于领域前沿的树结构Masstree,一种用于多核系统的快速和可扩展的键值(Key Value,KV)存储索引,以B+树的Trie实现。通过这种方式,高并发工作负载在多核服务器上可以获得卓越的性能。同时MOT应用了各种先进的技术以优化性能,如优化锁方法、高速缓存感知和内存预取。
  • NUMA-aware的内存管理:MOT内存访问的设计支持非统一内存访问(NUMA,Non-Uniform Memory Access)感知。NUMA-aware算法增强了内存中数据布局的性能,使线程访问物理上连接到线程运行的核心的内存。这是由内存控制器处理的,不需要通过使用互连(如英特尔QPI(Quick Path Interconnect,快速路径互连))进行额外的跳转。MOT的智能内存控制模块,为各种内存对象预先分配了内存池,提高了性能、减少了锁、保证了稳定性。
  • 高效持久性:日志和检查点是实现磁盘持久化的关键能力,也是ACID的关键要求之一。目前所有的磁盘(包括SSD和NVMe(Non-Volatile Memory express,非易失性高速传输总线))都明显慢于内存,因此持久化是基于内存数据库引擎的瓶颈。作为一个基于内存的存储引擎,MOT的持久化设计必须实现各种各样的算法优化,以确保持久化的同时还能达到设计时的速度和吞吐量目标。
  • 高SQL覆盖率和功能集:MOT通过扩展的openGauss外部数据封装(Foreign Data Wrapper,FDW)以及索引,几乎支持完整的SQL范围,包括存储过程、用户定义函数和系统函数调用。
  • 使用PREPARE语句的查询原生编译:通过使用PREPARE客户端命令,可以以交互方式执行查询和事务语句。这些命令已被预编译成原生执行格式,也称为Code-Gen或即时(Just-in-Time,JIT)编译。这样可以实现平均30%的性能提升。
  • MOT和openGauss数据库的无缝集成:MOT是一个高性能的面向内存优化的存储引擎,已集成在openGauss软件包中。MOT的主内存引擎和基于磁盘的存储引擎并存,以支持多种应用场景,同时在内部重用数据库辅助服务,如WAL重做日志、复制、检查点和恢复高可用性等。

1) 内存引擎源码组织

内存引擎源码目录为:/src/gausskernel/storage/mot。内存引擎源码文件如表3所示。

表3 内存引擎源码文件

mot 内存引擎源码文件
concurrency_control 并发控制管理
infra 辅助与配置函数
memory 内存数据管理
storage 持久化存储
system 全局控制API
utils 日志等通用方法

2) 内存引擎主流程

内存引擎主流程代码如下。

/* system/mot_engine.cpp */
...
/* 创建内存引擎实例 */
MOTEngine* MOTEngine::CreateInstance(
    const char* configFilePath /* = nullptr */, int argc /* = 0 */, char* argv[] /* = nullptr */)
{
    if (m_engine == nullptr) {
        if (CreateInstanceNoInit(configFilePath, argc, argv) != nullptr) {
            bool result = m_engine->LoadConfig();
            if (!result) {
                MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "System Startup", "Failed to load Engine configuration");
            } else {
                result = m_engine->Initialize();
                if (!result) {
                    MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "System Startup", "Engine initialization failed");
                }
            }

            if (!result) {
                DestroyInstance();
                MOT_ASSERT(m_engine == nullptr);
            }
        }
    }
    return m_engine;
}
/* 内存引擎初始化 */
bool MOTEngine::Initialize()
{
    bool result = false;
/* 初始化应用服务,开始后台任务 */
    do {  // instead of goto
        m_initStack.push(INIT_CORE_SERVICES_PHASE);
        result = InitializeCoreServices();
        CHECK_INIT_STATUS(result, "Failed to Initialize core services");

        m_initStack.push(INIT_APP_SERVICES_PHASE);
        result = InitializeAppServices();
        CHECK_INIT_STATUS(result, "Failed to Initialize applicative services");

        m_initStack.push(START_BG_TASKS_PHASE);
        result = StartBackgroundTasks();
        CHECK_INIT_STATUS(result, "Failed to start background tasks");
    } while (0);

    if (result) {
        MOT_LOG_INFO("Startup: MOT Engine initialization finished successfully");
        m_initialized = true;
    } else {
        MOT_LOG_PANIC("Startup: MOT Engine initialization failed!");
        /* 调用方应在失败后调用DestroyInstance() */
    }

    return result;
}
/* 销毁内存引擎实例 */
void MOTEngine::Destroy()
{
    MOT_LOG_INFO("Shutdown: Shutting down MOT Engine");
    while (!m_initStack.empty()) {
        switch (m_initStack.top()) {
            case START_BG_TASKS_PHASE:
                StopBackgroundTasks();
                break;

            case INIT_APP_SERVICES_PHASE:
                DestroyAppServices();
                break;

            case INIT_CORE_SERVICES_PHASE:
                DestroyCoreServices();
                break;

            case LOAD_CFG_PHASE:
                break;

            case INIT_CFG_PHASE:
                DestroyConfiguration();
                break;

            default:
                break;
        }
        m_initStack.pop();
    }
    ClearErrorStack();
    MOT_LOG_INFO("Shutdown: MOT Engine shutdown finished");
}

openGauss数据库源码解析系列文章——openGauss简介(二)

Gauss松鼠会是汇集数据库爱好者和关注者的大本营,大家共同学习、探索、分享数据库前沿知识和技术,互助解决问题,共建数据库技术交流圈。

相关文章

暂无评论

暂无评论...