
Flink 1.13 SQL basic syntax (I): DDL and DML

2022-07-04 21:39:00 Game programming

DDL: the CREATE statement

A CREATE statement registers a database, table, view, or function in the current or a specified catalog. Registered databases, tables, views, and functions can then be used in SQL queries.

(1) Column types in a table

1. Physical columns

A physical column defines the name, type, and order of a field in the data stored on the physical medium.

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

2. Metadata columns

Metadata columns are an extension to the SQL standard. They give access to metadata that the data source itself carries, and they are identified by the METADATA keyword.
For example, a metadata column can read the timestamp that Kafka attaches to each record, and that timestamp can then be used in Flink SQL,
for instance in time-based window operations.

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- read the timestamp that Kafka attaches to the record
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka'
)
  • If the column name you define is the same as the name of the metadata field defined by the connector, the FROM xxx clause can be omitted.

  • If the data type you declare for the column differs from the data type of the metadata field defined by the connector, the value is cast automatically at runtime.
    This requires that the two data types can be cast to each other.

  • By default, the Flink SQL planner treats a metadata column as both readable and writable. However, the metadata of some external storage systems can only be read, not written.
    So when writing to a table, use the VIRTUAL keyword to mark a metadata column that should not be written to external storage (not persisted).

CREATE TABLE MyTable (
  -- written by the sink
  `timestamp` BIGINT METADATA,
  -- not written by the sink
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka'
  ...
);

3. Computed columns

A computed column is a new column generated in the CREATE TABLE DDL by applying user-defined expressions to existing columns.

  • A computed column can reference other columns, constants, or functions, but it cannot contain a subquery.

  • Computed columns are typically used to define time attributes, because in a SQL job a time attribute can only be defined in DDL, not in a DML statement.
    Processing time: use the PROCTIME() function to define a processing-time column.

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- declare user_action_time as the processing-time attribute
  user_action_time AS PROCTIME()
) WITH (
  ...
);

Event time: the event-time timestamp can be preprocessed before the watermark is declared. For example, if the field is not of type TIMESTAMP(3), or the timestamp is nested inside a JSON string, a computed column can do the preprocessing.

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 1. ts is an ordinary millisecond timestamp
  ts BIGINT,
  -- 2. convert the millisecond timestamp to TIMESTAMP_LTZ (computed column)
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- 3. declare time_ltz as the event-time attribute and set the watermark
  --    rule to time_ltz minus 5 seconds; the event-time column must be of
  --    type TIMESTAMP or TIMESTAMP_LTZ
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);
  • Note: like virtual metadata columns, computed columns can only be read, not written.

(2) Defining a watermark

A watermark is defined in CREATE TABLE. The SQL syntax is WATERMARK FOR rowtime_column_name AS watermark_strategy_expression.
rowtime_column_name: the event-time attribute column of the table. The column must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3), and it can be a computed column.
Flink SQL offers the following watermark generation strategies:

  • Bounded out-of-orderness: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit. This strategy sets the maximum out-of-order time.
    For example, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND generates a watermark that is delayed by 5 s.
    This is the most commonly used strategy: it is meant for out-of-order data, and real-world data is usually out of order.

  • Strictly ascending: WATERMARK FOR rowtime_column AS rowtime_column. This strategy is rarely used.

  • Ascending: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND. This strategy is also rarely used. It allows records that share the same timestamp.
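The bounded out-of-orderness strategy can be illustrated with a small plain-Java sketch. It does not use any Flink API: the class BoundedWatermarkSketch and its method are hypothetical names, and the sketch assumes the commonly described behavior that the watermark trails the largest event time seen so far by the configured delay and never moves backwards.

```java
/** Hypothetical illustration only; not a Flink class. */
final class BoundedWatermarkSketch {
    private final long maxDelayMillis;
    private long maxEventTimeMillis = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Observes one event timestamp and returns the current watermark. */
    long onEvent(long eventTimeMillis) {
        // An out-of-order (older) event must not pull the watermark back.
        maxEventTimeMillis = Math.max(maxEventTimeMillis, eventTimeMillis);
        return maxEventTimeMillis - maxDelayMillis;
    }

    public static void main(String[] args) {
        // Corresponds to "rowtime_column - INTERVAL '5' SECOND".
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5_000L);
        for (long ts : new long[] {10_000L, 8_000L, 12_000L}) {
            System.out.println("event=" + ts + " watermark=" + wm.onEvent(ts));
        }
    }
}
```

In this sketch the event at 8 000 ms arrives after the event at 10 000 ms, and the watermark stays where it was instead of moving backwards.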

(3) The WITH clause of CREATE TABLE

The WITH clause is used when creating a table to describe the metadata of the external storage behind a data source or data sink.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'anli',
  'properties.bootstrap.servers' = 'hadoop01:9092',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
Note: a connector in Flink SQL is the interface Flink uses to connect to an external data source. As an analogy, connecting to MySQL from Java requires the Java API provided by the mysql-connector-java package. Likewise, connecting to Kafka from Flink SQL requires the Kafka connector. Flink SQL ships with a series of built-in connectors; see https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

DML: the WITH clause

The WITH clause works the same way as WITH in offline Hive SQL. Using it can make the logic of your code clearer.

-- WITH clause
WITH orders_with_total AS (
    SELECT
        order_id
        , price + tax AS total
    FROM Orders
)
SELECT
    order_id
    , SUM(total)
FROM orders_with_total
GROUP BY
    order_id;

DML: SELECT and WHERE clauses

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id)
FROM Orders
WHERE id > 3

Consider the streaming job for this SQL, assuming Orders is a Kafka topic and target_table is also a Kafka topic. When executed, it generates three operators:
Source operator (FROM Orders): connects to the Kafka topic and runs continuously, reading records one by one from the Orders Kafka topic in real time and sending them one by one to the downstream filter and field-normalization operator.
Filter and field-normalization operator (WHERE id > 3 and PRETTY_PRINT(order_id)): receives each record from the upstream operator and evaluates id > 3. For records where the result is true, it applies the PRETTY_PRINT UDF
and sends the results one by one to the downstream sink operator.
Sink operator (INSERT INTO target_table): receives each record from upstream and writes it to the target_table Kafka topic.
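The three operators described above can be mimicked in a few lines of plain Java. This is only a sketch under stated assumptions: FilterProjectSketch is a hypothetical class, prettyPrint is a made-up stand-in for the PRETTY_PRINT UDF (whose real behavior the article does not show), and an in-memory array and list replace the Kafka source and sink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Flink class. */
final class FilterProjectSketch {
    /** Made-up stand-in for the PRETTY_PRINT UDF. */
    static String prettyPrint(long orderId) {
        return "order#" + orderId;
    }

    /** Each row is {id, order_id}; the returned list plays the role of the sink. */
    static List<String> run(long[][] orders) {
        List<String> sink = new ArrayList<>();
        for (long[] row : orders) {              // source: one record at a time
            long id = row[0];
            long orderId = row[1];
            if (id > 3) {                        // filter: WHERE id > 3
                sink.add(prettyPrint(orderId));  // projection, then write to sink
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[][] {{1, 100}, {4, 101}, {5, 102}}));
    }
}
```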

DML: the SELECT DISTINCT clause

SELECT DISTINCT works the same way as in offline Hive SQL. It deduplicates data by key.

INSERT INTO target_table
SELECT
    DISTINCT id
FROM Orders

Consider the streaming job for this SQL, assuming Orders is a Kafka topic and target_table is also a Kafka topic. When executed, it generates three operators:
Source operator (FROM Orders): connects to the Kafka topic and runs continuously, reading records one by one from the Orders Kafka topic in real time and sending them one by one to the downstream deduplication operator.
Deduplication operator (DISTINCT id): receives each record from the upstream operator and checks whether this id has been seen before. The check uses Flink state: if the id already exists in state,
it has been seen and is not sent downstream; if the id is not in state, it has not been seen, so it is recorded and sent, again one record at a time, to the downstream sink operator.
Sink operator (INSERT INTO target_table): receives each record from upstream and writes it to the target_table Kafka topic.
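The state-based check in the deduplication operator can be sketched in plain Java. The sketch is not Flink code: DistinctSketch is a hypothetical class, and a HashSet stands in for Flink state (in a real job the state is managed and checkpointed by Flink).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; not a Flink class. */
final class DistinctSketch {
    // Stand-in for Flink state: every id seen so far.
    private final Set<Long> seenIds = new HashSet<>();

    /** Returns true if the id is new and should be forwarded downstream. */
    boolean accept(long id) {
        return seenIds.add(id); // add() returns false when the id is already present
    }

    /** Runs a batch of ids through the operator and collects what is forwarded. */
    static List<Long> distinct(List<Long> ids) {
        DistinctSketch operator = new DistinctSketch();
        List<Long> downstream = new ArrayList<>();
        for (long id : ids) {
            if (operator.accept(id)) {
                downstream.add(id);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1L, 2L, 1L, 3L, 2L))); // prints [1, 2, 3]
    }
}
```

Note that the set only grows, which mirrors why unbounded DISTINCT queries on a stream can accumulate state over time.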

DML: window aggregation

Tumbling window (TUMBLE)
Sliding window (HOP)
Session window (SESSION)
Cumulative window (CUMULATE)

(1) Tumbling window (TUMBLE)

For example, with a 5-minute tumbling window, Flink opens a new window every 5 minutes, and each record is assigned to exactly one 5-minute window.

Group Window Aggregation: the tumbling window is declared in the GROUP BY clause, as tumble(row_time, interval '1' minute). The first parameter is the event-time timestamp; the second is the size of the tumbling window.
Windowing TVF: the tumbling window is declared in the TABLE clause of the data source, as TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)), which takes three parameters. The first parameter, TABLE source_table, declares the source table; the second, DESCRIPTOR(row_time), declares the timestamp column of the source; the third, INTERVAL '60' SECOND, declares a tumbling window size of 1 min.
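How a record ends up in exactly one tumbling window can be sketched with simple arithmetic in plain Java. TumbleSketch is a hypothetical class, not a Flink API, and the sketch assumes windows are aligned to epoch 0 with no offset.

```java
/** Hypothetical illustration only; not a Flink class. */
final class TumbleSketch {
    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps too.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the tumbling window that contains the timestamp. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = 125_000L; // 2 min 5 s after epoch
        System.out.println("[" + windowStart(ts, oneMinute) + ", " + windowEnd(ts, oneMinute) + ")");
    }
}
```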

Practical example: a simple, common case that computes the number of concurrent online users and total sales per dimension at minute granularity.
Approach before Flink 1.13
Group Window Aggregation (the only option before 1.13; it is marked as deprecated in 1.13 and later)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _02_GroupWindowAggr {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "9091");
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);
        // create the table environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        String sourceTableSql = "create  table source_table(\n" +
                "  -- dimension\n" +
                "  dim STRING,\n" +
                "  -- user id\n" +
                "  user_id BIGINT,\n" +
                "  -- sales amount\n" +
                "  price BIGINT,\n" +
                "  -- event timestamp\n" +
                "  row_time as cast(CURRENT_TIMESTAMP  AS timestamp(3) ),\n" +
                "  -- set the watermark with a 5-second delay\n" +
                "  WATERMARK FOR row_time AS row_time - interval '5' SECOND\n" +
                ") with (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.dim.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";
        String sinkTableSql = "CREATE TABLE sink_table (\n" +
                "    dim STRING,\n" +
                "    pv BIGINT,\n" +
                "    sum_price BIGINT,\n" +
                "    max_price BIGINT,\n" +
                "    min_price BIGINT,\n" +
                "    uv BIGINT,\n" +
                "    window_start bigint\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";
        // the tumbling window is declared in GROUP BY
        String insertTableSql = "INSERT INTO sink_table\n" +
                "select\n" +
                "  dim,\n" +
                "  count(1) as  pv,\n" +
                "  sum(price) as sum_price,\n" +
                "  max(price) as max_price,\n" +
                "  min(price) as min_price,\n" +
                "  count(distinct user_id) as uv,\n" +
                "  UNIX_TIMESTAMP(cast (tumble_start(row_time,interval '1' minute) as STRING ) ) * 1000 AS window_start\n" +
                "from source_table\n" +
                "group by dim,tumble(row_time,interval '1' minute )";
        tEnv.executeSql(sourceTableSql);
        tEnv.executeSql(sinkTableSql);
        tEnv.executeSql(insertTableSql);
    }
}

Approach in Flink 1.13 (Windowing TVF)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _03_WindowTVF {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "9091");
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);
        // create the table environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        String sourceTableSql = "create  table source_table(\n" +
                "  -- dimension\n" +
                "  dim STRING,\n" +
                "  -- user id\n" +
                "  user_id BIGINT,\n" +
                "  -- sales amount\n" +
                "  price BIGINT,\n" +
                "  -- event timestamp\n" +
                "  row_time as cast(CURRENT_TIMESTAMP  AS timestamp(3) ),\n" +
                "  -- set the watermark with a 5-second delay\n" +
                "  WATERMARK FOR row_time AS row_time - interval '5' SECOND\n" +
                ") with (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.dim.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";
        String sinkTableSql = "CREATE TABLE sink_table (\n" +
                "    dim STRING,\n" +
                "    pv BIGINT,\n" +
                "    sum_price BIGINT,\n" +
                "    max_price BIGINT,\n" +
                "    min_price BIGINT,\n" +
                "    uv BIGINT,\n" +
                "    window_start bigint\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";
        // the tumbling window is declared in the TABLE clause of the source
        String insertTableSql = "INSERT INTO sink_table\n" +
                "select\n" +
                "  dim,\n" +
                "  count(1) as  pv,\n" +
                "  sum(price) as sum_price,\n" +
                "  max(price) as max_price,\n" +
                "  min(price) as min_price,\n" +
                "  count(distinct user_id) as uv,\n" +
                "  UNIX_TIMESTAMP(cast (window_start as STRING)) * 1000  AS window_start\n" +
                "from table (\n" +
                "  tumble(\n" +
                "  table source_table,\n" +
                "  descriptor(row_time),\n" +
                "  interval '60' second\n" +
                "  )\n" +
                ")\n" +
                "group by window_start,window_end,dim";
        tEnv.executeSql(sourceTableSql);
        tEnv.executeSql(sinkTableSql);
        tEnv.executeSql(insertTableSql);
    }
}

(2) Sliding window (HOP)

Practical example: a simple, common case that computes the number of concurrent online users per dimension at minute granularity, outputting once per minute over the most recent 5 minutes of data.
Group Window Aggregation

-- data processing logic
insert into sink_table
SELECT
    dim,
    UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,
    count(distinct user_id) as uv
FROM source_table
GROUP BY
    dim
    , hop(row_time, interval '1' minute, interval '5' minute)

Group Window Aggregation: the sliding window is declared in the GROUP BY clause, as hop(row_time, interval '1' minute, interval '5' minute), where:
the first parameter is the event-time timestamp; the second is the slide step of the sliding window; the third is the sliding window size.
Windowing TVF approach (in 1.13, supported only in streaming jobs)

-- data processing logic
insert into sink_table
SELECT
    dim,
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
    count(distinct user_id) as bucket_uv
FROM TABLE(HOP(
        TABLE source_table
        , DESCRIPTOR(row_time)
        , INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY
    window_start,
    window_end,
    dim

Windowing TVF: the sliding window is declared in the TABLE clause of the data source,
as TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES)), which takes four parameters:
The first parameter, TABLE source_table, declares the source table;
The second parameter, DESCRIPTOR(row_time), declares the timestamp column of the source;
The third parameter, INTERVAL '1' MINUTES, declares a slide step of 1 min;
The fourth parameter, INTERVAL '5' MINUTES, declares a sliding window size of 5 min.
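With a slide of 1 min and a size of 5 min, each record belongs to five overlapping windows. The following plain-Java sketch enumerates them. HopSketch is a hypothetical class, not a Flink API, and it assumes windows are aligned to epoch 0 and that the size is a multiple of the slide.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Flink class. */
final class HopSketch {
    /** Starts of all sliding windows [start, start + size) that contain ts, newest first. */
    static List<Long> windowStarts(long ts, long slideMillis, long sizeMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before ts, aligned to the slide.
        long lastStart = ts - Math.floorMod(ts, slideMillis);
        // Walk backwards while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at 7 min 30 s with slide = 1 min and size = 5 min.
        System.out.println(windowStarts(450_000L, 60_000L, 300_000L));
    }
}
```

Because every record is counted in size / slide windows, a smaller slide increases the amount of work and state per record.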

(3) Session window (SESSION)

Session window definition: unlike tumbling and sliding windows, a session window has no fixed duration. If no new data arrives within the defined interval (the session gap), the session window closes.
Practical example: compute the total number of items purchased in each user's active period (one session); a user who is inactive for 5 minutes is treated as having ended the session.
Flink SQL 1.13 does not support a Windowing TVF for session windows.
Group Window Aggregation approach:

-- data processing logic
insert into sink_table
SELECT
    dim,
    UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,
    count(1) as pv
FROM source_table
GROUP BY
    dim
    , session(row_time, interval '5' minute)

The SQL job above outputs data only after the whole session window has ended.
Group Window Aggregation: the session window is declared in the GROUP BY clause, as session(row_time, interval '5' minute), where:
the first parameter is the event-time timestamp; the second is the session gap interval.
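The session gap rule can be sketched in plain Java by walking over sorted timestamps and starting a new session whenever the pause between two records exceeds the gap. SessionSketch is a hypothetical class, not a Flink API; the input is assumed to be already sorted and to belong to a single key, and the treatment of a pause exactly equal to the gap is a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Flink class. */
final class SessionSketch {
    /** Splits ascending timestamps into sessions and returns the record count (pv) per session. */
    static List<Integer> sessionCounts(long[] sortedTimestamps, long gapMillis) {
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < sortedTimestamps.length; i++) {
            boolean gapExceeded =
                    i > 0 && sortedTimestamps[i] - sortedTimestamps[i - 1] > gapMillis;
            if (gapExceeded) {
                counts.add(current); // close the previous session
                current = 0;
            }
            current++;
        }
        if (current > 0) {
            counts.add(current);     // close the last open session
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 60_000L, 120_000L, 600_000L, 610_000L};
        System.out.println(sessionCounts(timestamps, 300_000L)); // prints [3, 2]
    }
}
```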

(4) Cumulative window (CUMULATE)

A cumulative window can be thought of as first opening a tumbling window of the maximum window size (max window size), then splitting that tumbling window into multiple windows at the trigger interval set by the user (window step). These windows share the same start but have different ends.
Practical example: for each day, compute the cumulative money (sum(money)) and the deduplicated id count (count(distinct id)) up to the current minute. "Per day" means the cumulative window size is 1 day; "per minute" means the step of the cumulative window is at minute level.
Its characteristic is that each minute's output is the result accumulated from midnight of that day up to the current minute.
Currently only the Windowing TVF approach supports this:

-- data processing logic
insert into sink_table
SELECT
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
    window_start,
    sum(money) as sum_money,
    count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(
       TABLE source_table
       , DESCRIPTOR(row_time)
       , INTERVAL '60' SECOND
       , INTERVAL '1' DAY))
GROUP BY
    window_start,
    window_end

Windowing TVF: the cumulative window is declared in the TABLE clause of the data source, as TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),
which takes four parameters:
The first parameter, TABLE source_table, declares the source table;
The second parameter, DESCRIPTOR(row_time), declares the timestamp column of the source;
The third parameter, INTERVAL '60' SECOND, declares that the cumulative window fires at a step of 1 min;
The fourth parameter, INTERVAL '1' DAY, declares that the whole cumulative window is 1 day in size; on the next day a new window opens and accumulation starts over.
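The "same start, different ends" structure of cumulative windows can be sketched in plain Java. CumulateSketch is a hypothetical class, not a Flink API; it assumes windows are aligned to epoch 0 with no time-zone offset and that the maximum size is a multiple of the step.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Flink class. */
final class CumulateSketch {
    /** Start shared by all cumulative windows that contain the timestamp. */
    static long windowStart(long ts, long maxSizeMillis) {
        return ts - Math.floorMod(ts, maxSizeMillis);
    }

    /** Ends (exclusive) of all cumulative windows that contain the timestamp, ascending. */
    static List<Long> windowEnds(long ts, long stepMillis, long maxSizeMillis) {
        long lastEnd = windowStart(ts, maxSizeMillis) + maxSizeMillis;
        // First step boundary strictly after ts.
        long firstEnd = ts - Math.floorMod(ts, stepMillis) + stepMillis;
        List<Long> ends = new ArrayList<>();
        for (long end = firstEnd; end <= lastEnd; end += stepMillis) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // Small numbers for readability: step = 1 min, max size = 5 min, record at 2 min 30 s.
        System.out.println(windowStart(150_000L, 300_000L));
        System.out.println(windowEnds(150_000L, 60_000L, 300_000L));
    }
}
```

A record therefore contributes to every remaining step of its maximum-size window, which is what makes each output a running total.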

(5) Windowing TVF support for GROUPING SETS, ROLLUP, and CUBE

Anyone with offline Hive SQL experience knows that GROUPING SETS lets you write several dimension combinations in a single SQL statement, which is convenient to write and efficient to execute. Flink supports this feature too.
Currently GROUPING SETS is supported only with Windowing TVFs, not with Group Window Aggregation.
Practical example: compute the number of users, accumulated from midnight to the current minute each day, for the overall total and for the age, sex, and age+sex dimensions.

-- user visit detail table
CREATE TABLE source_table (
    age STRING,
    sex STRING,
    user_id BIGINT,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.age.length' = '1',
  'fields.sex.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000'
);

CREATE TABLE sink_table (
    age STRING,
    sex STRING,
    uv BIGINT,
    window_end bigint
) WITH (
  'connector' = 'print'
);

-- data processing logic
-- (the select list follows the column order of sink_table, because
--  INSERT INTO maps columns by position)
insert into sink_table
SELECT
    if (age is null, 'ALL', age) as age,
    if (sex is null, 'ALL', sex) as sex,
    count(distinct user_id) as bucket_uv,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end
FROM TABLE(CUMULATE(
       TABLE source_table
       , DESCRIPTOR(row_time)
       , INTERVAL '5' SECOND
       , INTERVAL '1' DAY))
GROUP BY
    window_start,
    window_end,
    -- GROUPING SETS syntax
    GROUPING SETS (
        ()
        , (age)
        , (sex)
        , (age, sex)
    )
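What the four grouping sets do to each input row can be sketched in plain Java: every row contributes to one group per set, and the columns left out of a set are shown as 'ALL'. GroupingSetsSketch is a hypothetical class, not a Flink API. For brevity it counts rows rather than distinct users and ignores the window columns.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not a Flink class. */
final class GroupingSetsSketch {
    /** Counts rows per group for the sets (), (age), (sex), (age, sex); each row is {age, sex}. */
    static Map<String, Integer> countByGroupingSets(List<String[]> rows) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] row : rows) {
            String age = row[0];
            String sex = row[1];
            // One key per grouping set. A column missing from a set is NULL in SQL
            // and is rendered here as "ALL", like the if(... is null, 'ALL', ...) above.
            String[] keys = {"ALL|ALL", age + "|ALL", "ALL|" + sex, age + "|" + sex};
            for (String key : keys) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"a", "m"}, new String[] {"a", "f"}, new String[] {"b", "m"});
        System.out.println(countByGroupingSets(rows));
    }
}
```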

(6) DML: group aggregation

The difference between group aggregation and the window aggregation described above: group aggregation groups data by category, such as age or gender, which is horizontal; window aggregation groups data by time granularity, which is vertical.
A window aggregation can be converted into a group aggregation. Suppose a window aggregation aggregates at 1-minute granularity, as in the following SQL:

-- data processing logic
insert into sink_table
select
    dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- compute the uv count
    count(distinct user_id) as uv,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
    dim,
    -- split windows using the Flink SQL tumble window syntax
    tumble(row_time, interval '1' minute)

The equivalent group aggregation is written as follows:

-- data processing logic
insert into sink_table
select
    dim,
    count(*) as pv,
    sum(price) as sum_price,
    max(price) as max_price,
    min(price) as min_price,
    -- compute the uv count
    count(distinct user_id) as uv,
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
    dim,
    -- divide the second-level timestamp by 60 to get a 1-minute bucket
    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

However, window aggregation and group-by aggregation differ as follows:
Essential difference: window aggregation has time semantics. Once a window ends and its result is output, late data arriving afterwards does not change that result; the output value is final (ignoring allowLateness).
Group-by aggregation has no time semantics. No matter how late the data is, as soon as it arrives the operator retracts the previously output result and then sends the newly computed result.
Execution level: window aggregation is bound to time; the computation of a window's result is triggered by the advance of time (the watermark).
Group-by aggregation is triggered entirely by data: each new record triggers a computation based on that record. The two are therefore implemented very differently.
SQL semantics:

Assume Orders is a Kafka topic and target_table is also a Kafka topic. The streaming job generated from this SQL produces three operators when executed:
Source operator (FROM Orders): runs continuously, reading records one by one from the Orders Kafka topic in real time and sending them one by one to the downstream group aggregation operator. The shuffle strategy for sending data downstream is based on the key in GROUP BY: records with the same key go to the same SubTask (parallel instance).
Group aggregation operator (group by key + sum\count\max\min): receives each record from the upstream operator and looks up the previous sum\count\max\min result for this key in state. If a result oldResult exists, it is combined with the current record to compute the key's new result newResult, and [key, newResult] is written back to state. Before sending the new result downstream, the operator first sends a retraction message for the previous result, -[key, oldResult], and then sends the new result, +[key, newResult]. If state holds no result for the current key, the operator computes newResult from the current record alone and writes [key, newResult] to state. Because this is the first result sent downstream for the key, no retraction message is needed, and it sends +[key, newResult] directly.
Sink operator (INSERT INTO target_table): receives each record from upstream and writes it to the target_table Kafka topic.
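The retract-then-emit behavior of the group aggregation operator can be sketched in plain Java for a single SUM. RetractSumSketch is a hypothetical class, not a Flink API; a HashMap stands in for Flink keyed state, and the message strings simply reuse the -[key, oldResult] and +[key, newResult] notation from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not a Flink class. */
final class RetractSumSketch {
    // Stand-in for keyed state: the last SUM result per key.
    private final Map<String, Long> state = new HashMap<>();

    /** Processes one record and returns the messages sent downstream. */
    List<String> process(String key, long value) {
        List<String> out = new ArrayList<>();
        Long oldResult = state.get(key);
        long newResult = (oldResult == null ? 0L : oldResult) + value;
        state.put(key, newResult);
        if (oldResult != null) {
            // A result was emitted before: retract it first.
            out.add("-[" + key + ", " + oldResult + "]");
        }
        out.add("+[" + key + ", " + newResult + "]");
        return out;
    }

    public static void main(String[] args) {
        RetractSumSketch agg = new RetractSumSketch();
        System.out.println(agg.process("a", 1L)); // prints [+[a, 1]]
        System.out.println(agg.process("a", 2L)); // prints [-[a, 1], +[a, 3]]
    }
}
```

This is why the sink for a group aggregation must be able to handle retractions or updates, whereas a window aggregation emits each result once.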

Group aggregation also supports GROUPING SETS, ROLLUP, and CUBE.
Author: undo_try

Copyright notice
This article was created by [Game programming]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/185/202207042035111024.html