Flink comprehensive case (IX)
2022-07-24 13:45:00 【Hua Weiyun】
Today's goals
- Write data to HDFS with Flink FileSink
- Integrate FlinkSQL with the Hive data warehouse
- Comprehensive case: automatic positive reviews for orders
Writing data to HDFS with Flink FileSink
Common file storage formats
TextFile
CSV
RCFile
Parquet
ORC
SequenceFile
FileSink supports unified stream and batch writing to HDFS
FileSink requirement
Write streaming data to HDFS
package cn.itcast.flink.filesink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/6/24 10:52
 * Desc Write socket stream data to HDFS with FileSink
 */
public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Enable a checkpoint every 10 s and set the state backend storage path.
        //    The sink relies on checkpoints and two-phase commit for exactly-once semantics.
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));

        // 3. Connect to the socket data source and read the data
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        // 4. Create the output file configuration: prefix and suffix of the part files
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("coo")
                .withPartSuffix(".txt")
                .build();

        // 5. Create the FileSink in row format and specify the output path
        FileSink<String> sink = FileSink
                .forRowFormat(
                        new Path("hdfs://node1:8020/FileSink/parquet"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // 5-1. withBucketAssigner -> DateTimeBucketAssigner: one bucket (directory) per minute
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH--mm"))
                // 5-2. withRollingPolicy -> default rolling policy
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(64 * 1024 * 1024)
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
                        .build())
                // 5-3. withOutputFileConfig -> configuration of the output files
                .withOutputFileConfig(config)
                .build();

        // 6. Print the stream and attach the sink
        source.print();
        source.sinkTo(sink).setParallelism(1);
        // The legacy StreamingFileSink would be attached with addSink instead:
        // source.addSink(sink).setParallelism(1);

        // 7. Execute the job
        env.execute();
    }
}
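To make the bucket and rolling settings in the job more concrete, here is a minimal plain-Java sketch that does not use Flink at all. It only mimics the two rules the job configures: the "yyyy-MM-dd--HH--mm" bucket name and the three rolling thresholds (64 MB, 10 s, 5 s). The class FileSinkBehaviorSketch and the helpers bucketId and shouldRoll are hypothetical names introduced for illustration, and the exact boundary comparisons (>= versus >) are an assumption rather than a statement about Flink's internals.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Plain-Java illustration of the bucketing and rolling rules; not Flink's implementation. */
final class FileSinkBehaviorSketch {

    // Same pattern string that the job passes to DateTimeBucketAssigner.
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH--mm");

    // Thresholds copied from the DefaultRollingPolicy settings in the job.
    static final long MAX_PART_SIZE_BYTES = 64L * 1024 * 1024;
    static final long ROLLOVER_INTERVAL_MS = 10_000L;
    static final long INACTIVITY_INTERVAL_MS = 5_000L;

    /** Hypothetical helper: bucket directory name for a record handled at the given time. */
    static String bucketId(Instant time, ZoneId zone) {
        return BUCKET_FORMAT.withZone(zone).format(time);
    }

    /**
     * Hypothetical helper: a part file rolls when it is too large, too old,
     * or has not received data for too long. Boundary handling is assumed.
     */
    static boolean shouldRoll(long partSizeBytes, long createdAtMs, long lastWriteMs, long nowMs) {
        boolean tooLarge = partSizeBytes >= MAX_PART_SIZE_BYTES;
        boolean tooOld = nowMs - createdAtMs >= ROLLOVER_INTERVAL_MS;
        boolean idle = nowMs - lastWriteMs >= INACTIVITY_INTERVAL_MS;
        return tooLarge || tooOld || idle;
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2021-06-24T10:52:30Z");
        // All records from the same minute land in the same bucket directory.
        System.out.println(bucketId(t, ZoneId.of("UTC")));          // 2021-06-24--10--52
        // Small file, but open for 10 s: the rollover interval triggers.
        System.out.println(shouldRoll(1024, 0, 9_000, 10_000));     // true
        // Small, 4 s old, last written 1 s ago: keep writing to the same file.
        System.out.println(shouldRoll(1024, 0, 3_000, 4_000));      // false
    }
}
```

With a one-minute bucket pattern and a 10-second rollover interval, each bucket directory can end up with several small part files. In streaming mode, part files are finalized only when a checkpoint completes, which is why the job enables checkpointing before attaching the sink.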








