当前位置:网站首页>flink1.13 sql基础语法(一)DDL、DML

flink1.13 sql基础语法(一)DDL、DML

2022-07-04 20:35:00 游戏编程

DDL之create语句

CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。

(1)表中列的类型

1、物理列

其定义了物理介质中存储的数据中字段的名称、类型和顺序。

CREATE TABLE MyTable (  `user_id` BIGINT,  `name` STRING) WITH (  ...);

2、元数据列

元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,
比如进行基于时间的窗口操作。

create table MyTable(  `user_id` BIGINT,  `name` STRING,  -- 继续读取kafka本身自带的时间戳  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  ) WITH(  'connector' = 'kafka')
  • 如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。

  • 如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。
    但是这要求两种数据类型是可以强转的。

  • 默认情况下,Flink SQL planner 认为 metadata 列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。
    那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。

CREATE TABLE MyTable (  -- sink 时会写入  `timestamp` BIGINT METADATA,  -- sink 时不写入  `offset` BIGINT METADATA VIRTUAL,  `user_id` BIGINT,  `name` STRING,) WITH (  'connector' = 'kafka'  ...);

3、计算列

计算列其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。

  • 计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。

  • 计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义。
    处理时间:使用 PROCTIME() 函数来定义处理时间列

CREATE TABLE user_actions ( user_name STRING, data STRING, -- 使用下面这句来将 user_action_time 声明为处理时间 user_action_time AS PROCTIME()) WITH ( ...);

事件时间:事件时间的时间戳可以在声明 Watermark 之前进行预处理。比如如果字段不是 TIMESTAMP(3) 类型或者时间戳是嵌套在 JSON 字符串中的,则可以使用计算列进行预处理。

CREATE TABLE user_actions (  user_name STRING,  data STRING,  -- 1. 这个 ts 就是常见的毫秒级别时间戳  ts BIGINT,  -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型(计算列)  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),  -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND) WITH (  ...);
  • 注意!!!和虚拟 metadata 列是类似的,计算列也是只能读不能写的。

(2)WaterMark的定义

Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。
Flink SQL 提供了几种 WATERMARK 生产策略:

  • 有界无序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。此类策略就可以用于设置最大乱序时间,
    假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND,则生成的是运行 5s 延迟的 Watermark。
    一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。

  • 严格升序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用这种方式。

  • 递增:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

(3)Create Table With 子句

With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。

