
1.19.11. SQL client, start SQL client, execute SQL query, environment configuration file, restart policy, user-defined functions, constructor parameters

2022-07-07 04:16:00 Game programming

1.19.11. SQL Client: Getting Started, Starting the SQL Client CLI, Executing SQL Queries, Configuration, Environment Files, Restart Strategies, Dependencies, User-defined Functions, Constructor Parameters, Detached SQL Queries, Views, Temporal Tables

1.19.11. SQL Client

For details, please refer to: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sqlClient.html
Flink's Table & SQL API makes it possible to work with queries written in SQL, but these queries need to be embedded within a table program written in Java or Scala. Moreover, these programs need to be packaged with a build tool before they can be submitted to a cluster. This more or less limits the use of Flink to Java/Scala programmers.
The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client command-line interface (CLI) makes it possible to retrieve and visualize, on the command line, real-time results produced by the running distributed application.

Getting Started

This section describes how to set up and run your first Flink SQL program from the command line.
The SQL Client is bundled in the regular Flink distribution and can therefore be run out of the box. It only requires a running Flink cluster on which table programs can be executed. For more information about setting up a Flink cluster, see the Cluster & Deployment section. If you simply want to try out the SQL Client, you can also start a local cluster with the following command:

./bin/start-cluster.sh

Starting the SQL Client CLI

The SQL Client scripts are also located in Flink's bin directory. In the future, users will have two ways of starting the SQL Client CLI: either by starting an embedded standalone process, or by connecting to a remote SQL Client Gateway. Currently only the embedded mode is supported. You can start the CLI by calling:

./bin/sql-client.sh embedded

By default, the SQL Client reads its configuration from the environment file ./conf/sql-client-defaults.yaml. For more information about the structure of environment files, see the configuration section (https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sqlClient.html#environment-files).

Executing SQL Queries

Once the CLI has been started, you can use the HELP command to list all available SQL statements. To verify your setup and cluster connection, enter your first SQL query and press the Enter key to execute it:

Flink SQL> SELECT 'Hello World';

This query requires no table source and produces a single-row result. The CLI retrieves the result from the cluster and visualizes it. Press the Q key to close the result view.
The CLI supports three modes for maintaining and visualizing results.
Table mode (table mode) materializes results in memory and visualizes them in a regular, paginated table. Enable it by executing the following command:

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

The result is shown as a table with one row per name: Bob with a count of 2, and Alice and Greg with a count of 1 each.


Changelog mode (changelog mode) does not materialize results. Instead, it visualizes the result stream produced by a continuous query, which consists of insertions (+) and retractions (-):

Flink SQL> SET execution.result-mode=changelog;

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

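In the result, each name first appears as an insertion (+) with a count of 1. When the second 'Bob' row arrives, the outdated row (Bob, 1) is retracted (-) and the updated row (Bob, 2) is inserted (+).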
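To make the insertion/retraction stream concrete, the following Python sketch simulates a continuously updated GROUP BY name count over the same four input rows. It only models the semantics described above (retract the outdated count, then insert the new one); it is not Flink's runtime implementation, and the function name grouped_count_changelog is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def grouped_count_changelog(names: Iterable[str]) -> List[Tuple[str, str, int]]:
    """Simulate the result stream of SELECT name, COUNT(*) ... GROUP BY name.

    Each change is (op, name, cnt): '+' inserts a row, '-' retracts a row
    that was emitted earlier.
    """
    counts: Dict[str, int] = {}
    changes: List[Tuple[str, str, int]] = []
    for name in names:
        old = counts.get(name, 0)
        if old > 0:
            # The group already has a visible result: retract it first.
            changes.append(("-", name, old))
        counts[name] = old + 1
        changes.append(("+", name, counts[name]))
    return changes


for op, name, cnt in grouped_count_changelog(["Bob", "Alice", "Greg", "Bob"]):
    print(op, name, cnt)
```

Running it prints one change per line: + Bob 1, + Alice 1, + Greg 1, - Bob 1, + Bob 2.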


Tableau mode (tableau mode) is closer to a traditional database: it prints the results directly on the screen in tabular form. The exact content displayed depends on the execution mode of the job (execution.type).
Note that when you run a streaming query in this mode, Flink continuously prints the results on the current screen. If the input of the streaming query is a bounded data set, Flink stops the job automatically after processing all the data, and printing stops accordingly. If you want to end the query early, press CTRL-C; this stops the job and also stops printing to the screen.

Flink SQL> SET execution.result-mode=tableau;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

These result modes are useful while prototyping SQL queries. All of them store results in the Java heap memory of the SQL Client. To keep the CLI responsive, changelog mode only shows the latest 1000 changes. Table mode allows navigating through larger results, which are limited only by the available main memory and the configured maximum number of rows (max-table-result-rows).
Note: Results of queries executed in a batch environment can only be retrieved in table mode or tableau mode.
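The two in-memory result views can be sketched in Python as follows: table mode applies every insertion and retraction to a materialized table, while changelog mode only keeps the most recent changes (the CLI keeps 1000). This is a simplified model under assumptions: the function names and the (op, name, cnt) tuple layout are hypothetical, and the max-table-result-rows cap of table mode is not modeled.

```python
from collections import Counter, deque
from typing import Iterable, List, Tuple

Change = Tuple[str, str, int]  # (op, name, cnt), where op is '+' or '-'


def materialize(changes: Iterable[Change]) -> List[Tuple[str, int]]:
    """Table-mode view: apply insertions and retractions to a result table."""
    table: Counter = Counter()
    for op, name, cnt in changes:
        row = (name, cnt)
        if op == "+":
            table[row] += 1
        elif table[row] > 1:
            table[row] -= 1
        else:
            # Retracting the last copy removes the row from the table.
            table.pop(row, None)
    return sorted(table.elements())


def recent_changes(changes: Iterable[Change], limit: int = 1000) -> List[Change]:
    """Changelog-mode view: keep only the latest `limit` changes."""
    return list(deque(changes, maxlen=limit))


log = [("+", "Bob", 1), ("+", "Alice", 1), ("+", "Greg", 1),
       ("-", "Bob", 1), ("+", "Bob", 2)]
print(materialize(log))               # [('Alice', 1), ('Bob', 2), ('Greg', 1)]
print(recent_changes(log, limit=2))   # [('-', 'Bob', 1), ('+', 'Bob', 2)]
```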
Once a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, the target system that stores the results must be specified with an INSERT INTO statement. The configuration section explains how to declare table sources for reading data, table sinks for writing data, and how to configure other table program properties.

Configuration

The embedded mode accepts the following optional CLI options, as listed by --help:

./bin/sql-client.sh embedded --help
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/admin/installed/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Mode "embedded" submits Flink jobs from the local machine.

  Syntax: embedded [OPTIONS]
  "embedded" mode options:
     -d,--defaults <environment file>      The environment properties with which
                                           every new session is initialized.
                                           Properties might be overwritten by
                                           session properties.
     -e,--environment <environment file>   The environment properties to be
                                           imported into the session. It might
                                           overwrite default environment
                                           properties.
     -h,--help                             Show the help message with
                                           descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the
                                           command history into. If not
                                           specified, we will auto-generate one
                                           under your user's home directory.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory be specified. If the
                                           target directory name is specified,
                                           the archive file will be extracted to
                                           a name can directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data
                                           .zip#data --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.5+,
                                           Apache Beam (version == 2.19.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
                                           These files will be added to the
                                           PYTHONPATH of both the local client
                                           and the remote python UDF worker. The
                                           standard python resource file
                                           suffixes such as .py/.egg/.zip or
                                           directory are all supported. Comma
                                           (',') could be used as the separator
                                           to specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$n
                                           amenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///
                                           tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Experimental (for testing only!):
                                           Instructs the SQL Client to
                                           immediately execute the given update
                                           statement after starting up. The
                                           process is shut down after the
                                           statement has been submitted to the
                                           cluster and returns an appropriate
                                           return code. Currently, this feature
                                           is only supported for INSERT INTO
                                           statements that declare the target
                                           sink table.

Environment Files

A SQL query needs a configuration environment in which it is executed. The so-called environment files define catalogs, table sources, table sinks, user-defined functions, and other properties required for execution and deployment. Environment files are regular YAML files, for example:

# Define tables here, such as sources, sinks, views, or temporal tables.
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          data-type: INT
        - name: MyField2
          data-type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        data-type: INT
      - name: MyField2
        data-type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Define user-defined functions here.
functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

# Define available catalogs.
catalogs:
  - name: catalog_1
    type: hive
    property-version: 1
    hive-conf-dir: ...
  - name: catalog_2
    type: hive
    property-version: 1
    default-database: mydb2
    hive-conf-dir: ...

# Properties that change the fundamental execution behavior of a table program.
execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode, either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of rows maintained in 'table' mode
                                    #   (1000000 by default; a value smaller than 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
  current-catalog: catalog_1        # optional: name of the session's current catalog ('default_catalog' by default)
  current-database: mydb1           # optional: name of the current database of the current catalog
                                    #   (the default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   falls back to the global restart strategy by default

# Configuration options for adjusting and tuning table programs.
# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted.
deployment:
  response-timeout: 5000

The configuration above:
defines an environment with a table source MyTableSource that reads from a CSV file,
defines a view MyCustomView that declares a virtual table using a SQL query,
defines a user-defined function myUDF that can be instantiated using the class name and two constructor parameters,
connects to two Hive catalogs and uses catalog_1 as the current catalog, with mydb1 as the current database of that catalog,
uses the blink planner in streaming mode to run statements with the event-time characteristic and a parallelism of 1,
runs exploratory queries in the table result mode,
and makes some planner adjustments around join reordering and spilling via configuration options.
Depending on the use case, a configuration can be split into multiple files. Environment files can therefore be created for general purposes (a defaults environment file, passed with --defaults) as well as on a per-session basis (a session environment file, passed with --environment). Every CLI session is initialized with the default properties, followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in every session, whereas the session environment file only declares a specific state retention time and parallelism. Both the defaults and the session environment file can be passed when starting the CLI application. If no defaults environment file is specified, the SQL Client searches for ./conf/sql-client-defaults.yaml in Flink's configuration directory.
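The layering of properties can be pictured with Python's collections.ChainMap, which looks a key up in its first mapping first. This sketch assumes the precedence order spelled out in the note that follows (CLI commands, then the session environment file, then the defaults environment file) and flattens properties into dotted keys purely for illustration; the property values are hypothetical, and the SQL Client's real merging logic works on its own environment objects.

```python
from collections import ChainMap

# Hypothetical, flattened properties from each layer (illustration only).
defaults_file = {
    "execution.type": "streaming",
    "execution.result-mode": "table",
    "execution.parallelism": 1,
}
session_file = {"execution.parallelism": 4}
cli_set_commands = {"execution.result-mode": "changelog"}  # e.g. from a SET command

# ChainMap searches its maps left to right, so earlier layers win:
# CLI commands > session environment file > defaults environment file.
effective = ChainMap(cli_set_commands, session_file, defaults_file)

for key in sorted(effective):
    print(key, "=", effective[key])
```

Each layer only needs to contain the keys it overrides; anything it leaves out is taken from the layers further down.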
Note: Properties set within a CLI session (for example, with the SET command) have the highest precedence:

CLI commands > session environment file > defaults environment file

Restart Strategies

Restart strategies control how Flink jobs are restarted in case of a failure. Similar to the global restart strategies of a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.
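As a rough mental model of the fixed-delay and failure-rate strategies configured in the YAML that follows, the Python sketch below decides after each failure whether the job is restarted (returning the delay) or fails for good (returning None). The class names are hypothetical, and Flink's own restart-strategy bookkeeping may differ at the window boundaries; treat this as an illustration of the documented parameters (attempts, delay, max-failures-per-interval, failure-rate-interval), not as Flink code.

```python
from collections import deque
from typing import Deque, Optional


class FixedDelay:
    """Simplified model of 'fixed-delay': at most `attempts` restarts."""

    def __init__(self, attempts: int, delay_ms: int) -> None:
        self.attempts = attempts
        self.delay_ms = delay_ms
        self.restarts = 0

    def on_failure(self, now_ms: int) -> Optional[int]:
        """Return the delay before restarting, or None if the job fails.

        now_ms is unused here; it keeps the interface uniform with FailureRate.
        """
        if self.restarts >= self.attempts:
            return None
        self.restarts += 1
        return self.delay_ms


class FailureRate:
    """Simplified model of 'failure-rate': give up once more than
    `max_failures` failures fall inside a sliding window of `interval_ms`."""

    def __init__(self, max_failures: int, interval_ms: int, delay_ms: int) -> None:
        self.max_failures = max_failures
        self.interval_ms = interval_ms
        self.delay_ms = delay_ms
        self.failures: Deque[int] = deque()

    def on_failure(self, now_ms: int) -> Optional[int]:
        self.failures.append(now_ms)
        # Forget failures that are older than the measuring interval.
        while self.failures and now_ms - self.failures[0] >= self.interval_ms:
            self.failures.popleft()
        if len(self.failures) > self.max_failures:
            return None
        return self.delay_ms


fixed = FixedDelay(attempts=3, delay_ms=10_000)
print([fixed.on_failure(t) for t in (0, 1_000, 2_000, 3_000)])
rate = FailureRate(max_failures=1, interval_ms=60_000, delay_ms=10_000)
print(rate.on_failure(0), rate.on_failure(70_000), rate.on_failure(90_000))
```

With these settings the fixed-delay model restarts three times and then gives up, while the failure-rate model tolerates the failures at 0 s and 70 s (they are more than 60 s apart) but gives up on the second failure within one minute at 90 s.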
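Flink supports the following strategies. They are alternatives, so an environment file should declare only one restart-strategy entry under execution: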

execution:
  # falls back to the global strategy defined in flink-conf.yaml
  restart-strategy:
    type: fallback

  # the job fails directly and no restart is attempted
  restart-strategy:
    type: none

  # attempts to restart the job a given number of times
  restart-strategy:
    type: fixed-delay
    attempts: 3      # retries before the job is declared as failed (default: Integer.MAX_VALUE)
    delay: 10000     # delay between retries in milliseconds (default: 10 s)

  # keeps attempting as long as the maximum number of failures per time interval is not exceeded
  restart-strategy:
    type: failure-rate
    max-failures-per-interval: 1   # retries within the interval until failing (default: 1)
    failure-rate-interval: 60000   # measuring interval for the failure rate in milliseconds
    delay: 10000                   # delay between retries in milliseconds (default: 10 s)

Dependencies

The SQL Client does not require setting up a Java project with Maven or SBT. Instead, you can pass dependencies as regular JAR files that are submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and the corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.
The full list of offered SQL JARs and documentation on how to use them can be found on the page about connecting to external systems.
The following example shows an environment file that defines a table source reading JSON data from Apache Kafka.

tables:
  - name: TaxiRides
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: TaxiRides
      startup-mode: earliest-offset
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rowTime
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: "from-field"
            from: "rideTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
      - name: procTime
        data-type: TIMESTAMP(3)
        proctime: true
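The periodic-bounded watermark strategy in this example tolerates events that arrive up to delay (60000 ms) out of order. A rough Python model, assuming the watermark equals the largest rowTime seen so far minus the delay (Flink's implementations may differ by a millisecond), shows which events would be treated as late. For simplicity it recomputes the watermark after every event, whereas Flink emits watermarks periodically; bounded_watermarks is a hypothetical helper.

```python
from typing import Iterable, List, Optional, Tuple


def bounded_watermarks(event_times_ms: Iterable[int], delay_ms: int = 60_000
                       ) -> List[Tuple[int, Optional[int], bool]]:
    """For each event return (event_time, watermark_seen_by_event, is_late).

    Simplification: the watermark is recomputed after every event, whereas
    Flink emits watermarks periodically (periodic-watermarks-interval).
    """
    max_seen: Optional[int] = None
    result: List[Tuple[int, Optional[int], bool]] = []
    for ts in event_times_ms:
        watermark = None if max_seen is None else max_seen - delay_ms
        # An event at or behind the current watermark counts as late.
        late = watermark is not None and ts <= watermark
        result.append((ts, watermark, late))
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return result


for row in bounded_watermarks([0, 100_000, 50_000, 30_000]):
    print(row)
```

Here the event at 50 s is still on time (the watermark is at 40 s), while the event at 30 s arrives behind the watermark and is late.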

The resulting schema of the TaxiRides table contains most of the fields of the JSON schema. In addition, it adds a rowtime attribute rowTime and a processing-time attribute procTime. Both connector and format allow defining a property version (currently version 1) for future backward compatibility.

User-defined Functions

The SQL Client allows users to create custom, user-defined functions for use in SQL queries. Currently, these functions are restricted to Java/Scala classes or Python files.
To provide a Java/Scala user-defined function, you first need to implement and compile a function class that extends ScalarFunction, AggregateFunction, or TableFunction (see User-defined Functions). One or more functions can then be packaged into a dependency JAR for the SQL Client.
To provide a Python user-defined function, you need to write a Python function and decorate it with the pyflink.table.udf.udf or pyflink.table.udf.udtf decorator (see Python UDFs). One or more functions can be placed in a Python file. The Python file and related dependencies must be specified via the configuration in the environment file (see Python Configuration) or via the command-line options (see Command Line Usage).
All functions must be declared in an environment file before they can be called. For each item in the functions list, you must specify:
a name under which the function is registered,
the source of the function using from (currently restricted to class (Java/Scala UDF) or python (Python UDF)).
Java/Scala UDFs must specify:
class, the fully qualified class name of the function, and an optional list of constructor parameters for instantiation.
Python UDFs must specify:
fully-qualified-name, the fully qualified name of the function, i.e. "[module name].[object name]".

functions:
  - name: java_udf               # required: name of the function
    from: class                  # required: source of the function
    class: ...                   # required: fully qualified class name of the function
    constructor:                 # optional: constructor parameters of the function class
      - ...                      # optional: a literal parameter with implicit type
      - class: ...               # optional: full class name of the parameter
        constructor:             # optional: constructor parameters of the parameter's class
          - type: ...            # optional: type of the literal parameter
            value: ...           # optional: value of the literal parameter
  - name: python_udf             # required: name of the function
    from: python                 # required: source of the function
    fully-qualified-name: ...    # required: fully qualified name of the function
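The fully-qualified-name of a Python UDF follows the "[module name].[object name]" convention described above. The sketch below only illustrates how such a name splits into a module and an attribute, using the standard importlib module; the helper resolve_fully_qualified_name is hypothetical and is not the SQL Client's own loader.

```python
import importlib
import math
from typing import Any


def resolve_fully_qualified_name(name: str) -> Any:
    """Resolve "[module name].[object name]" to the named Python object."""
    # Split on the last dot: everything before it is the module path.
    module_name, sep, object_name = name.rpartition(".")
    if not sep or not module_name:
        raise ValueError(f"expected '<module>.<object>', got {name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, object_name)


sqrt = resolve_fully_qualified_name("math.sqrt")
print(sqrt is math.sqrt, sqrt(16.0))  # True 4.0
```

Splitting on the last dot also lets nested modules work, for example "os.path.join" resolves to the join function of the os.path module.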

For Java/Scala UDFs, make sure that the order and types of the specified constructor parameters strictly match one of the constructors of your function class.

Constructor Parameters

Depending on the user-defined function, it might be necessary to parameterize the implementation before using it in SQL statements.
As shown in the example above, when declaring a user-defined function, a class can be configured using constructor parameters in one of the following three ways:
Implicitly typed literal values: the SQL Client derives the type automatically from the literal value itself. Currently, only BOOLEAN, INT, DOUBLE, and VARCHAR values are supported here.
If the automatic derivation does not work as expected (for example, you need a VARCHAR false), use explicitly typed values instead.

- true         # -> BOOLEAN (case sensitive)
- 42           # -> INT
- 1234.222     # -> DOUBLE
- foo          # -> VARCHAR
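A simplified Python model of the implicit typing rules listed above, covering only BOOLEAN, INT, DOUBLE, and VARCHAR. It works on the raw literal text, ignores numeric range limits and YAML's own parsing rules, and treats only lowercase true/false as BOOLEAN based on the "case sensitive" comment; infer_literal_type is a hypothetical name, not part of the SQL Client.

```python
import re

_INT = re.compile(r"[+-]?\d+")
# Also matches plain integers, but INT is checked first below.
_DOUBLE = re.compile(r"[+-]?(\d+\.\d*|\.\d+|\d+)([eE][+-]?\d+)?")


def infer_literal_type(token: str) -> str:
    """Guess the SQL type of an implicitly typed literal."""
    if token in ("true", "false"):
        return "BOOLEAN"
    if _INT.fullmatch(token):
        return "INT"
    if _DOUBLE.fullmatch(token):
        return "DOUBLE"
    # Anything else is treated as a string.
    return "VARCHAR"


for token in ["true", "42", "1234.222", "foo"]:
    print(token, "->", infer_literal_type(token))
```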

Explicitly typed literal values: for type safety, explicitly declare the parameter with type and value properties.

- type: DECIMAL
  value: 11111111111111111

The following table lists the supported Java parameter types and the corresponding SQL types.

Java type            SQL type
java.lang.Float      REAL, FLOAT
java.lang.Integer    INTEGER, INT

Other types (for example, TIMESTAMP and ARRAY), primitive types, and null are not supported yet.
(Nested) class instances: besides literal values, you can also create (nested) class instances for constructor parameters by specifying the class and constructor properties. This process can be applied recursively until all constructor parameters are represented by literal values.

- class: foo.bar.paramClass
  constructor:
    - StarryName
    - class: java.lang.Integer
      constructor:
        - class: java.lang.String
          constructor:
            - type: VARCHAR
              value: 3
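The recursive construction described above can be modeled in Python. Because the real SQL Client instantiates Java classes on the JVM, this sketch maps a few Java class names to hypothetical Python stand-ins (java.lang.Integer to int, java.lang.String to str) and converts explicitly typed literals for a few SQL types. foo.bar.paramClass from the example is left out because it is a placeholder class, and build_parameter is a hypothetical helper.

```python
from decimal import Decimal
from typing import Any, Callable, Dict

# Hypothetical stand-ins for Java classes (the SQL Client uses real Java classes).
CLASS_REGISTRY: Dict[str, Callable[..., Any]] = {
    "java.lang.Integer": int,
    "java.lang.String": str,
}

# Conversions for explicitly typed literals (type + value properties).
TYPE_CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "VARCHAR": str,
    "INT": int,
    "DOUBLE": float,
    "DECIMAL": lambda v: Decimal(str(v)),
}


def build_parameter(spec: Any) -> Any:
    """Recursively turn a constructor-parameter description into a value."""
    if isinstance(spec, dict) and "class" in spec:
        # (Nested) class instance: build the arguments first, then the object.
        args = [build_parameter(arg) for arg in spec.get("constructor", [])]
        return CLASS_REGISTRY[spec["class"]](*args)
    if isinstance(spec, dict) and "type" in spec:
        # Explicitly typed literal.
        return TYPE_CONVERTERS[spec["type"]](spec["value"])
    # Implicitly typed literal: already parsed by the YAML reader.
    return spec


nested = {"class": "java.lang.Integer",
          "constructor": [{"class": "java.lang.String",
                           "constructor": [{"type": "VARCHAR", "value": 3}]}]}
print(build_parameter(nested))                                           # 3
print(build_parameter({"type": "DECIMAL", "value": 11111111111111111}))  # 11111111111111111
```

The nested example mirrors the YAML above: the innermost VARCHAR literal becomes the string "3", which is passed to the String stand-in and then to the Integer stand-in.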

Catalogs

Catalogs can be defined as a set of YAML properties and are automatically registered in the environment when the SQL Client starts.
Users can specify which catalog the SQL CLI uses as the current catalog, and which database of that catalog is used as the current database.
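With a current catalog and current database set (as catalog_1 and mydb1 in the YAML below), table names that are not fully qualified are resolved against them. The Python sketch illustrates that resolution for one-, two- and three-part identifiers; it ignores quoting and escaping, and resolve_table is a hypothetical helper rather than a Flink API.

```python
from typing import Tuple


def resolve_table(identifier: str, current_catalog: str,
                  current_database: str) -> Tuple[str, str, str]:
    """Expand a 1-, 2- or 3-part table name to (catalog, database, table)."""
    parts = identifier.split(".")
    if len(parts) == 1:
        # Only the table name: use the current catalog and database.
        return current_catalog, current_database, parts[0]
    if len(parts) == 2:
        # database.table: use the current catalog.
        return current_catalog, parts[0], parts[1]
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    raise ValueError(f"invalid table identifier: {identifier!r}")


print(resolve_table("MyTable", "catalog_1", "mydb1"))
print(resolve_table("catalog_2.mydb2.MyTable", "catalog_1", "mydb1"))
```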

catalogs:
  - name: catalog_1
    type: hive
    property-version: 1
    default-database: mydb2
    hive-conf-dir: <path of Hive conf directory>
  - name: catalog_2
    type: hive
    property-version: 1
    hive-conf-dir: <path of Hive conf directory>

execution:
  ...
  current-catalog: catalog_1
  current-database: mydb1

Detached SQL Queries

To define end-to-end SQL pipelines, SQL's INSERT INTO statement can be used to submit long-running, detached queries to a Flink cluster. These queries write their results into an external system instead of the SQL Client, which allows them to handle higher parallelism and larger amounts of data. The CLI itself has no control over a detached query after submission.
The sink MyTableSink must be declared in the environment file. For more information about the external systems supported by Flink and their configuration, see the connection page. An example of an Apache Kafka sink is shown below.

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

The SQL Client makes sure that the statement is successfully submitted to the cluster. Once the query is submitted, the CLI shows information about the Flink job.

[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044
Web interface: http://localhost:8081
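The Job ID and web interface address printed above are what you need to cancel the detached query later. One option is Flink's REST API, which terminates a job with PATCH /jobs/:jobid?mode=cancel. The Python sketch below (standard library only; the helper cancel_request is hypothetical) just builds that request for the values shown above and leaves actually sending it commented out.

```python
import urllib.request


def cancel_request(web_url: str, job_id: str) -> urllib.request.Request:
    """Build the REST request that cancels a job: PATCH /jobs/<jobid>?mode=cancel."""
    url = f"{web_url.rstrip('/')}/jobs/{job_id}?mode=cancel"
    return urllib.request.Request(url, method="PATCH")


# Values taken from the submission output above.
req = cancel_request("http://localhost:8081", "6f922fe5cba87406ff23ae4a7bb79044")
print(req.get_method(), req.full_url)
# Against a running cluster, the request would be sent with:
# urllib.request.urlopen(req)
```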

Note:
After submission, the SQL Client does not track the status of the running Flink job. The CLI process can be shut down after the submission without affecting the detached query. Flink's restart strategy takes care of fault tolerance. A query can be cancelled using Flink's web interface, command line, or REST API.

Views

Views are virtual tables defined by SQL queries. A view definition is parsed and validated immediately. However, the view is not executed at that point; it is executed only when it is accessed while submitting a regular INSERT INTO or SELECT statement.
Views can be defined either in environment files or within a CLI session.
The following example shows how to define multiple views in a file. Views are registered in the order in which they are defined in the environment file. Reference chains such as view A depending on view B, which in turn depends on view C, are supported.

tables:
  - name: MyTableSource
    # ...
  - name: MyRestrictedView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"
  - name: MyComplexView
    type: view
    query: >
      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
      FROM MyTableSource
      WHERE MyField2 > 200
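The registration order matters because each view is parsed and validated as soon as it is registered. Here is a small Python model of that rule. It assumes that a view may only reference tables or views registered earlier in the file, and it writes the references out by hand instead of parsing them from SQL; register_in_order is a hypothetical helper.

```python
from typing import Dict, List, Set


def register_in_order(entries: List[Dict]) -> List[str]:
    """Register tables and views in file order.

    Each entry is {"name": ..., "refs": [...]}, where "refs" lists the
    tables/views a view's query reads from (written by hand here).
    """
    registered: Set[str] = set()
    order: List[str] = []
    for entry in entries:
        # A view can only be validated if everything it reads is known already.
        missing = [ref for ref in entry.get("refs", []) if ref not in registered]
        if missing:
            raise ValueError(f"{entry['name']} references unregistered {missing}")
        registered.add(entry["name"])
        order.append(entry["name"])
    return order


env = [
    {"name": "MyTableSource"},
    {"name": "MyRestrictedView", "refs": ["MyTableSource"]},
    {"name": "MyComplexView", "refs": ["MyTableSource"]},
]
print(register_in_order(env))
```

Under this model, swapping the source and a view that reads from it would fail, because the view would be validated before the source exists.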

Compared to table sources and sinks, views defined in a session environment file have the highest precedence.
Views can also be created within a CLI session using the CREATE VIEW statement:
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
Views created within a CLI session can also be removed again using the DROP VIEW statement.
Note: The definition of views in the CLI is limited to the syntax mentioned above. Defining a schema for views and escaping whitespace in table names will be supported in future versions.

Temporal Tables

A temporal table is a (parameterized) view on a changing history table that returns the content of the table at a specific point in time. This is especially useful for joining a table with the content of another table as of a particular timestamp. See the temporal tables page for more information.
The following example shows how to define a temporal table SourceTemporalTable:

tables:
  # Define the table source (or view) that contains updates to the temporal table
  - name: HistorySource
    type: source-table
    update-mode: append
    connector: # ...
    format: # ...
    schema:
      - name: integerField
        data-type: INT
      - name: stringField
        data-type: STRING
      - name: rowtimeField
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: rowtimeField
          watermarks:
            type: from-source

  # Define a temporal table over the changing history table, with a time attribute and a primary key
  - name: SourceTemporalTable
    type: temporal-table
    history-table: HistorySource
    primary-key: integerField
    time-attribute: rowtimeField  # could also be a proctime field
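Conceptually, a temporal table answers the question "which row was valid for this primary key at time t?". The following Python sketch models that lookup over a change history keyed by integerField and ordered by rowtimeField, returning the latest version whose time attribute is not after t. It keeps every version in memory and only illustrates the semantics, not how Flink executes temporal joins; build_versions and lookup_as_of are hypothetical names.

```python
import bisect
from typing import Dict, List, Optional, Tuple

Versions = Dict[int, List[Tuple[int, str]]]


def build_versions(history: List[Tuple[int, str, int]]) -> Versions:
    """Group (integerField, stringField, rowtimeField) rows by primary key,
    ordered by the time attribute."""
    versions: Versions = {}
    for key, value, rowtime in history:
        versions.setdefault(key, []).append((rowtime, value))
    for rows in versions.values():
        rows.sort()
    return versions


def lookup_as_of(versions: Versions, key: int, at: int) -> Optional[str]:
    """Return the stringField valid for `key` at time `at`: the latest
    version whose rowtime is <= at, or None if no version existed yet."""
    rows = versions.get(key, [])
    idx = bisect.bisect_right([rowtime for rowtime, _ in rows], at)
    return rows[idx - 1][1] if idx > 0 else None


history = [(1, "a", 100), (1, "b", 200), (2, "x", 150)]
versions = build_versions(history)
print(lookup_as_of(versions, 1, 50), lookup_as_of(versions, 1, 150),
      lookup_as_of(versions, 1, 250))  # None a b
```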

As shown in the example, definitions of table sources, views, and temporal tables can be mixed. They are registered in the order in which they are defined in the environment file. For example, a temporal table can reference a view, which in turn depends on another view or a table source.
Author: to.to




This article was created by [Game programming]. Please include a link to the original when reposting. Thanks.