Flink Comprehensive Case Study (Part 9)
2022-07-24 13:37:00 [Huawei Cloud]
Today's goals
- Flink FileSink: writing output to HDFS
- Integrating Flink SQL with the Hive data warehouse
- Case study: automatically giving orders a positive review
Flink FileSink: writing to HDFS
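Common file storage formats:
- TextFile (plain text, row-oriented)
- CSV (comma-separated text, row-oriented)
- RCFile (Hive's record-columnar format)
- Parquet (columnar)
- ORC (columnar)
- SequenceFile (Hadoop's binary key-value format)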
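TextFile and CSV are row-oriented, so each record can be appended as soon as it arrives; that is the case `FileSink.forRowFormat(path, encoder)` covers, and the demo below passes it a `SimpleStringEncoder`. Columnar formats such as Parquet and ORC go through `FileSink.forBulkFormat(path, bulkWriterFactory)` instead. As a rough illustration only, the plain-Java sketch below imitates what a simple string row encoder does: write the record's string form in the chosen charset, then a newline. `RowEncoderSketch` is a hypothetical class, not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in (not Flink's class) for a row-format encoder such as
// SimpleStringEncoder: each record is written as its string form in the
// configured charset, followed by '\n', so the file holds one record per line.
final class RowEncoderSketch {
    private final Charset charset;

    RowEncoderSketch(Charset charset) {
        this.charset = charset;
    }

    // Append one record to the currently open part-file stream.
    void encode(Object record, OutputStream out) throws IOException {
        out.write(record.toString().getBytes(charset));
        out.write('\n');
    }

    // Encode records into an in-memory buffer instead of an HDFS part file.
    static String encodeAll(List<?> records) {
        RowEncoderSketch encoder = new RowEncoderSketch(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            for (Object record : records) {
                encoder.encode(record, buffer);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two CSV-style rows, laid out the way a TextFile/CSV reader expects them.
        System.out.print(encodeAll(List.of("1,apple,3.5", "2,pear,4.0")));
    }
}
```

Because every record ends with `\n`, the resulting part file is ordinary line-delimited text that can be inspected with `hdfs dfs -cat`.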
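FileSink supports unified stream and batch writes to HDFS: the same sink can be used in both STREAMING and BATCH execution mode.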
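Working in both modes comes down to when part files are committed. Per the Flink FileSink documentation, a part file is in-progress while it is being written, becomes pending once the rolling policy closes it, and becomes finished on a successful checkpoint (STREAMING mode) or at the end of input (BATCH mode). The sketch below is a simplified, hypothetical model of that life cycle for a single bucket; the class, its methods, and the `part-N` names are illustrative only and do not match Flink's real file names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Simplified, hypothetical model (not Flink code) of the part-file states
// described in the FileSink documentation:
//   in-progress -> (rolled) pending -> (committed) finished
final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> files = new ArrayList<>();
    private PartFile current;   // the single in-progress file of this bucket
    private int partCounter;

    // A record goes to the in-progress file, opening a new one if needed.
    // Only file states are modeled, not the record contents.
    void write(String record) {
        if (current == null) {
            current = new PartFile("part-" + partCounter++);
            files.add(current);
        }
    }

    // The rolling policy closes the in-progress file; it becomes pending
    // and is not yet published as a finished file.
    void roll() {
        if (current != null) {
            current.state = State.PENDING;
            current = null;
        }
    }

    // STREAMING mode: called when a checkpoint completes successfully.
    void commitPending() {
        for (PartFile f : files) {
            if (f.state == State.PENDING) {
                f.state = State.FINISHED;
            }
        }
    }

    // BATCH mode (in this model): at end of input, close and commit everything.
    void endOfInput() {
        roll();
        commitPending();
    }

    long count(State state) {
        return files.stream().filter(f -> f.state == state).count();
    }

    String describe() {
        StringJoiner joiner = new StringJoiner(", ");
        for (PartFile f : files) {
            joiner.add(f.name + "=" + f.state);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.write("a");
        sink.roll();          // part-0 closed by the rolling policy: pending
        sink.write("b");      // part-1 opened: in-progress
        sink.commitPending(); // checkpoint completes: part-0 finished
        System.out.println(sink.describe());
    }
}
```

In STREAMING mode this is also why checkpointing has to be enabled: without checkpoints, pending files are never committed and never show up as finished output.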
File Sink requirement
Write streaming data to HDFS: read text lines from a socket and store them in HDFS as text files.
```java
package cn.itcast.flink.filesink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/6/24 10:52
 * Desc Write a socket text stream to HDFS with FileSink
 */
public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Trigger a checkpoint every 10 s and set the state backend path.
        //    FileSink only commits part files when a checkpoint completes; together
        //    with its two-phase commit this gives exactly-once output.
        env.enableCheckpointing(10000);
        // FsStateBackend is deprecated since Flink 1.13 but still works here
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));

        // 3. Read lines from a socket (start a server first, e.g. `nc -lk 9999` on node1)
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        // 4. Part-file naming: prefix "coo", suffix ".txt"
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("coo")
                .withPartSuffix(".txt")
                .build();

        // 5. Row-format FileSink writing UTF-8 text under hdfs://node1:8020/FileSink/parquet
        //    (the directory is named "parquet", but the files are plain text)
        FileSink<String> sink = FileSink
                .forRowFormat(
                        new Path("hdfs://node1:8020/FileSink/parquet"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // 5-1. One bucket (sub-directory) per minute of processing time
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH--mm"))
                // 5-2. Roll the part file at 64 MB, after 10 s, or after 5 s without new data
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(64 * 1024 * 1024)
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
                        .build())
                // 5-3. Apply the part-file naming config
                .withOutputFileConfig(config)
                .build();

        // 6. Print to stdout and write to HDFS
        source.print();
        source.sinkTo(sink).setParallelism(1);
        // Legacy alternative: build a StreamingFileSink instead and use source.addSink(...)

        // 7. Execute the job
        env.execute();
    }
}
```
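Two settings in the demo decide where and when part files are closed. `DateTimeBucketAssigner` formats the current processing time with the given pattern (in the system time zone unless one is passed in) and uses the result as the bucket, i.e. a sub-directory under `hdfs://node1:8020/FileSink/parquet`; with `"yyyy-MM-dd--HH--mm"` that is one directory per minute. `DefaultRollingPolicy` closes the in-progress file when it grows past the size limit, when it is older than the rollover interval, or when it has received no data for the inactivity interval. The plain-Java sketch below models both decisions with the demo's values; `BucketAndRollingSketch` and its methods are hypothetical, and the exact boundary comparisons are a simplification of Flink's implementation.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical model (not Flink code) of the bucket and rolling decisions
// configured in FileSinkDemo.
final class BucketAndRollingSketch {
    // Same pattern as the DateTimeBucketAssigner in the demo.
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH--mm");

    // Values configured on DefaultRollingPolicy in the demo.
    static final long MAX_PART_SIZE = 64L * 1024 * 1024;  // 64 MB
    static final long ROLLOVER_INTERVAL_MS = 10_000;      // 10 s
    static final long INACTIVITY_INTERVAL_MS = 5_000;     // 5 s

    // Bucket id = processing time formatted with the pattern in the given zone;
    // each distinct id becomes a sub-directory under the sink's base path.
    static String bucketId(long processingTimeMillis, ZoneId zone) {
        return BUCKET_FORMAT.withZone(zone).format(Instant.ofEpochMilli(processingTimeMillis));
    }

    // Simplified roll check: too big, too old, or idle for too long.
    static boolean shouldRoll(long partSizeBytes, long createdAtMs, long lastWriteMs, long nowMs) {
        return partSizeBytes > MAX_PART_SIZE
                || nowMs - createdAtMs >= ROLLOVER_INTERVAL_MS
                || nowMs - lastWriteMs >= INACTIVITY_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("bucket directory: " + bucketId(now, ZoneId.systemDefault()));
        // A 1 KB file opened 6 s ago whose last write was 5 s ago has gone idle.
        System.out.println("roll? " + shouldRoll(1024, now - 6_000, now - 5_000, now));
    }
}
```

In the real sink the time-based conditions are checked on a periodic timer (`withBucketCheckInterval`, one minute by default), so with these settings a file can stay open longer than the 10 s interval suggests.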