CREATE TABLE KafkaTable (  `user_id` BIGINT,  `item_id` BIGINT,  `behavior` STRING,  `ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH (  'connector' = 'kafka',  'topic' = 'anli',  'properties.bootstrap.servers' = 'hadoop01:9092',  'properties.group.id' = 'test',  'scan.startup.mode' = 'earliest-offset',  'format' = 'json')
注意:Flink SQL 中 Connector 其实就是 Flink 用于链接外部数据源的接口。举一个类似的例子,在 Java 中想连接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去链接。映射到 Flink SQL 中,在 Flink SQL 中要连接到 Kafka,需要使用 kafka connectorFlink SQL 已经提供了一系列的内置 Connector,具体可见 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

DML之With 子句

With 语句和离线 Hive SQL With 语句一样的,使用它可以让你的代码逻辑更加清晰。

-- with 子句WITH orders_with_total AS (    SELECT         order_id        , price + tax AS total    FROM Orders)SELECT     order_id    , SUM(total)FROM orders_with_totalGROUP BY     order_id;

DML之Select、Where子句

INSERT INTO target_tableSELECT PRETTY_PRINT(order_id) FROM OrdersWhere id > 3

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
过滤和字段标准化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,
一条一条将计算结果数据发给下游 数据汇算子。
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

DML之SELECT DISTINCT 子句

语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重

INSERT into target_tableSELECT     DISTINCT id FROM Orders

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
去重算子(DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,
则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

DML:窗口聚合

滚动窗口(TUMBLE)
滑动窗口(HOP)
Session 窗口(SESSION)
渐进式窗口(CUMULATE)

(1)滚动窗口(TUMBLE)

例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中。

 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval '1' minute),第一个参数为事件时间的时间戳;第二个参数为滚动窗口大小。Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分参数。第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明滚动窗口大小为 1 min。

实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额
flink13版本前的处理方式
Group Window Aggregation(1.13 之前只有此类方案,此方案在 1.13 及之后版本已经标记为废弃)

public class _02_GroupWindowAggr {    public static void main(String[] args) throws Exception {        Configuration configuration = new Configuration();        configuration.setString("rest.port","9091");        // 执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        env.setParallelism(1);        // 创建表环境        EnvironmentSettings settings = EnvironmentSettings                .newInstance()                .inStreamingMode()                .build();        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);        String sourceTableSql = "create  table source_table(\n" +                "  -- 维度数据\n" +                "  dim STRING,\n" +                "  -- 用户id\n" +                "  user_id BIGINT,\n" +                "  -- 销售额\n" +                "  price BIGINT,\n" +                "  -- 事件时间戳\n" +                "  row_time as cast(CURRENT_TIMESTAMP  AS timestamp(3) ),\n" +                "  -- 设置水位线,延迟时间为5秒\n" +                "  WATERMARK FOR row_time AS row_time - interval '5' SECOND\n" +                ") with (\n" +                "  'connector' = 'datagen',\n" +                "  'rows-per-second' = '10',\n" +                "  'fields.dim.length' = '1',\n" +                "  'fields.user_id.min' = '1',\n" +                "  'fields.user_id.max' = '100000',\n" +                "  'fields.price.min' = '1',\n" +                "  'fields.price.max' = '100000'\n" +                ")";        String sinkTableSql = "CREATE TABLE sink_table (\n" +                "    dim STRING,\n" +                "    pv BIGINT,\n" +                "    sum_price BIGINT,\n" +                "    max_price BIGINT,\n" +                "    min_price BIGINT,\n" +                "    uv BIGINT,\n" +                "    window_start bigint\n" +                ") WITH (\n" +                "  'connector' = 'print'\n" +                ")";        String insertTableSql = "INSERT INTO sink_table\n" +                "select\n" +                "  dim,\n" +                "  count(1) as  pv,\n" +                "  sum(price) as sum_price,\n" +                "  max(price) as max_price,\n" +                "  min(price) as min_price,\n" +                "  count(distinct user_id) as uv,\n" +                "  UNIX_TIMESTAMP(cast (tumble_start(row_time,interval '1' minute) as STRING ) ) * 1000 AS window_start\n" +                "from source_table\n" +                "group by dim,tumble(row_time,interval '1' minute )";        tEnv.executeSql(sourceTableSql);        tEnv.executeSql(sinkTableSql);        tEnv.executeSql(insertTableSql);    }}

flink13版本处理方式

public class _03_WindowTVF {    public static void main(String[] args) {        Configuration configuration = new Configuration();        configuration.setString("rest.port","9091");        // 执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        env.setParallelism(1);        // 创建表环境        EnvironmentSettings settings = EnvironmentSettings                .newInstance()                .inStreamingMode()                .build();        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);        String sourceTableSql = "create  table source_table(\n" +                "  -- 维度数据\n" +                "  dim STRING,\n" +                "  -- 用户id\n" +                "  user_id BIGINT,\n" +                "  -- 销售额\n" +                "  price BIGINT,\n" +                "  -- 事件时间戳\n" +                "  row_time as cast(CURRENT_TIMESTAMP  AS timestamp(3) ),\n" +                "  -- 设置水位线,延迟时间为5秒\n" +                "  WATERMARK FOR row_time AS row_time - interval '5' SECOND\n" +                ") with (\n" +                "  'connector' = 'datagen',\n" +                "  'rows-per-second' = '10',\n" +                "  'fields.dim.length' = '1',\n" +                "  'fields.user_id.min' = '1',\n" +                "  'fields.user_id.max' = '100000',\n" +                "  'fields.price.min' = '1',\n" +                "  'fields.price.max' = '100000'\n" +                ")";        String sinkTableSql = "CREATE TABLE sink_table (\n" +                "    dim STRING,\n" +                "    pv BIGINT,\n" +                "    sum_price BIGINT,\n" +                "    max_price BIGINT,\n" +                "    min_price BIGINT,\n" +                "    uv BIGINT,\n" +                "    window_start bigint\n" +                ") WITH (\n" +                "  'connector' = 'print'\n" +                ")";        String insertTableSql = "INSERT INTO sink_table\n" +                "select\n" +                "  dim,\n" +                "  count(1) as  pv,\n" +                "  sum(price) as sum_price,\n" +                "  max(price) as max_price,\n" +                "  min(price) as min_price,\n" +                "  count(distinct user_id) as uv,\n" +                "  UNIX_TIMESTAMP(cast (window_start as STRING)) * 1000  AS window_start\n" +                "from table (\n" +                "  tumble(\n" +                "  table source_table,\n" +                "  descriptor(row_time),\n" +                "  interval '60' second\n" +                "  )\n" +                ")\n" +                "group by window_start,window_end,dim";        tEnv.executeSql(sourceTableSql);        tEnv.executeSql(sinkTableSql);        tEnv.executeSql(insertTableSql);    }}

(3) 滑动窗口(HOP)

实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据
Group Window Aggregation

-- 数据处理逻辑insert into sink_tableSELECT dim,    UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,     count(distinct user_id) as uvFROM source_tableGROUP BY dim    , hop(row_time, interval '1' minute, interval '5' minute)

Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval ‘1’ minute, interval ‘5’ minute)。其中:
第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。
Windowing TVF 方案(1.13 只支持 Streaming 任务)

-- 数据处理逻辑insert into sink_tableSELECT     dim,    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,     count(distinct user_id) as bucket_uvFROM TABLE(HOP(        TABLE source_table        , DESCRIPTOR(row_time)        , INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))GROUP BY window_start,          window_end,         dim

Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,
即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘1’ MINUTES, INTERVAL ‘5’ MINUTES)),包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;
第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗口滑动步长大小为 1 min;
第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗口大小为 5 min。

(3)Session 窗口(SESSION)

Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。
实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开
1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF
Group Window Aggregation 方案:

-- 数据处理逻辑insert into sink_tableSELECT    dim,   UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,    count(1) as pvFROM source_tableGROUP BY dim     , session(row_time, interval '5' minute)

上述 SQL 任务是在整个 Session 窗口结束之后才会把数据输出。
Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中,即 session(row_time, interval ‘5’ minute)。其中:
第一个参数为事件时间的时间戳;第二个参数为 Session gap 间隔。

(4)渐进式窗口(CUMULATE)

渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口(max window size),然后根据用户设置的触发的时间间隔(window step)将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。
实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别。
其特点就在于,每一分钟的输出结果都是当天零点累计到当前的结果。
目前只有 Windowing TVF 方案支持:

-- 数据处理逻辑insert into sink_tableSELECT     UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,     window_start,     sum(money) as sum_money,    count(distinct id) as count_distinct_idFROM TABLE(CUMULATE(       TABLE source_table       , DESCRIPTOR(row_time)       , INTERVAL '60' SECOND       , INTERVAL '1' DAY))GROUP BY    window_start,     window_end

Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘60’ SECOND, INTERVAL ‘1’ DAY)),
其中包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;
第三个参数 INTERVAL ‘60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。
第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。

(5) Window TVF 支持 Grouping Sets、Rollup、Cube

离线 Hive SQL 使用经验的就会想到,如果有了 Grouping Sets,我们就可以直接用 Grouping Sets 将维度组合写在一条 SQL 中,写起来方便并且执行效率也高。当然,Flink 支持这个功能。
目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。
实际案例,计算每日零点累计到当前这一分钟的分汇总、age、sex、age+sex 维度的用户数。

-- 用户访问明细表CREATE TABLE source_table (    age STRING,    sex STRING,    user_id BIGINT,    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND) WITH (  'connector' = 'datagen',  'rows-per-second' = '1',  'fields.age.length' = '1',  'fields.sex.length' = '1',  'fields.user_id.min' = '1',  'fields.user_id.max' = '100000');CREATE TABLE sink_table (    age STRING,    sex STRING,    uv BIGINT,    window_end bigint) WITH (  'connector' = 'print');-- 数据处理的逻辑insert into sink_tableSELECT     UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,     if (age is null, 'ALL', age) as age,    if (sex is null, 'ALL', sex) as sex,    count(distinct user_id) as bucket_uvFROM TABLE(CUMULATE(       TABLE source_table       , DESCRIPTOR(row_time)       , INTERVAL '5' SECOND       , INTERVAL '1' DAY))GROUP BY     window_start,     window_end,    -- grouping sets 写法    GROUPING SETS (        ()        , (age)        , (sex)        , (age, sex)    )

(6) DML之Group 聚合

Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。
怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是按照 1 分钟的粒度进行聚合,如下 SQL:

-- 数据处理逻辑insert into sink_tableselect dim,    count(*) as pv,    sum(price) as sum_price,    max(price) as max_price,    min(price) as min_price,    -- 计算 uv 数    count(distinct user_id) as uv,    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_startfrom source_tablegroup by    dim,    -- 按照 Flink SQL tumble 窗口写法划分窗口    tumble(row_time, interval '1' minute)

转换为 Group 聚合的写法如下:

-- 数据处理逻辑insert into sink_tableselect dim,    count(*) as pv,    sum(price) as sum_price,    max(price) as max_price,    min(price) as min_price,    -- 计算 uv 数    count(distinct user_id) as uv,    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_startfrom source_tablegroup by    dim,    -- 将秒级别时间戳 / 60 转化为 1min    cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

但是窗口聚合和 Group by 聚合的差异在于:
本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。
而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出
运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。
Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
sql语意:

Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子: 数据源算子(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中 Group 聚合算子(group by key + sum\count\max\min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum\count\max\min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum\count\max\min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum\max\min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]。 数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

Group 聚合支持 Grouping sets、Rollup、Cube
作者:undo_try

游戏编程,一个游戏开发收藏夹~

如果图片长时间未显示,请使用Chrome内核浏览器。

原网站

版权声明
本文为[游戏编程]所创,转载请带上原文链接,感谢
https://www.233tw.com/database/134116