Flink SQL 知其所以然:Explain、Show、Load、Set 子句

发布时间:2025-05-18 20:56:52 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:EXPLAIN 子句 大家好,我是老羊,今天我们来学习 Flink SQL 中的的 Explain、Show、Load、Set 共 4 个子句。 应用场景:EXPLAIN 子句其实就是用于查看当前这个 sql 查询的逻辑计划以及优化的执行计划。SQL 语法标准:EXPLAIN PLAN FOR 实际案例:public class Explain_Test {

EXPLAIN 子句

大家好,我是老羊,今天我们来学习 Flink SQL 中的的 Explain、Show、Load、Set 共 4 个子句。

应用场景:EXPLAIN 子句其实就是用于查看当前这个 sql 查询的逻辑计划以及优化的执行计划。SQL 语法标准:EXPLAIN PLAN FOR 实际案例:public class Explain_Test {

public static void main(String[] args) throws Exception {

FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

flinkEnv.env().setParallelism(1);

String sql = "CREATE TABLE source_table (\n"

+ " user_id BIGINT COMMENT 用户 id,\n"

+ " name STRING COMMENT 用户姓名,\n"

+ " server_timestamp BIGINT COMMENT 用户访问时间戳,\n"

+ " proctime AS PROCTIME()\n"

+ ") WITH (\n"

+ " connector = datagen,\n"

+ " rows-per-second = 1,\n"

+ " fields.name.length = 1,\n"

+ " fields.user_id.min = 1,\n"

+ " fields.user_id.max = 10,\n"

+ " fields.server_timestamp.min = 1,\n"

+ " fields.server_timestamp.max = 100000\n"

+ ");\n"

+ "\n"

+ "CREATE TABLE sink_table (\n"

+ " user_id BIGINT,\n"

+ " name STRING,\n"

+ " server_timestamp BIGINT\n"

+ ") WITH (\n"

+ " connector = print\n"

+ ");\n"

+ "\n"

+ "EXPLAIN PLAN FOR\n"

+ "INSERT INTO sink_table\n"

+ "select user_id,\n"

+ " name,\n"

+ " server_timestamp\n"

+ "from (\n"

+ " SELECT\n"

+ " user_id,\n"

+ " name,\n"

+ " server_timestamp,\n"

+ " row_number() over(partition by user_id order by proctime) as rn\n"

+ " FROM source_table\n"

+ ")\n"

+ "where rn = 1";

/**

* 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}

* -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}

*/

for (String innerSql : sql.split(";")) {

TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);

tableResult.print();

}

}

}

上述代码执行结果如下:

1. 抽象语法树

== Abstract Syntax Tree ==

LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])

+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])

+- LogicalFilter(condition=[=($3, 1)])

+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])

+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

2. 优化后的物理计划

== Optimized Physical Plan ==

Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])

+- Calc(select=[user_id, name, server_timestamp])

+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])

+- Exchange(distribution=[hash[user_id]])

+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])

+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

3. 优化后的执行计划

== Optimized Execution Plan ==

Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])

+- Calc(select=[user_id, name, server_timestamp])

+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])

+- Exchange(distribution=[hash[user_id]])

+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])

+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

USE 子句

应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,USE 子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module。SQL 语法标准:切换 Catalog:USE CATALOG catalog_name使用 Module:USE MODULES module_name1[, module_name2, ...]切换 Database:USE db名称实际案例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// create a catalog

tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");

tEnv.executeSql("SHOW CATALOGS").print();

// +-----------------+

// | catalog name |

// +-----------------+

// | default_catalog |

// | cat1 |

// +-----------------+

// change default catalog

tEnv.executeSql("USE CATALOG cat1");

tEnv.executeSql("SHOW DATABASES").print();

// databases are empty

// +---------------+

// | database name |

// +---------------+

// +---------------+

// create a database

tEnv.executeSql("CREATE DATABASE db1 WITH (...)");

tEnv.executeSql("SHOW DATABASES").print();

// +---------------+

// | database name |

// +---------------+

// | db1 |

// +---------------+

// change default database

tEnv.executeSql("USE db1");

