Flink SQL 知其所以然:TopN、Order By、Limit 操作

发布时间:2025-05-11 12:45:53 作者:益华网络 来源:147小编 浏览量(1) 点赞(0)
摘要:DML:Order By、Limit 子句 大家好,我是老羊,今天我们来学习 Flink SQL 中的 TopN、Order By、Limit 3个操作。1.Order By 子句支持 Batch\Streaming,但在实时任务中一般用的非常少。 实时任务中,Order By 子句中必须要有时间属性字段,并且时间属性必须为升序时间属性,即WATERMAR

DML:Order By、Limit 子句

大家好,我是老羊,今天我们来学习 Flink SQL 中的 TopN、Order By、Limit 3个操作。

1.Order By 子句

支持 Batch\Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中必须要有时间属性字段,并且时间属性必须为升序时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 0.001 SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column。

举例:

CREATE TABLE source_table_1 (

user_id BIGINT NOT NULL,

row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),

WATERMARK FOR row_time AS row_time

) WITH (

connector = datagen,

rows-per-second = 10,

fields.user_id.min = 1,

fields.user_id.max = 10

);

CREATE TABLE sink_table (

user_id BIGINT

) WITH (

connector = print

);

INSERT INTO sink_table

SELECT user_id

FROM source_table_1

Order By row_time, user_id desc2.Limit 子句

支持 Batch\Streaming,但实时场景一般不使用,但是此处依然举一个例子:

CREATE TABLE source_table_1 (

user_id BIGINT NOT NULL,

row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),

WATERMARK FOR row_time AS row_time

) WITH (

connector = datagen,

rows-per-second = 10,

fields.user_id.min = 1,

fields.user_id.max = 10

);

CREATE TABLE sink_table (

user_id BIGINT

) WITH (

connector = print

);

INSERT INTO sink_table

SELECT user_id

FROM source_table_1

Limit 3

结果如下,只有 3 条输出:

+I[5]

+I[9]

+I[4]

DML:TopN 子句

TopN 定义(支持 Batch\Streaming):TopN 其实就是对应到离线数仓中的 row_number(),可以使用 row_number() 对某一个分组的数据进行排序应用场景:根据 某个排序 条件,计算某个分组下的排行榜数据SQL 语法标准:SELECT [column_list]

FROM (

SELECT [column_list],

ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum

FROM table_name)

WHERE rownum <= N [AND conditions]

ROW_NUMBER():标识 TopN 排序子句

PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的partition by key,就是根据需求中的搜索关键词(key)做为分区

ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序

WHERE rownum <= N:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数

[AND conditions]:其他的限制条件也可以加上

实际案例:取某个搜索关键词下的搜索热度前 10 名的词条数据。

输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:

数据源 schema:

-- 字段名 备注

-- key 搜索关键词

-- name 搜索热度名称

-- search_cnt 热搜消费热度(比如 3000)

-- timestamp 消费词条时间戳

CREATE TABLE source_table (

name BIGINT NOT NULL,

search_cnt BIGINT NOT NULL,

key BIGINT NOT NULL,

row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),

WATERMARK FOR row_time AS row_time

) WITH (

...

);

-- 数据汇 schema:

-- key 搜索关键词

-- name 搜索热度名称

-- search_cnt 热搜消费热度(比如 3000)

-- timestamp 消费词条时间戳

CREATE TABLE sink_table (

key BIGINT,

name BIGINT,

search_cnt BIGINT,

`timestamp` TIMESTAMP(3)

) WITH (

...

);

-- DML 逻辑

INSERT INTO sink_table

SELECT key, name, search_cnt, row_time as `timestamp`

FROM (

SELECT key, name, search_cnt, row_time,

-- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名

ROW_NUMBER() OVER (PARTITION BY key

ORDER BY search_cnt desc) AS rownum

FROM source_table)

WHERE rownum <= 100

输出结果:

-D[关键词1, 词条1, 4944]

+I[关键词1, 词条1, 8670]

+I[关键词1, 词条2, 1735]

-D[关键词1, 词条3, 6641]

+I[关键词1, 词条3, 6928]

-D[关键词1, 词条4, 6312]

+I[关键词1, 词条4, 7287]

可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。

SQL 语义

上面的 SQL 会翻译成以下三个算子:

数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 hash 分发到下游排序算子,相同的 key 数据将会发送到一个并发中

排序算子:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据

数据汇:接收到上游的数据之后,然后输出到外部存储引擎中

上面三个算子也是会 24 小时一直运行的。

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!