作者:闻乃松
写过Spark应用程序的同学都知道,通过下面这段代码就可以加载和访问外部Hive数据源:
SparkSession.builder().
appName(TestSparkHive.class.getSimpleName()).
master("local[*]").
enableHiveSupport().
getOrCreate();
List<Row> list= spark.sql("show databases").collectAsList();
也许你会好奇,它是怎么找到并访问外部Hive数据源的?
其实,Spark识别Hive,也是需要依赖Hive配置项的,配置项的来源可以是$HIVE_HOME环境变量,也可以从Spark的运行环境的classpath下加载Hive相关的配置文件。
创建对Hive外部数据源的访问,不得不提到Spark的两个类:SessionCatalog和ExternalCatalog。前者是对后者的封装,对外部数据源的访问都是通过ExternalCatalog实现。而ExternalCatalog是一个Trait类型,提供了对表、函数和分区的增删改查基本接口。对Hive数据源来讲,分别继承上述两个类,提供了具体的实现:HiveSessionCatalog和HiveExternalCatalog。
随着新数据源(Spark中称为DataSourceV2)的出现,原来的SessionCatalog暴露出弊端和不足,为了适应新的数据源特性,Spark推出了新的接口:CatalogPlugin,因为属于顶层接口,CatalogPlugin本身很简单,只有3个方法:
public interface CatalogPlugin {
void initialize(String name, CaseInsensitiveStringMap options);
String name();
default String[] defaultNamespace() { return new String[0]; }
}
实现自定义Catalog,既可以直接实现CatalogPlugin,也可以扩展TableCatalog接口,TableCatalog扩展了CatalogPlugin并提供了表操作相关功能的接口。同理,实现函数相关的Catalog,也可以直接扩展FunctionCatalog,因为它提供了函数管理相关的接口。同SessionCatalog相对应,CatalogPlugin接口体系也实现了V2SessionCatalog,整个CatalogPlugin类体系表示为下图所示:
V2SessionCatalog不同于SessionCataolog,主要表现在:
- V2SessionCatalog实现了CatalogPlugIn接口,CatalogPlugIn是针对新数据源(DatasourceV2)的元数据管理。
- SessionCatalog 只是普通类,封装了外部数据源的元数据管理接口ExternalCatalog。
- SessionCatalog 作为V2SessionCatalog的属性,或者说 V2SessionCatalog是SessionCatalog的代理实现。
关于第3点,可以从V2SessionCatalog的实现得到佐证,同时以一个方法listTables的实现为例来看:
/**translates calls to the v1 SessionCatalog. */
class V2SessionCatalog(catalog: SessionCatalog) extends TableCatalog
with SupportsNamespaces with SQLConfHelper {
override def listTables(namespace: Array[String]): Array[Identifier] = {
namespace match {
case Array(db) => catalog .listTables(db)
.map(ident => Identifier.of(Array(ident.database.getOrElse("")),
ident.table)) .toArray
case _ => throw QueryCompilationErrors.noSuchNamespaceError(namespace)
}
}
....
}
有了SessionCatalog和V2SessionCatalog,Spark又是如何管理这些Catalog呢?
Spark提供了CatalogManager,其内部通过一个Map类型的内存数据结构维护注册的Catalog实例:
class CatalogManager( defaultSessionCatalog: CatalogPlugin, val v1SessionCatalog:
SessionCatalog) extends SQLConfHelper with Logging {
//SESSION_CATALOG_NAME 常量:spark_catalog
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._ private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
def catalog(name: String): CatalogPlugin = synchronized {
if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
v2SessionCatalog
}
else {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
}
}
}
CatalogManager维护了所有Catalog实例的键值对信息,能够根据catalog名称返回对应的Catalog实例,其中有一个固定的名字叫spark_catalog,用于当前默认的Catalog实例实现,该示例就是V2SessionCatalog,它代理了普通的SessionCatalog,因此,在使用时,即使什么Catalog都不注册,Spark也会根据默认的Catalog实例加载Hive数据源。但是V2SessionCatalog只是对SessionCatalog的简单代理,那么如何实现复杂的数据源元数据管理功能呢?这就需要扩展V2SessionCatalog的实现,这里以Spark Iceberg的实现为例说明:
public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
extends BaseCatalog implements CatalogExtension {
private static final String[] DEFAULT_NAMESPACE = new String[]{
"default"
};
private String catalogName = null;
private TableCatalog icebergCatalog = null;
private StagingTableCatalog asStagingCatalog = null;
private T sessionCatalog = null;
...
}
SparkSessionCatalog实现了CatalogExtension接口,而CatalogExtension接口扩展了SparkPlugIn。注意到类中有两个TableCatalog类型的属性:icebergCatalog和sessionCatalog。其中sessionCatalog就是上面介绍的V2SessionCatalog。
实际上,icebergCatalog和sessionCatalog是 Iceberg Runtime提供的两个类定义,分别是:
org.apache.iceberg.spark.SparkCatalog
org.apache.iceberg.spark.SparkSessionCatalog
关于这两个类的区别,官网有这么一段解释:
什么意思?
就是说,SparkCatalog专用于Iceberg 管理,比如你可以这样在Spark Catalog Manager中注册hive和hadoop类型的Catalog:
set spark.sql.catalog.hive_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.hive_iceberg_catalog_demo.type=hive;
或者
set spark.sql.catalog.hadoop_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.hadoop_iceberg_catalog_demo.type=hadoop;
你可以使用如下的形式创建Iceberg表:
CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id', data string)
USING iceberg
location 'your path'
TBLPROPERTIES ('iceberg.catalog'='new_iceberg_catalog');
如果当前默认namespace在default下,你甚至可以将上面建表语句简写为:
CREATE TABLE sample_iceberg_table( id bigint COMMENT 'unique id', data string);
因为当前Catalog已经明确定义为Iceberg表,它能自动创建Iceberg表,但无法访问普通的Hive表。而SparkSessionCatalog不仅可以定义上面的Iceberg Catalog,并在其中创建Iceberg类型的表,还可以创建非Iceberg类型的表,注册方式同上:
set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;
set spark.sql.catalog.spark_catalog.type=hive;
对SparkSessionCatalog类型的Catalog,其名称为固定的spark_catalog。它重写了Spark默认的V2SessionCatalog行为,SparkSessionCatalog可看做是对Hive数据源的兼容,对非Iceberg类型的表操作,跟普通的Hive表操作并无区别。以createTable这个方法为例:
public Table createTable(Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = (String)properties.get("provider");
return this.useIceberg(provider) ?
this.icebergCatalog.createTable(ident, schema, partitions, properties) :
this.getSessionCatalog().createTable(ident, schema, partitions, properties);
}
如果是Iceberg表,它使用icebergCatalog创建表,否则就用SessionCatalog创建表。而listTables直接将请求转给了SessionCatalog,因为Hive Iceberg表和普通Hive表都基于HMS存储,所以可以使用SessionCatalog。
public Identifier[] listTables(String[] namespace)
throws NoSuchNamespaceException {
return this.getSessionCatalog().listTables(namespace);
}
除了上述区别外,SparkSessionCatalog对Create Table AS Select或者Replace Table As Select无法保证原子性,而SparkCatalog可以。
上面介绍了Spark Iceberg对多类型Catalog的支持,下一步自然要问,这有什么用?
举两个场景来说明:
- Hive数据入湖
--定义catalog
set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;
set spark.sql.catalog.spark_catalog.type=hive;
set spark.sql.catalog.spark_catalog.uri=thrift://ip:9083;
set spark.sql.catalog.spark_catalog.warehouse=s3a://mybucket/warehouse;
CREATE TABLE spark_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id', data string)
USING iceberg;
insert into spark_catalog.default.sample_iceberg_table
select * from default.sample_hive_table;
- 不同Hive版本的数据湖数据迁移,比如从低版本的HMS数据湖迁移到高版本的数据湖,可以这样实现:
--定义新的数据湖
SET spark.sql.catalog.new_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.new_iceberg_catalog.type=hive;
SET spark.sql.catalog.new_iceberg_catalog.uri=thrift://ip-new:9083;
SET spark.sql.catalog.new_iceberg_catalog.warehouse=s3a://mybucket/warehouse;
--创建新数据湖的表 CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id', data string) USING iceberg;
--定义旧的数据湖 SET spark.sql.catalog.old_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.old_iceberg_catalog.type=hive;
SET spark.sql.catalog.old_iceberg_catalog.uri=thrift://ip-old:9083;
SET spark.sql.catalog.old_iceberg_catalog.warehouse=hdfs://service/warehouse;
insert into new_iceberg_catalog.default.sample_iceberg_table
select * from old_iceberg_catalog.default.sample_iceberg_table;