// change module resolution order and enabled status

tEnv.executeSql("USE MODULES hive");

tEnv.executeSql("SHOW FULL MODULES").print();

// +-------------+-------+

// | module name | used |

// +-------------+-------+

// | hive | true |

// | core | false |

// +-------------+-------+

SHOW 子句

应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,SHOW 子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持 SHOW 以下内容。SQL 语法标准:SHOW CATALOGS:展示所有 Catalog

SHOW CURRENT CATALOG:展示当前的 Catalog

SHOW DATABASES:展示当前 Catalog 下所有 Database

SHOW CURRENT DATABASE:展示当前的 Database

SHOW TABLES:展示当前 Database 下所有表

SHOW VIEWS:展示所有视图

SHOW FUNCTIONS:展示所有的函数

SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)实际案例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// show catalogs

tEnv.executeSql("SHOW CATALOGS").print();

// +-----------------+

// | catalog name |

// +-----------------+

// | default_catalog |

// +-----------------+

// show current catalog

tEnv.executeSql("SHOW CURRENT CATALOG").print();

// +----------------------+

// | current catalog name |

// +----------------------+

// | default_catalog |

// +----------------------+

// show databases

tEnv.executeSql("SHOW DATABASES").print();

// +------------------+

// | database name |

// +------------------+

// | default_database |

// +------------------+

// show current database

tEnv.executeSql("SHOW CURRENT DATABASE").print();

// +-----------------------+

// | current database name |

// +-----------------------+

// | default_database |

// +-----------------------+

// create a table

tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");

// show tables

tEnv.executeSql("SHOW TABLES").print();

// +------------+

// | table name |

// +------------+

// | my_table |

// +------------+

// create a view

tEnv.executeSql("CREATE VIEW my_view AS ...");

// show views

tEnv.executeSql("SHOW VIEWS").print();

// +-----------+

// | view name |

// +-----------+

// | my_view |

// +-----------+

// show functions

tEnv.executeSql("SHOW FUNCTIONS").print();

// +---------------+

// | function name |

// +---------------+

// | mod |

// | sha256 |

// | ... |

// +---------------+

// create a user defined function

tEnv.executeSql("CREATE FUNCTION f1 AS ...");

// show user defined functions

tEnv.executeSql("SHOW USER FUNCTIONS").print();

// +---------------+

// | function name |

// +---------------+

// | f1 |

// | ... |

// +---------------+

// show modules

tEnv.executeSql("SHOW MODULES").print();

// +-------------+

// | module name |

// +-------------+

// | core |

// +-------------+

// show full modules

tEnv.executeSql("SHOW FULL MODULES").print();

// +-------------+-------+

// | module name | used |

// +-------------+-------+

// | core | true |

// | hive | false |

// +-------------+-------+

LOAD、UNLOAD 子句

应用场景:我们可以使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。SQL 语法标准:-- 加载

LOAD MODULE module_name [WITH (key1 = val1, key2 = val2, ...)]

-- 卸载

UNLOAD MODULE module_name实际案例:LOAD 案例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 加载 Flink SQL 体系内置的 Hive module

tEnv.executeSql("LOAD MODULE hive WITH (hive-version = 3.1.2)");

tEnv.executeSql("SHOW MODULES").print();

// +-------------+

// | module name |

// +-------------+

// | core |

// | hive |

// +-------------+UNLOAD 案例:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 卸载唯一的一个 CoreModule

tEnv.executeSql("UNLOAD MODULE core");

tEnv.executeSql("SHOW MODULES").print();

// 结果啥 Moudle 都没有了

SET、RESET 子句

应用场景:SET 子句可以用于修改一些 Flink SQL 的环境配置,RESET 子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。SQL 语法标准:SET (key = value)?

RESET (key)?实际案例:

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

Flink SQL> SET table.planner = blink;

[INFO] Session property has been set.

Flink SQL> SET;

table.planner=blink;

Flink SQL> RESET table.planner;

[INFO] Session property has been reset.

Flink SQL> RESET;

[INFO] All session properties have been set to their default values.

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!