Flink Catalog解读

网友投稿 1158 2022-10-12

Flink Catalog解读

Flink Catalog解读

文章目录

​​01 引言​​​​02 Catalog​​

​​2.1 Catalog概述​​​​2.2 Catalog分类​​​​2.3 Catalog API​​

​​2.3.1 数据库操作​​​​2.3.2 表操作​​​​2.3.3 视图操作​​​​2.3.4 分区操作​​​​2.3.5 函数操作​​

​​2.4 Catalog 示例(SQL Client的方式)​​

​​03 文末​​

01 引言

我们知道 Flink 有​​Table​​​(表)、​​View​​​(视图)、​​Function​​​(函数/算子)、​​Database​​(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。

相对于关系型数据库的这些概念,Flink 里还有一个 ​​Catalog​​(目录) 的概念,本文来讲解下。

02 Catalog

2.1 Catalog概述

数据处理最关键的方面之一是管理元数据:

元数据可以是临时的,例如在​​Flink​​​中临时表、或者通过​​TableEnvironment​​​ 注册的​​UDF​​;元数据也可以是持久化的,例如​​Hive Metastore​​ 中的元数据。

Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。​​Catalog ​​提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

2.2 Catalog分类

Catalog目前分为以下几类:

分类

描述

缺陷

GenericInMemoryCatalog

基于内存实现的 Catalog

所有元数据只在 session 的生命周期内可用

​JdbcCatalog​

可以将 Flink 通过 JDBC 协议连接到关系数据库

JDBC Catalog只实现了PostgresCatalog

​HiveCatalog​

作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口

Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。

自定义 Catalog

通过实现 Catalog 接口来开发自定义 Catalog

-

2.3 Catalog API

2.3.1 数据库操作

// create databasecatalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);// drop databasecatalog.dropDatabase("mydb", false);// alter databasecatalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);// get databasecatalog.getDatabase("mydb");// check if a database existcatalog.databaseExists("mydb");// list databases in a catalogcatalog.listDatabases("mycatalog");

2.3.2 表操作

// create tablecatalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);// drop tablecatalog.dropTable(new ObjectPath("mydb", "mytable"), false);// alter tablecatalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);// rename tablecatalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");// get tablecatalog.getTable("mytable");// check if a table exist or notcatalog.tableExists("mytable");// list tables in a databasecatalog.listTables("mydb");

2.3.3 视图操作

// create viewcatalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);// drop viewcatalog.dropTable(new ObjectPath("mydb", "myview"), false);// alter viewcatalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);// rename viewcatalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);// get viewcatalog.getTable("myview");// check if a view exist or notcatalog.tableExists("mytable");// list views in a databasecatalog.listViews("mydb");

2.3.4 分区操作

// create viewcatalog.createPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false);// drop partitioncatalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);// alter partitioncatalog.alterPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false);// get partitioncatalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));// check if a partition exist or notcatalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));// list partitions of a tablecatalog.listPartitions(new ObjectPath("mydb", "mytable"));// list partitions of a table under a give partition speccatalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));// list partitions of a table by expression filtercatalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

2.3.5 函数操作

catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);// drop functioncatalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);// alter functioncatalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);// get functioncatalog.getFunction("myfunc");// check if a function exist or notcatalog.functionExists("myfunc");// list functions in a databasecatalog.listFunctions("mydb");

2.4 Catalog 示例(SQL Client的方式)

① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:

Flink SQL> USE CATALOG myCatalog;Flink SQL> USE myDB;

也可以通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息,代码如下:

Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

③ 其它常规命令:

-- 列出可用的 CatalogFlink SQL> show catalogs;-- 列出可用的数据库 Flink SQL> show databases;-- 列出可用的表Flink SQL> show tables;

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Online 小程序离线版 是一款实用的个人画板工具(online library)
下一篇:开发一个小程序「做微信小程序需要多少钱」
相关文章

 发表评论

暂时没有评论,来抢沙发吧~