当前位置:网站首页>Flinksql 读写pgsql

Flinksql 读写pgsql

2022-07-06 23:51:00 zs_bigdata

一. 代码

 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        AbstractJdbcCatalog catalog = JdbcCatalogUtils.createCatalog(
                "myPgsql",
                "staging",
                "username",
                "password",
                "jdbc:postgresql://xxx:port"
        );

        tableEnv.registerCatalog("myPgsql", catalog);
        tableEnv.useCatalog("myPgsql");
        tableEnv.useDatabase("staging");
        boolean staging = catalog.tableExists(new ObjectPath("staging", "medical.hospital_department"));
        System.out.println(staging);
        System.out.println(Arrays.toString(tableEnv.listCatalogs()));
        System.out.println(Arrays.toString(tableEnv.listDatabases()));
        //System.out.println(Arrays.toString(tableEnv.listTables()));
        tableEnv.executeSql("select id,name from `medical.hospital_department`").print();

//sql api
tableEnv.executeSql(
                "CREATE TABLE hospital_position (\n" +
                        " id string,\n" +
                        " department_id string,\n" +
                        " waypoint_id STRING,\n" +
                        " zone_id STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = 'jdbc:postgresql://xxxxx:port/数据库',\n" +
                        " 'username' = 'xx', " +
                        " 'password' = 'xxxx', " +
                        " 'table-name' = 'schema名.表名'\n" +
                        ")"
        );

2 踩坑

如果pgsql使用了schema,name此时表名需要``转义,否则会报找不到medical object

// `medical.hospital_department` 一定要转义
        tableEnv.executeSql("select id,name from `medical.hospital_department`").print();

原网站

版权声明
本文为[zs_bigdata]所创,转载请带上原文链接,感谢
https://blog.csdn.net/m0_50670853/article/details/125490309