Flink Real-Time Data Warehouse - DWD Layer (Traffic Domain) Template Code
2022-07-29 05:54:00 【顶尖高手养成计划】
Introduction
This section is a brief introduction to the template code used for DWD-layer development.
Utility Classes
Date/time utility class
public class DateFormatUtil {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static Long toTs(String dtStr, boolean isFull) {
LocalDateTime localDateTime = null;
if (!isFull) {
dtStr = dtStr + " 00:00:00";
}
localDateTime = LocalDateTime.parse(dtStr, dtfFull);
return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static Long toTs(String dtStr) {
return toTs(dtStr, false);
}
public static String toDate(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtf.format(localDateTime);
}
public static String toYmdHms(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtfFull.format(localDateTime);
}
public static void main(String[] args) {
System.out.println(toYmdHms(System.currentTimeMillis()));
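// Additional illustrative calls (added; not in the original post):
System.out.println(toTs("2022-07-29"));                 // "yyyy-MM-dd" string -> epoch millis (UTC+8 offset assumed by toTs)
System.out.println(toTs("2022-07-29 05:54:00", true));  // full "yyyy-MM-dd HH:mm:ss" string -> epoch millis
System.out.println(toDate(System.currentTimeMillis())); // epoch millis -> "yyyy-MM-dd"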
}
}
Kafka utility class
public class KafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
static String DEFAULT_TOPIC = "default_topic";
/**
* Build a Kafka consumer for the given topic and consumer group
* @param topic
* @param groupId
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
//the default string deserializer fails when the record value is null, so a custom deserializer is defined here
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if(record == null || record.value() == null) {
return "";
}
return new String(record.value());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}, prop);
return consumer;
}
/**
* Build a Kafka producer for the given topic
* @param topic
* @return
*/
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
}
}, prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
return producer;
}
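/**
 * Illustrative usage only (added; not part of the original post). The topic and group
 * names below are placeholders. Checkpointing is enabled because the EXACTLY_ONCE
 * producer only commits its Kafka transactions when a checkpoint completes.
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000L);
env.addSource(getKafkaConsumer("source_topic", "demo_group"))
.addSink(getKafkaProducer("target_topic"));
env.execute();
}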
}
Traffic Domain
Prerequisites
Keyed state
public class Status {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
//map to (key, value) tuples first; keyed state can only be used after keyBy below
SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String dataItem) throws Exception {
return Tuple2.of(dataItem, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"firstViewDtState", String.class
));
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String keyStatus = firstViewDtState.value();
if (keyStatus == null) {
//save the key into state
firstViewDtState.update(value.f0);
} else {
System.out.println(value + " came again");
}
}
});
firstViewDtState.print();
env.execute();
}
}
Result
(a,1) came again
Side Output Streams
public class OutputTagTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
// 1. Define the side output tags
OutputTag<String> a = new OutputTag<String>("a") {
};
OutputTag<String> b = new OutputTag<String>("b") {
};
SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (value.equals("a")) {
//write to side output a
ctx.output(a, value);
} else if (value.equals("b")) {
//write to side output b
ctx.output(b, value);
} else {
//write to the main stream
out.collect(value);
}
}
});
//get side output stream a
processData.getSideOutput(a).print("a>>");
//get side output stream b
processData.getSideOutput(b).print("b>>");
//print the main stream
processData.print("main>>");
env.execute();
}
}
Result
b>>:4> b
a>>:3> a
a>>:2> a
main>>:5> c
Log Data Splitting
Why split the log data in the first place? Because each log type can then be analyzed separately, and every downstream analysis only has to process the data it actually needs, which reduces the data volume.
Example program implementing the split
public class BaseLogApp {
public static void main(String[] args) throws Exception {
// TODO 1. Initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Configure checkpointing, restart strategy and state backend
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies
.failureRateRestart(10,
Time.of(3L, TimeUnit.DAYS),
Time.of(1L, TimeUnit.MINUTES)));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read the raw log stream from Kafka
String topic = "topic_logg";
String groupId = "base_log_consumer";
DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. Clean the data and convert its structure
// 4.1 Define the dirty-data side output tag
OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
};
SingleOutputStreamOperator<String> cleanedStream = source.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(jsonStr);
out.collect(jsonStr);
} catch (Exception e) {
ctx.output(dirtyStreamTag, jsonStr);
}
}
}
);
// 4.2 Write dirty data to a dedicated Kafka topic
DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
String dirtyTopic = "dirty_data";
dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));
// 4.3 Convert the main stream structure: jsonStr -> jsonObj
SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);
// TODO 5. Fix the new/returning visitor flag (is_new)
// 5.1 Key the stream by mid
KeyedStream<JSONObject, String> keyedStream = mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));
// 5.2 Repair the is_new flag with keyed state
SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
new KeyedProcessFunction<String, JSONObject, JSONObject>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"lastLoginDt", String.class
));
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
String isNew = jsonObj.getJSONObject("common").getString("is_new");
String firstViewDt = firstViewDtState.value();
Long ts = jsonObj.getLong("ts");
String dt = DateFormatUtil.toDate(ts);
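// Repair rules (summary comment added for clarity): if is_new = 1 but the state already holds
// an earlier first-visit date, the visitor is actually a returning one, so is_new is reset to 0;
// if is_new = 0 but no state exists yet, the first-visit date is backfilled with yesterday.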
if ("1".equals(isNew)) {
if (firstViewDt == null) {
firstViewDtState.update(dt);
} else {
if (!firstViewDt.equals(dt)) {
isNew = "0";
jsonObj.getJSONObject("common").put("is_new", isNew);
}
}
} else {
if (firstViewDt == null) {
// backfill the first-visit date with yesterday
String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
firstViewDtState.update(yesterday);
}
}
out.collect(jsonObj);
}
}
);
// TODO 6. Split the stream
// 6.1 Define side output tags for start, display, action and error logs
OutputTag<String> startTag = new OutputTag<String>("startTag") {
};
OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
};
OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
};
OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
};
// 6.2 Split
SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
// 6.2.1 Collect error data
JSONObject error = jsonObj.getJSONObject("err");
if (error != null) {
context.output(errorTag, jsonObj.toJSONString());
}
// remove the "err" field
jsonObj.remove("err");
// 6.2.2 Collect start data
JSONObject start = jsonObj.getJSONObject("start");
if (start != null) {
context.output(startTag, jsonObj.toJSONString());
} else {
// get the "page" field
JSONObject page = jsonObj.getJSONObject("page");
// get the "common" field
JSONObject common = jsonObj.getJSONObject("common");
// get "ts"
Long ts = jsonObj.getLong("ts");
// 6.2.3 Collect display (exposure) data
JSONArray displays = jsonObj.getJSONArray("displays");
if (displays != null) {
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
JSONObject displayObj = new JSONObject();
displayObj.put("display", display);
displayObj.put("common", common);
displayObj.put("page", page);
displayObj.put("ts", ts);
context.output(displayTag, displayObj.toJSONString());
}
}
// 6.2.4 Collect action data
JSONArray actions = jsonObj.getJSONArray("actions");
if (actions != null) {
for (int i = 0; i < actions.size(); i++) {
JSONObject action = actions.getJSONObject(i);
JSONObject actionObj = new JSONObject();
actionObj.put("action", action);
actionObj.put("common", common);
actionObj.put("page", page);
actionObj.put("ts", ts);
context.output(actionTag, actionObj.toJSONString());
}
}
// 6.2.5 Collect page data
jsonObj.remove("displays");
jsonObj.remove("actions");
out.collect(jsonObj.toJSONString());
}
}
}
);
// Print the main stream and the side outputs to inspect the split
separatedStream.print("page>>>");
separatedStream.getSideOutput(startTag).print("start!!!");
separatedStream.getSideOutput(displayTag).print("display@@@");
separatedStream.getSideOutput(actionTag).print("action###");
separatedStream.getSideOutput(errorTag).print("error$$$");
// TODO 7. Write each stream to its own Kafka topic
// // 7.1 Extract the side output streams
// DataStream<String> startDS = separatedStream.getSideOutput(startTag);
// DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
// DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
// DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
//
// // 7.2 Topic names for each log type written to Kafka
// String page_topic = "dwd_traffic_page_log";
// String start_topic = "dwd_traffic_start_log";
// String display_topic = "dwd_traffic_display_log";
// String action_topic = "dwd_traffic_action_log";
// String error_topic = "dwd_traffic_error_log";
//
// separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
// startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
// displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
// actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
// errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));
env.execute();
}
}
Raw log data
{
"actions": [
{
"action_id": "get_coupon",
"item": "3",
"item_type": "coupon_id",
"ts": 1592134620882
}
],
"common": {
"ar": "110000",
"ba": "Oneplus",
"ch": "oppo",
"is_new": "0",
"md": "Oneplus 7",
"mid": "mid_232163",
"os": "Android 10.0",
"uid": "898",
"vc": "v2.1.134"
},
"displays": [
{
"display_type": "query",
"item": "18",
"item_type": "sku_id",
"order": 1,
"pos_id": 2
},
{
"display_type": "promotion",
"item": "13",
"item_type": "sku_id",
"order": 2,
"pos_id": 4
},
{
"display_type": "query",
"item": "29",
"item_type": "sku_id",
"order": 3,
"pos_id": 2
},
{
"display_type": "query",
"item": "7",
"item_type": "sku_id",
"order": 4,
"pos_id": 4
},
{
"display_type": "query",
"item": "19",
"item_type": "sku_id",
"order": 5,
"pos_id": 4
},
{
"display_type": "promotion",
"item": "22",
"item_type": "sku_id",
"order": 6,
"pos_id": 4
},
{
"display_type": "query",
"item": "25",
"item_type": "sku_id",
"order": 7,
"pos_id": 5
}
],
"page": {
"during_time": 11764,
"item": "31",
"item_type": "sku_id",
"last_page_id": "good_list",
"page_id": "good_detail",
"source_type": "query"
},
"ts": 1592134615000
}
Data after splitting
Main stream (page log data)
{
"common": {
"ar": "310000",
"uid": "780",
"os": "Android 11.0",
"ch": "oppo",
"is_new": "1",
"md": "Huawei P30",
"mid": "mid_619118",
"vc": "v2.1.134",
"ba": "Huawei"
},
"page": {
"page_id": "payment",
"item": "33,25",
"during_time": 5439,
"item_type": "sku_ids",
"last_page_id": "trade"
},
"ts": 1592134614000
}
Display (exposure) log data
{
"common": {
"ar": "110000",
"uid": "914",
"os": "iOS 13.3.1",
"ch": "Appstore",
"is_new": "0",
"md": "iPhone Xs Max",
"mid": "mid_319090",
"vc": "v2.1.134",
"ba": "iPhone"
},
"display": {
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 1,
"order": 7
},
"page": {
"page_id": "home",
"during_time": 4328
},
"ts": 1592134610000
}
Action log data
{
"common": {
"ar": "230000",
"uid": "257",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_227516",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"action": {
"item": "35",
"action_id": "cart_minus_num",
"item_type": "sku_id",
"ts": 1592134612791
},
"page": {
"page_id": "cart",
"during_time": 3583,
"last_page_id": "good_detail"
},
"ts": 1592134611000
}
Error log data
{
"common": {
"ar": "110000",
"uid": "780",
"os": "Android 11.0",
"ch": "huawei",
"is_new": "1",
"md": "Xiaomi 9",
"mid": "mid_503805",
"vc": "v2.1.134",
"ba": "Xiaomi"
},
"err": {
"msg": " Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)",
"error_code": 1245
},
"page": {
"page_id": "home",
"during_time": 17642
},
"displays": [
{
"display_type": "activity",
"item": "1",
"item_type": "activity_id",
"pos_id": 2,
"order": 1
},
{
"display_type": "query",
"item": "2",
"item_type": "sku_id",
"pos_id": 5,
"order": 2
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 2,
"order": 3
},
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 4,
"order": 4
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 3,
"order": 5
},
{
"display_type": "promotion",
"item": "29",
"item_type": "sku_id",
"pos_id": 3,
"order": 6
}
],
"ts": 1592134611000
}
Final result

Unique Visitor (UV) Real-Time Table
Prerequisites
State TTL
public class StateTTl {
public static void main(String[] args) throws Exception {
// Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//initial test data
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");
SingleOutputStreamOperator<Tuple2<String,Integer>> targetData = initData.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String value) throws Exception {
if (value.equals("a")) {
Thread.sleep(2000);
}
return Tuple2.of(value,1);
}
});
//test state expiration
SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration paramenters) throws Exception {
super.open(paramenters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("testTTL", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.seconds(1L))
// refresh the TTL whenever the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
//create the keyed state with the TTL applied
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String valueState = lastVisitDt.value();
if (valueState == null) {
System.out.println(value);
lastVisitDt.update(value.f0);
} else {
System.out.println(value.f0 + " already has state");
}
}
});
testTTL.print();
env.execute();
}
}
Output
(a,1)
(a,1)
(b,1)
(c,1)
b already has state
As the result shows, the state written for the first "a" was removed after one second (the TTL had expired by the time the second "a" arrived).
Implementation
public class DwdTrafficUniqueVisitorDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Checkpointing and state backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read page log data from the Kafka dwd_traffic_page_log topic
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_unique_visitor_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// TODO 4. Convert structure
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Filter out records whose last_page_id is not null (keep only session-start pages)
SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
jsonObj -> jsonObj
.getJSONObject("page")
.getString("last_page_id") == null
);
// TODO 6. Key the stream by mid
KeyedStream<JSONObject, String> keyedStream = firstPageStream
.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
// TODO 7. Use Flink keyed state to keep only unique visitor records
SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
new RichFilterFunction<JSONObject>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration paramenters) throws Exception {
super.open(paramenters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("last_visit_dt", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.days(1L))
// refresh the TTL whenever the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
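// Design note (added): the state only has to answer "has this mid already been seen today?",
// so a one-day TTL keeps the state size bounded without affecting correctness.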
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
String lastDt = lastVisitDt.value();
if (lastDt == null || !lastDt.equals(visitDt)) {
lastVisitDt.update(visitDt);
return true;
}
return false;
}
}
);
// TODO 8. Write the unique visitor data to the
// Kafka dwd_traffic_unique_visitor_detail topic
String targetTopic = "dwd_traffic_unique_visitor_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);
// TODO 9. Launch the job
env.execute();
}
}
Result
{
"common": {
"ar": "310000",
"uid": "201",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "1",
"md": "Xiaomi Mix2 ",
"mid": "mid_994205",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"page": {
"page_id": "home",
"during_time": 19868
},
"ts": 1592133292000
}
User Jump-Out (Bounce) Transaction Fact Table
Prerequisites
To treat the data as ordered, watermarks are assigned first, and window-based operations (including CEP's within window) then provide the ordering guarantee, as the sketch below illustrates.
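The following is a minimal, self-contained sketch (added here for illustration; it is not part of the original project code) of that idea: timestamps and a bounded-out-of-orderness watermark are assigned, and a 10-second event-time tumbling window only fires once the watermark passes its end, so all in-bound elements are available together and can be handled in event-time order. The sample elements and the 2-second out-of-orderness bound are illustrative assumptions.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class EventTimeOrderSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements(
                        Tuple2.of("a", 3000L),   // f1 is the event time in milliseconds
                        Tuple2.of("a", 1000L),   // arrives after the 3000 ms event, i.e. out of order
                        Tuple2.of("a", 12000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                }))
                .keyBy(data -> data.f0)
                // the window only fires after the watermark passes its end time,
                // so every in-bound element is available here and can be processed in event-time order
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                        StringBuilder sb = new StringBuilder();
                        for (Tuple2<String, Long> e : elements) {
                            sb.append(e).append(" ");
                        }
                        out.collect(key + " window [" + ctx.window().getStart() + ", " + ctx.window().getEnd() + ") -> " + sb.toString().trim());
                    }
                })
                .print();
        env.execute();
    }
}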
Using FlinkCEP
public class CEPTest {
public static void main(String[] args) throws Exception {
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO note: with multiple parallel subtasks, the watermark only advances once every subtask has received data, so the computation may never be triggered; keep parallelism at 1 for this test
env.setParallelism(1);
//TODO 2. Socket data source
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);
//TODO 3. Input lines look like "a 1": the first token is the value, the second is the time in seconds
SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
return Tuple2.of(s[0], Integer.parseInt(s[1])*1000L);
}
});
//TODO 4. Assign watermarks so the data can be treated as ordered in event time
SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
//allow 0 seconds of out-of-orderness
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
// System.out.println(initData);
return initData.f1;
}
}
)
);
//TODO 5. Key the stream; the CEP pattern below is applied per key
KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
return data.f0;
});
// keyedStream.print("keyedStream");
//TODO 6. Use CEP to match the data we want: here, two consecutive "a" events within 10 seconds; otherwise the pattern times out
Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern = Pattern.<Tuple2<String, Long>>begin("first").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> firstData) throws Exception {
return firstData.f0.equals("a");
}
}
).next("second").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> secondData) throws Exception {
return secondData.f0.equals("a");
}
}
// another Time class with the same simple name is imported above, so the fully qualified name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
//side output tag for timed-out partial matches
OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
};
SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp, Collector<Tuple2<String, Long>> out) throws Exception {
//extract the timed-out data
Tuple2<String, Long> first = pattern.get("first").get(0);
out.collect(first);
}
},
new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern, Collector<Tuple2<String, Long>> out) throws Exception {
//extract the matched data
Tuple2<String, Long> second = pattern.get("second").get(0);
out.collect(second);
}
}
);
DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
//print the timed-out data
timeOutDStream.print("timeOut");
//print the successfully matched data
flatSelectStream.print("flatSelectStream");
env.execute();
}
}
Test data
$ nc -lk 9998
a 2
a 3
a 15
Result
flatSelectStream> (a,3000)
timeOut> (a,3000)
Conclusion: when "a 2" and "a 3" arrive, the 10-second window is still open; entering "a 15" advances the watermark and closes it. "a 2" followed by "a 3" matches the pattern within 10 seconds, so (a,3000) (the "second" event of that match) is emitted as matched output; the partial match that starts at "a 3" never sees a successor within 10 seconds, so (a,3000) is also emitted on the timeout side output.
Union
public class UnionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);
dataStream1.union(dataStream2)
.print();
env.execute();
}
}
Output
2> 5
3> 6
1> 4
11> 3
9> 1
10> 2
Implementation
public class DwdTrafficUserJumpDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Checkpointing and state backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read page log data from the Kafka dwd_traffic_page_log topic
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_user_jump_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// Test data
/*DataStream<String> kafkaDS = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":15000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":30000} "
);*/
// TODO 4. Convert structure
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Assign watermarks, needed for the user jump-out (bounce) statistics
SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
return jsonObj.getLong("ts");
}
}
)
);
// TODO 6. Key the stream by mid
KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(jsonOjb -> jsonOjb.getJSONObject("common").getString("mid"));
// TODO 7. Define the CEP matching pattern
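// Added note: a "jump out" (bounce) is a session-start page view (last_page_id == null) that is
// not followed by another page view. Either the next event for the mid is again a session start
// (the "first"/"second" match below), or no further event arrives within 10 seconds (timeout).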
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
).next("second").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
// another Time class with the same simple name is imported above, so the fully qualified name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
// TODO 8. Apply the pattern to the stream
PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
// TODO 9. Extract matched events and timed-out events
OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
};
SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
@Override
public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
},
new PatternFlatSelectFunction<JSONObject, JSONObject>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
}
);
DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
// TODO 11. Union the two streams and write the data out to Kafka
DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
String targetTopic = "dwd_traffic_user_jump_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
unionDStream.map(JSONAware::toJSONString)
.addSink(kafkaProducer);
env.execute();
}
}