Flink real-time data warehouse DWD layer (traffic domain) template code
2022-07-29 07:03:00 【Top master cultivation plan】
Brief introduction
This article serves as an introduction to DWD-layer development.
Tool classes
Date/time tool class
public class DateFormatUtil {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static Long toTs(String dtStr, boolean isFull) {
LocalDateTime localDateTime = null;
if (!isFull) {
dtStr = dtStr + " 00:00:00";
}
localDateTime = LocalDateTime.parse(dtStr, dtfFull);
return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static Long toTs(String dtStr) {
return toTs(dtStr, false);
}
public static String toDate(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtf.format(localDateTime);
}
public static String toYmdHms(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtfFull.format(localDateTime);
}
public static void main(String[] args) {
System.out.println(toYmdHms(System.currentTimeMillis()));
}
}
Kafka tool class
public class KafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092,node1:9092,node2:9092";
static String DEFAULT_TOPIC = "default_topic";
/**
* Get a consumer for the given topic and consumer group
* @param topic
* @param groupId
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
// The default deserializer fails when the message value is null, so a custom one is defined here
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if(record == null || record.value() == null) {
return "";
}
return new String(record.value());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}, prop);
return consumer;
}
/**
* Get a producer for the given topic
* @param topic
* @return
*/
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
}
}, prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
return producer;
}
}
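A minimal usage sketch of these two helpers (the topic and group names below are placeholders, not topics from this project): read one topic as a string stream and write the result to another topic with the exactly-once producer. The jobs later in this article follow the same pattern.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaUtilUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read a source topic as a stream of strings (placeholder topic/group names)
        DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer("ods_some_topic", "demo_group"));
        // ... cleaning / transformation would go here ...
        // Write the result to a target topic with the exactly-once producer
        source.addSink(KafkaUtil.getKafkaProducer("dwd_some_topic"));
        env.execute();
    }
}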
Traffic domain
Prerequisite knowledge
Keyed state
public class Status {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
// Keyed state can only be used after the stream has been keyed (keyBy)
SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String dataItem) throws Exception {
return Tuple2.of(dataItem, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"firstViewDtState", String.class
));
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String keyStatus = firstViewDtState.value();
if (keyStatus == null) {
// Save the key's value in state
firstViewDtState.update(value.f0);
} else {
System.out.println(value + " is a duplicate");
}
}
});
firstViewDtState.print();
env.execute();
}
}
Result
(a,1) is a duplicate
Side output stream
public class OutputTagTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
// 1 Define side output stream
OutputTag<String> a = new OutputTag<String>("a") {
};
OutputTag<String> b = new OutputTag<String>("b") {
};
SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (value.equals("a")) {
// Write to side output stream a
ctx.output(a, value);
} else if (value.equals("b")) {
// Write to side output stream b
ctx.output(b, value);
} else {
// Write to the mainstream
out.collect(value);
}
}
});
// Get side output stream a
processData.getSideOutput(a).print("a>>");
// Get side output stream b
processData.getSideOutput(b).print("b>>");
// Print the main stream
processData.print(" Main stream >>");
env.execute();
}
}
Result
b>>:4> b
a>>:3> a
a>>:2> a
Main stream >>:5> c
Log data splitting
First of all, why does the log need to be split? Because each log type can then be analyzed separately, which reduces the amount of data to be processed in each analysis.
Example program implementing the split
public class BaseLogApp {
public static void main(String[] args) throws Exception {
// TODO 1. Initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Enable checkpointing and set the state backend
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies
.failureRateRestart(10,
Time.of(3L, TimeUnit.DAYS),
Time.of(1L, TimeUnit.MINUTES)));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read the main stream from Kafka
String topic = "topic_logg";
String groupId = "base_log_consumer";
DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. Clean the data and convert its structure
// 4.1 Define the error side output stream
OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
};
SingleOutputStreamOperator<String> cleanedStream = source.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(jsonStr);
out.collect(jsonStr);
} catch (Exception e) {
ctx.output(dirtyStreamTag, jsonStr);
}
}
}
);
// 4.2 Write the dirty data to a dedicated Kafka topic
DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
String dirtyTopic = "dirty_data";
dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));
// 4.3 Convert the main stream structure: jsonStr -> jsonObj
SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);
// TODO 5. Fix the new/returning visitor flag (is_new)
// 5.1 Key the data by mid
KeyedStream<JSONObject, String> keyedStream = mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));
// 5.2 Fix the is_new flag using keyed state
SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
new KeyedProcessFunction<String, JSONObject, JSONObject>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"lastLoginDt", String.class
));
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
String isNew = jsonObj.getJSONObject("common").getString("is_new");
String firstViewDt = firstViewDtState.value();
Long ts = jsonObj.getLong("ts");
String dt = DateFormatUtil.toDate(ts);
if ("1".equals(isNew)) {
if (firstViewDt == null) {
firstViewDtState.update(dt);
} else {
if (!firstViewDt.equals(dt)) {
isNew = "0";
jsonObj.getJSONObject("common").put("is_new", isNew);
}
}
} else {
if (firstViewDt == null) {
// Set the first visit date to yesterday
String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
firstViewDtState.update(yesterday);
}
}
out.collect(jsonObj);
}
}
);
// TODO 6. Split the stream
// 6.1 Define the start, display, action and error side output streams
OutputTag<String> startTag = new OutputTag<String>("startTag") {
};
OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
};
OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
};
OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
};
// 6.2 Split
SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
// 6.2.1 Collect error data
JSONObject error = jsonObj.getJSONObject("err");
if (error != null) {
context.output(errorTag, jsonObj.toJSONString());
}
// Remove the "err" field
jsonObj.remove("err");
// 6.2.2 Collect startup data
JSONObject start = jsonObj.getJSONObject("start");
if (start != null) {
context.output(startTag, jsonObj.toJSONString());
} else {
// obtain "page" Field
JSONObject page = jsonObj.getJSONObject("page");
// obtain "common" Field
JSONObject common = jsonObj.getJSONObject("common");
// obtain "ts"
Long ts = jsonObj.getLong("ts");
// 6.2.3 Collect exposure data
JSONArray displays = jsonObj.getJSONArray("displays");
if (displays != null) {
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
JSONObject displayObj = new JSONObject();
displayObj.put("display", display);
displayObj.put("common", common);
displayObj.put("page", page);
displayObj.put("ts", ts);
context.output(displayTag, displayObj.toJSONString());
}
}
// 6.2.4 Collect action data
JSONArray actions = jsonObj.getJSONArray("actions");
if (actions != null) {
for (int i = 0; i < actions.size(); i++) {
JSONObject action = actions.getJSONObject(i);
JSONObject actionObj = new JSONObject();
actionObj.put("action", action);
actionObj.put("common", common);
actionObj.put("page", page);
actionObj.put("ts", ts);
context.output(actionTag, actionObj.toJSONString());
}
}
// 6.2.5 Collect page data
jsonObj.remove("displays");
jsonObj.remove("actions");
out.collect(jsonObj.toJSONString());
}
}
}
);
// Print the main stream and the side output streams to check the split
separatedStream.print("page>>>");
separatedStream.getSideOutput(startTag).print("start!!!");
separatedStream.getSideOutput(displayTag).print("display@@@");
separatedStream.getSideOutput(actionTag).print("action###");
separatedStream.getSideOutput(errorTag).print("error$$$");
// TODO 7. Write the data to different Kafka topics
// // 7.1 Extract the side output streams
// DataStream<String> startDS = separatedStream.getSideOutput(startTag);
// DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
// DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
// DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
//
// // 7.2 Define the Kafka topic name for each log type
// String page_topic = "dwd_traffic_page_log";
// String start_topic = "dwd_traffic_start_log";
// String display_topic = "dwd_traffic_display_log";
// String action_topic = "dwd_traffic_action_log";
// String error_topic = "dwd_traffic_error_log";
//
// separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
// startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
// displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
// actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
// errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));
env.execute();
}
}
Initial log data
{
"actions": [
{
"action_id": "get_coupon",
"item": "3",
"item_type": "coupon_id",
"ts": 1592134620882
}
],
"common": {
"ar": "110000",
"ba": "Oneplus",
"ch": "oppo",
"is_new": "0",
"md": "Oneplus 7",
"mid": "mid_232163",
"os": "Android 10.0",
"uid": "898",
"vc": "v2.1.134"
},
"displays": [
{
"display_type": "query",
"item": "18",
"item_type": "sku_id",
"order": 1,
"pos_id": 2
},
{
"display_type": "promotion",
"item": "13",
"item_type": "sku_id",
"order": 2,
"pos_id": 4
},
{
"display_type": "query",
"item": "29",
"item_type": "sku_id",
"order": 3,
"pos_id": 2
},
{
"display_type": "query",
"item": "7",
"item_type": "sku_id",
"order": 4,
"pos_id": 4
},
{
"display_type": "query",
"item": "19",
"item_type": "sku_id",
"order": 5,
"pos_id": 4
},
{
"display_type": "promotion",
"item": "22",
"item_type": "sku_id",
"order": 6,
"pos_id": 4
},
{
"display_type": "query",
"item": "25",
"item_type": "sku_id",
"order": 7,
"pos_id": 5
}
],
"page": {
"during_time": 11764,
"item": "31",
"item_type": "sku_id",
"last_page_id": "good_list",
"page_id": "good_detail",
"source_type": "query"
},
"ts": 1592134615000
}
Data after the split
Main stream (page log data)
{
"common": {
"ar": "310000",
"uid": "780",
"os": "Android 11.0",
"ch": "oppo",
"is_new": "1",
"md": "Huawei P30",
"mid": "mid_619118",
"vc": "v2.1.134",
"ba": "Huawei"
},
"page": {
"page_id": "payment",
"item": "33,25",
"during_time": 5439,
"item_type": "sku_ids",
"last_page_id": "trade"
},
"ts": 1592134614000
}
Exposure log data
{
"common": {
"ar": "110000",
"uid": "914",
"os": "iOS 13.3.1",
"ch": "Appstore",
"is_new": "0",
"md": "iPhone Xs Max",
"mid": "mid_319090",
"vc": "v2.1.134",
"ba": "iPhone"
},
"display": {
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 1,
"order": 7
},
"page": {
"page_id": "home",
"during_time": 4328
},
"ts": 1592134610000
}
Action log data
{
"common": {
"ar": "230000",
"uid": "257",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_227516",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"action": {
"item": "35",
"action_id": "cart_minus_num",
"item_type": "sku_id",
"ts": 1592134612791
},
"page": {
"page_id": "cart",
"during_time": 3583,
"last_page_id": "good_detail"
},
"ts": 1592134611000
}
Error log data
{
"common": {
"ar": "110000",
"uid": "780",
"os": "Android 11.0",
"ch": "huawei",
"is_new": "1",
"md": "Xiaomi 9",
"mid": "mid_503805",
"vc": "v2.1.134",
"ba": "Xiaomi"
},
"err": {
"msg": " Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)",
"error_code": 1245
},
"page": {
"page_id": "home",
"during_time": 17642
},
"displays": [
{
"display_type": "activity",
"item": "1",
"item_type": "activity_id",
"pos_id": 2,
"order": 1
},
{
"display_type": "query",
"item": "2",
"item_type": "sku_id",
"pos_id": 5,
"order": 2
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 2,
"order": 3
},
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 4,
"order": 4
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 3,
"order": 5
},
{
"display_type": "promotion",
"item": "29",
"item_type": "sku_id",
"pos_id": 3,
"order": 6
}
],
"ts": 1592134611000
}
Final result

Unique visitor (UV) detail table
Prerequisite knowledge
State TTL
public class StateTTL {
public static void main(String[] args) throws Exception {
// Environmental preparation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Initialization data
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");
SingleOutputStreamOperator<Tuple2<String,Integer>> targetData = initData.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String value) throws Exception {
if (value.equals("a")) {
Thread.sleep(2000);
}
return Tuple2.of(value,1);
}
});
// Test state expiration
SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("testTTL", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.seconds(1L))
// Refresh the TTL when the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
// Apply the TTL config to the keyed state
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String valueState = lastVisitDt.value();
if (valueState == null) {
System.out.println(value);
lastVisitDt.update(value.f0);
} else {
System.out.println(value.f0 + " already has state");
}
}
});
testTTL.print();
env.execute();
}
}
Output results
(a,1)
(a,1)
(b,1)
(c,1)
b already has state
The result shows that the state for the first a expired after one second (the 2-second sleep exceeds the 1-second TTL), so the second a is printed as if it were new; the second b arrives within the TTL and therefore finds existing state.
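Besides the OnCreateAndWrite update type used above, StateTtlConfig has a few more options. The following is an illustrative sketch only (it is not part of the jobs below, and the descriptor name is arbitrary): the TTL timer is also refreshed on reads, and values that have expired but not yet been cleaned up are never returned.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
public class TtlConfigSketch {
    // Builds a ValueStateDescriptor with a 1-day TTL that is refreshed on every read and write
    // and never returns a value that has expired but has not been cleaned up yet
    public static ValueStateDescriptor<String> lastVisitDtDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1L))
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("lastVisitDt", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}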
Code implementation
public class DwdTrafficUniqueVisitorDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Environmental preparation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. State backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read log data from the Kafka dwd_traffic_page_log topic and wrap it as a stream
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_user_jump_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// TODO 4. Convert the structure: jsonStr -> jsonObj
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Filter out records whose last_page_id is not null (keep only session entry pages)
SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
jsonObj -> jsonObj
.getJSONObject("page")
.getString("last_page_id") == null
);
// TODO 6. Key by mid
KeyedStream<JSONObject, String> keyedStream = firstPageStream
.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
// TODO 7. Filter unique visitor records with Flink keyed state
SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
new RichFilterFunction<JSONObject>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("last_visit_dt", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.days(1L))
// Refresh the TTL when the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
String lastDt = lastVisitDt.value();
if (lastDt == null || !lastDt.equals(visitDt)) {
lastVisitDt.update(visitDt);
return true;
}
return false;
}
}
);
// TODO 8. Write the unique visitor data to the
// Kafka dwd_traffic_unique_visitor_detail topic
String targetTopic = "dwd_traffic_unique_visitor_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);
// TODO 9. Start the task
env.execute();
}
}
Result
{
"common": {
"ar": "310000",
"uid": "201",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "1",
"md": "Xiaomi Mix2 ",
"mid": "mid_994205",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"page": {
"page_id": "home",
"during_time": 19868
},
"ts": 1592133292000
}
User jump-out (bounce) transaction fact table
Prerequisite knowledge
To process the data in event-time order, watermarks must be assigned first; the windowed operation (CEP's within) then works on that ordered event time.
Using FlinkCEP
public class CEPTest {
public static void main(String[] args) throws Exception {
//TODO 1 Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO Note: with multiple partitions / a parallelism greater than 1, the watermark does not advance (it takes the minimum across subtasks), so the computation would never be triggered; parallelism is therefore set to 1 here
env.setParallelism(1);
//TODO 2 Read data from a socket source
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);
//TODO 3 Assume the input looks like "a 1": the first field is the key, the second is the event time in seconds
SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
return Tuple2.of(s[0], Integer.parseInt(s[1])*1000L);
}
});
//TODO 4 Assign timestamps and watermarks so the data can be processed in event-time order
SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
// Allow 0 seconds of out-of-orderness
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
// System.out.println(initData);
return initData.f1;
}
}
)
);
//TODO 5 Key the stream by the first field
KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
return data.f0;
});
// keyedStream.print("keyedStream");
//TODO 6 Use CEP to match the desired data: two consecutive "a" events within 10 seconds, otherwise the partial match times out
Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern = Pattern.<Tuple2<String, Long>>begin("first").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> firstData) throws Exception {
return firstData.f0.equals("a");
}
}
).next("second").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> secondData) throws Exception {
return secondData.f0.equals("a");
}
}
// A Time class with the same name is already imported above, so the fully qualified class name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
// Timeout side output stream
OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
};
SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp, Collector<Tuple2<String, Long>> out) throws Exception {
// Get the timeout data
Tuple2<String, Long> first = pattern.get("first").get(0);
out.collect(first);
}
},
new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern, Collector<Tuple2<String, Long>> out) throws Exception {
// Got the matching data
Tuple2<String, Long> second = pattern.get("second").get(0);
out.collect(second);
}
}
);
DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
// Print timeout data
timeOutDStream.print("timeOut");
// Print the normally matched data
flatSelectStream.print("flatSelectStream");
env.execute();
}
}
Test data
$ nc -lk 9998
a 2
a 3
a 15
Results
flatSelectStream> (a,3000)
timeOut> (a,3000)
Conclusion: "a 2" and "a 3" are two consecutive a events within 10 seconds, so they match the pattern; the match (a,3000) is emitted once "a 15" arrives and advances the watermark. The partial match that starts with "a 3" never sees a second a within 10 seconds, so when the watermark passes that deadline it times out, and (a,3000) also appears on the timeout side output stream.
Union
public class UnionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);
dataStream1.union(dataStream2)
.print();
env.execute();
}
}
Output
2> 5
3> 6
1> 4
11> 3
9> 1
10> 2
Implementation code
public class DwdTrafficUserJumpDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Environmental preparation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. State backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read log data from the Kafka dwd_traffic_page_log topic and wrap it as a stream
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_user_jump_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// Test data
/*DataStream<String> kafkaDS = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":15000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":30000} "
);*/
// TODO 4. Convert the structure: jsonStr -> jsonObj
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Assign watermarks for the user jump-out (bounce) statistics
SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
return jsonObj.getLong("ts");
}
}
)
);
// TODO 6. Key by mid
KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
// TODO 7. Define the CEP matching rules
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
).next("second").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
// A Time class with the same name is already imported above, so the fully qualified class name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
// TODO 8. Apply the Pattern to the stream
PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
// TODO 9. Extract matched events and timed-out events
OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
};
SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
@Override
public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
},
new PatternFlatSelectFunction<JSONObject, JSONObject>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
}
);
DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
// TODO 10. Merge the two streams and write the data to Kafka
DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
String targetTopic = "dwd_traffic_user_jump_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
unionDStream.map(JSONAware::toJSONString)
.addSink(kafkaProducer);
env.execute();
}
}