1. Abstract
The log data collected earlier has been saved to Kafka as the log ODS layer. The log data read from the Kafka ODS layer falls into three classes: page logs, start logs, and exposure logs. Although all three are user behavior data, their structures are completely different, so they need to be split. The split logs are written back to Kafka under different topics, forming the log DWD layer.
Page logs are output to the main stream, start logs to the start-log side output stream, and exposure logs to the exposure-log side output stream.
2. Identify new and old users
The client itself carries a new/old user flag, but it is not accurate enough and needs to be reconfirmed with real-time computation (no business operations are involved, just a simple status check).
Data splitting is implemented with side output streams.
Based on the content of each record, the log data is divided into three classes: page logs, start logs, and exposure logs. Each resulting stream is then pushed to a different downstream Kafka topic.
3. Code implementation
Create the Flink job BaseLogTask.java in the app package.
Flink consumes the data from Kafka and stores its checkpoints in HDFS; remember to create the checkpoint path manually and grant it the proper permissions.
Checkpointing is optional; you can turn it off during testing (one way to make it switchable is sketched after the code below).
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
/**
* @author: zhangbao
* @date: 2021/6/18 23:29
* @desc:
**/
public class BaseLogTask {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Enable checkpointing every 5 seconds with exactly-once semantics
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS checkpoint directory
System.setProperty("HADOOP_USER_NAME","zhangbao");
// Add data sources
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Parse each record into a JSONObject
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
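As noted above, checkpointing is optional during testing. One simple way to keep it switchable (a sketch only; the enableCheckpoint flag below is made up for illustration and is not part of the original job) is to guard the checkpoint lines inside main():
// Sketch: make checkpointing switchable for local testing
boolean enableCheckpoint = false; // flip to true when checkpointing is wanted
if (enableCheckpoint) {
    // Checkpoint every 5 seconds with exactly-once semantics, stored in HDFS
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
}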
MyKafkaUtil.java utility class:
package com.zhangbao.gmall.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
* @author: zhangbao
* @date: 2021/6/18 23:41
* @desc:
**/
public class MyKafkaUtil {
private static String kafka_host = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
/**
* kafka consumer
*/
public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String group){
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka_host);
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(),props);
}
}
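The splitting code in section 5 also calls MyKafkaUtil.getKafkaSink(topic), which is not shown in the utility class above. A minimal sketch of what that producer helper could look like, assuming plain string payloads and the broker list already defined in the class (verify the constructor against your flink-connector-kafka version):
// Add inside MyKafkaUtil; requires import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
/**
 * kafka producer (sink): a sketch assuming string payloads and the brokers defined above
 */
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    return new FlinkKafkaProducer<String>(kafka_host, topic, new SimpleStringSchema());
}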
4. New and old visitor status repair
Rules for identifying new and old visitors
The front end records the new/old visitor status, but the flag may be wrong, so it is checked again here. Per-mid state stores the first-visit date. When a later log arrives for that device, the date held in state is compared with the date the log was generated: if the state is not empty and the state date differs from the current date, the visitor is actually an old one, and if its is_new flag is 1, the flag is repaired.
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author: zhangbao
* @date: 2021/6/18 23:29
* @desc:
**/
public class BaseLogTask {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Enable checkpointing every 5 seconds with exactly-once semantics
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS checkpoint directory
System.setProperty("HADOOP_USER_NAME","zhangbao");
// Add the Kafka data source
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Parse each record into a JSONObject
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
/**
* Identify new and old visitors. The front end records the new/old flag, but it may be wrong, so it is checked again here.
* Per-mid state stores the first-visit date; when a later log arrives for that device, the date in state is compared with
* the date the log was generated. If the state is not empty and the state date differs from the current date, the visitor
* is an old one, and if its is_new flag is 1, the flag is repaired.
*/
// Key the logs by device id (mid)
KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
// Repair the new/old visitor flag. Flink state is either operator state or keyed state; since the status of a single device is tracked here, keyed state is the better fit
SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
// Per-mid first-visit date state
private ValueState<String> firstVisitDateState;
// Date formatter
private SimpleDateFormat sdf;
// Initialization method
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
// Get the is_new flag from the common section
String is_new = jsonObject.getJSONObject("common").getString("is_new");
// Get the current log timestamp
Long ts = jsonObject.getLong("ts");
if ("1".equals(is_new)) {
// First-visit date stored in state
String stateDate = firstVisitDateState.value();
// Date the log was generated, derived from ts, to compare against the stored date
String logDate = sdf.format(new Date(ts));
if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(logDate)) {
// Already visited on an earlier day, so this is an old visitor
is_new = "0";
jsonObject.getJSONObject("common").put("is_new", is_new);
} else {
// Genuinely new visitor: remember the first-visit date
firstVisitDateState.update(logDate);
}
}
return jsonObject;
}
});
midWithNewFlagDs.print();
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5. Data splitting with side output streams
After the new/old visitor repair above, the log data is split into three classes.
Start log tag definition: OutputTag<String> startTag = new OutputTag<String>("start"){};
Exposure log tag definition: OutputTag<String> displayTag = new OutputTag<String>("display"){};
Page logs are output to the main stream, start logs to the start-log side output stream, and exposure logs to the exposure-log side output stream.
The split data is sent to the following Kafka topics:
dwd_start_log: start logs
dwd_display_log: exposure logs
dwd_page_log: page logs
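The exact log schema comes from the upstream mock generator and is not reproduced here, but based on the fields the splitting code reads (common.mid, common.is_new, start, displays, page.page_id, ts), the three classes look roughly like the following (field values are hypothetical, for illustration only):
// Start log: carries a non-empty "start" object, routed to the start side output stream
{"common":{"mid":"mid_001","is_new":"1"},"start":{"entry":"icon"},"ts":1624000000000}
// Page log with exposures: carries "page" plus a "displays" array; each exposure is routed to the display side output stream
{"common":{"mid":"mid_001","is_new":"0"},"page":{"page_id":"home"},"displays":[{"item":"13"}],"ts":1624000000000}
// Plain page log: carries "page" and no "displays", so it stays in the main stream
{"common":{"mid":"mid_001","is_new":"0"},"page":{"page_id":"good_detail"},"ts":1624000000000}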
package com.zhangbao.gmall.realtime.app;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author: zhangbao
* @date: 2021/6/18 23:29
* @desc:
**/
public class BaseLogTask {
private static final String TOPIC_START = "dwd_start_log";
private static final String TOPIC_DISPLAY = "dwd_display_log";
private static final String TOPIC_PAGE = "dwd_page_log";
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to match the number of Kafka partitions
env.setParallelism(4);
// Enable checkpointing every 5 seconds with exactly-once semantics
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/baseLogAll"));
// Specify the user that accesses the HDFS checkpoint directory
System.setProperty("HADOOP_USER_NAME","zhangbao");
// Add the Kafka data source
String topic = "ods_base_log";
String group = "base_log_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, group);
DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
// Parse each record into a JSONObject
SingleOutputStreamOperator<JSONObject> jsonDs = kafkaDs.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return JSONObject.parseObject(s);
}
});
jsonDs.print("json >>> --- ");
/**
* Identify new and old visitors. The front end records the new/old flag, but it may be wrong, so it is checked again here.
* Per-mid state stores the first-visit date; when a later log arrives for that device, the date in state is compared with
* the date the log was generated. If the state is not empty and the state date differs from the current date, the visitor
* is an old one, and if its is_new flag is 1, the flag is repaired.
*/
// Key the logs by device id (mid)
KeyedStream<JSONObject, String> midKeyedDs = jsonDs.keyBy(data -> data.getJSONObject("common").getString("mid"));
// Repair the new/old visitor flag. Flink state is either operator state or keyed state; since the status of a single device is tracked here, keyed state is the better fit
SingleOutputStreamOperator<JSONObject> midWithNewFlagDs = midKeyedDs.map(new RichMapFunction<JSONObject, JSONObject>() {
// Per-mid first-visit date state
private ValueState<String> firstVisitDateState;
// Date formatter
private SimpleDateFormat sdf;
// Initialization method
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
// Get the is_new flag from the common section
String is_new = jsonObject.getJSONObject("common").getString("is_new");
// Get the current log timestamp
Long ts = jsonObject.getLong("ts");
if ("1".equals(is_new)) {
// First-visit date stored in state
String stateDate = firstVisitDateState.value();
// Date the log was generated, derived from ts, to compare against the stored date
String logDate = sdf.format(new Date(ts));
if (stateDate != null && stateDate.length() != 0 && !stateDate.equals(logDate)) {
// Already visited on an earlier day, so this is an old visitor
is_new = "0";
jsonObject.getJSONObject("common").put("is_new", is_new);
} else {
// Genuinely new visitor: remember the first-visit date
firstVisitDateState.update(logDate);
}
}
return jsonObject;
}
});
// midWithNewFlagDs.print();
/**
* Based on the content of each record, split the log data into three classes: page logs, start logs and exposure logs.
* Page logs go to the main stream, start logs to the start-log side output stream, and exposure logs to the
* exposure-log side output stream.
* Side output streams serve two purposes: (1) collecting late data, (2) splitting a stream.
*/
// Define the tag for the start-log side output stream; the trailing {} creates an anonymous subclass so the generic type is retained
OutputTag<String> startTag = new OutputTag<String>("start"){};
// Define the tag for the exposure-log side output stream
OutputTag<String> displayTag = new OutputTag<String>("display"){};
SingleOutputStreamOperator<String> pageDs = midWithNewFlagDs.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
String dataStr = jsonObject.toString();
JSONObject startJson = jsonObject.getJSONObject("start");
// Is this a start log?
if (startJson != null && startJson.size() > 0) {
context.output(startTag, dataStr);
} else {
// Is this an exposure log?
JSONArray jsonArray = jsonObject.getJSONArray("displays");
if (jsonArray != null && jsonArray.size() > 0) {
// Attach the pageId to each exposure record
String pageId = jsonObject.getJSONObject("page").getString("page_id");
// Iterate over the exposures and output each one
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject disPlayObj = jsonArray.getJSONObject(i);
disPlayObj.put("page_id", pageId);
context.output(displayTag, disPlayObj.toString());
}
} else {
// Otherwise it is a page log; output it to the main stream
collector.collect(dataStr);
}
}
}
}
);
// Get the side output streams
DataStream<String> startDs = pageDs.getSideOutput(startTag);
DataStream<String> disPlayDs = pageDs.getSideOutput(displayTag);
// Printout
startDs.print("start>>>");
disPlayDs.print("display>>>");
pageDs.print("page>>>");
/**
* Send each stream's log data to its designated Kafka topic
*/
startDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_START));
disPlayDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY));
pageDs.addSink(MyKafkaUtil.getKafkaSink(TOPIC_PAGE));
try {
// Execute the job
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}