
The out-of-order problem in Flink SQL

2022-06-10 00:37:00 Hackergin

The out-of-order problem

When writing Flink SQL for business use cases, a very common problem is out-of-order data. When something goes wrong it is hard to troubleshoot and cannot be reproduced reliably, which leaves both the business side and the platform side in an awkward position.

In a real-time join, a Regular Join is executed as a hash join: both the left and right inputs are hashed on the join key, which guarantees that records with the same join key are routed to the same parallel subtask, where the join is computed.

The following example illustrates the problem. There are three tables: orders, order items (order_item), and item details (item_detail).

  • The data of these three tables is captured from MySQL and written to Kafka in real time; it changes continuously, so window-based computation cannot be used.
  • Apart from the orders table, which has an order time, the other two tables have no time attribute, so watermarks cannot be used either.

CREATE TABLE orders (
	order_id VARCHAR,
	order_time TIMESTAMP
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

CREATE TABLE order_item (
	order_id VARCHAR,
	item_id VARCHAR
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

CREATE TABLE item_detail (
	item_id VARCHAR,
	item_name VARCHAR,
	item_price BIGINT
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

Using a Regular Join to widen the tables, the query is as follows:


SELECT o.order_id, i.item_id, d.item_name, d.item_price, o.order_time
FROM orders o
LEFT JOIN order_item i ON o.order_id = i.order_id
LEFT JOIN item_detail d ON i.item_id = d.item_id

The generated DAG is shown below:
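
(The original figure is not reproduced here; the sketch below reconstructs the topology, and matches the dataflow graph in the "Cause analysis" section.)

graph LR
    orders(orders) --> |hash:order_id| join1(join1)
    order_item(order_item) --> |hash:order_id| join1
    join1 --> |hash:item_id| join2(join2)
    item_detail(item_detail) --> |hash:item_id| join2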

From the DAG you can see:

The first join (hereafter join1) joins on order_id: both of its inputs are hashed by order_id, so records with the same order_id are sent to the same subtask.

The second join (hereafter join2) joins on item_id: both of its inputs are hashed by item_id, so records with the same item_id are sent to the same subtask.

Under normal circumstances, records with the same order_id also have the same item_id. But because the query above uses LEFT JOIN, a record is emitted even before it finds a match, padded with NULLs, and is retracted later when the match arrives; this is what can make the final result non-deterministic.

Let's walk through it in detail with the following data:

TABLE orders

order_id | order_time
id_001   | 2022-06-03 00:00:00

TABLE order_item

order_id | item_id
id_001   | item_001

TABLE item_detail

item_id  | item_name  | item_price
item_001 | category 1 | 10

The output of the two joins is shown below. In this notation:
  • the leading number (e.g. 1)) is the parallel subtask that emitted the record;
  • the prefix is the changelog row kind (+I insert, -U update-before, +U update-after, -D delete).

Output of the first join (join1):

1) +I(id_001, null, 2022-06-03 00:00:00)
1) -D(id_001, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 2022-06-03 00:00:00)

Output of the second join (join2):

1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)

The result above is only one possible interleaving; it will not necessarily happen in every run. The key observation is that after join1 sends its results to join2, records with the same order_id are no longer guaranteed to reach the same subtask: join2 is keyed by item_id, so the null-padded record (item_id = NULL) and the matched record (item_id = item_001) land on different subtasks. In any subsequent processing there is therefore a very real chance that the final result becomes non-deterministic.

Let's look at several downstream scenarios. Assume the result of join2 is registered as a view named JOIN_VIEW:
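
For completeness, a sketch of the view definition (the original article refers to it only by name):

CREATE VIEW JOIN_VIEW AS
SELECT o.order_id, i.item_id, d.item_name, d.item_price, o.order_time
FROM orders o
LEFT JOIN order_item i ON o.order_id = i.order_id
LEFT JOIN item_detail d ON i.item_id = d.item_id;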

  1. Suppose that after join2 we aggregate by item_id to count the orders per category:

SELECT item_id, count(order_id)
FROM JOIN_VIEW
GROUP BY item_id

Clearly, the disorder described above does not affect the result of this query: rows whose item_id is NULL are still counted, but in their own group, so they never affect the result for item_id = item_001.

  2. Suppose that after join2 we write the result directly to MySQL, with order_id as the MySQL primary key:
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

Because we did not configure a separate parallelism for the sink connector, the sink runs with the same parallelism as join2, so join2's output is forwarded directly to the sink operator and written to MySQL.
Since different subtasks write to MySQL independently, the actual write order can be:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)

Clearly the final result is wrong: the last operation applied to MySQL is a delete, so the row for id_001 ends up missing entirely.
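
To make this concrete, here is a sketch of the statements the JDBC sink would issue against MySQL for the write order above. It assumes the usual translation of +I/+U records into primary-key upserts and -D records into primary-key deletes; the exact SQL and batching behavior of the connector may differ.

-- statements corresponding to subtask 2's records
INSERT INTO MySQL_Sink (order_id, item_id, item_name, item_price, order_time)
VALUES ('id_001', 'item_001', NULL, NULL, '2022-06-03 00:00:00')
ON DUPLICATE KEY UPDATE item_id = VALUES(item_id), item_name = VALUES(item_name),
                        item_price = VALUES(item_price), order_time = VALUES(order_time);

DELETE FROM MySQL_Sink WHERE order_id = 'id_001';

INSERT INTO MySQL_Sink (order_id, item_id, item_name, item_price, order_time)
VALUES ('id_001', 'item_001', 'category 1', 10, '2022-06-03 00:00:00')
ON DUPLICATE KEY UPDATE item_id = VALUES(item_id), item_name = VALUES(item_name),
                        item_price = VALUES(item_price), order_time = VALUES(order_time);

-- statements corresponding to subtask 1's records, which arrive afterwards
INSERT INTO MySQL_Sink (order_id, item_id, item_name, item_price, order_time)
VALUES ('id_001', NULL, NULL, NULL, '2022-06-03 00:00:00')
ON DUPLICATE KEY UPDATE item_id = VALUES(item_id), item_name = VALUES(item_name),
                        item_price = VALUES(item_price), order_time = VALUES(order_time);

DELETE FROM MySQL_Sink WHERE order_id = 'id_001';  -- the last statement wins: the row is gone

Because the primary key is only order_id, every statement above targets the same row, so whichever subtask happens to write last determines the final state of that row.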

  3. Suppose that after join2 we write the result directly to MySQL, with (order_id, item_id, item_name) as the primary key:
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

As in example 2, we did not configure a separate sink parallelism, so the data is forwarded directly to the sink operator. Assume the write order to MySQL is the same as in example 2:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)

The final content of the MySQL table will be:

2) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)

Because the MySQL primary key is (order_id, item_id, item_name), the final -D record is keyed on (id_001, item_001, NULL); it cannot delete the row keyed (id_001, item_001, category 1) that subtask 2 wrote, so the final result is correct.

  4. Suppose that after join2 we write the result to Kafka in changelog-json format, and a downstream job consumes the topic and processes it further:
CREATE TABLE kafka_sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

INSERT INTO kafka_sink SELECT * FROM JOIN_VIEW;

By default, if no partitioner is configured, the kafka sink builds the hash key from the primary key declared in the DDL and uses that hash to determine the partition id.
One important point: join2's output is already spread across different subtasks, so regardless of whether kafka_sink declares order_id as the primary key or (order_id, item_id, item_name), we cannot control the write order across subtasks; we can only guarantee that records from the same subtask are written to Kafka in order, and that records with the same key land in the same partition.

  • If the primary key is order_id, all of the records above are written to the same partition.
  • If the primary key is (order_id, item_id, item_name), the outputs of the different subtasks above may be written to different partitions.

So the question we need to focus on is how the downstream job processes the data once it has been written to Kafka:

  1. Deduplicate by order_id, then aggregate by day to compute the daily total.
    Take the following SQL as an example. To avoid duplicated change events when consuming the Kafka topic, the job deduplicates on order_id, using order_id as the partition key and proctime() as the ordering (with table.exec.source.cdc-events-duplicate set, the framework generates the deduplication operator automatically; a conceptual sketch of that deduplication follows the query below).
-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- aggregate by order_time and compute the daily revenue

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')
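
For intuition, the deduplication that table.exec.source.cdc-events-duplicate adds can be pictured as the classic Flink SQL deduplication pattern below. This is only a conceptual sketch, not the exact operator the planner generates (the framework inserts a dedicated stateful operator that also understands the +I/-D row kinds), and it assumes kafka_source additionally declares a processing-time column proc AS PROCTIME():

SELECT order_id, item_id, item_name, item_price, order_time
FROM (
    SELECT order_id, item_id, item_name, item_price, order_time,
           ROW_NUMBER() OVER (
               PARTITION BY order_id   -- the PRIMARY KEY declared on the source
               ORDER BY proc           -- processing time
           ) AS row_num
    FROM kafka_source
)
WHERE row_num = 1

The important part for this discussion is the PARTITION BY clause: here the data is re-keyed by order_id alone. In scenario 2 below, the partition key is (order_id, item_id, item_name) instead.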

What result should we expect from this computation?
One possible output of the deduplication operator is:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)
1) -D(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)
1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

After the aggregation operator:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 0)
1) -D(2022-06-03, 0)
1) +I(2022-06-03, null)
1) -D(2022-06-03, null)

As you can see, the final output is (2022-06-03, null). The example in this article is simplified; in a real job there would be other orders that day, so the daily result would not literally be null. But the point stands: because the data is out of order, the computed result no longer matches the actual result.
  2. Deduplicate by (order_id, item_id, item_name), then aggregate by day to compute the daily total.

-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- aggregate by order_time and compute the daily revenue

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

Output after deduplication:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, category 1, 10, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

Because the primary key is set to (order_id, item_id, item_name), the record (id_001, item_001, null, null, 2022-06-03 00:00:00) and the record (id_001, item_001, category 1, 10, 2022-06-03 00:00:00) have different primary keys, so they do not affect each other.
The aggregated result:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)

This is the final output; this time the result is correct.

Cause analysis

The dataflow of the original job is shown below:

graph LR
    orders(orders) --> |hash:order_id| join1(join1)
    order_item(order_item) --> |hash:order_id| join1
    join1 --> |hash:item_id| join2(join2)
    item_detail(item_detail) --> |hash:item_id| join2
  • A: aggregate by item_id to count orders per category (result correct)

graph LR
    join2(join2) --> |hash:item_id| group(group count:order_id)
  • B: sink the join result to MySQL with primary key order_id (result wrong)

graph LR
    join2(join2) --> |forward| sink(sink)
    sink --> |jdbc send| MySQL(MySQL primarykey:order_id)
  • C: sink the join result to MySQL with primary key (order_id, item_id, item_name) (result correct)

graph LR
    join2(join2) --> |forward| sink(sink)
    sink --> |jdbc send| MySQL(MySQL primarykey:order_id+item_id+item_name)
  • D: sink the join result to Kafka; a downstream job consumes the topic, deduplicates, and processes it further. Two sub-cases:
    • D-1: partition and deduplicate by order_id (result wrong)
    • D-2: partition and deduplicate by (order_id, item_id, item_name) (result correct)

D-1:

graph TD
    join2(join2) --> |forward| sink(sink)
    sink --> |kafka client| kafka(Kafka fixed partitioner)
    kafka --> |hash:order_id| rank(rank orderby:proctime)
    rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")

D-2:

graph TD
    join2(join2) --> |forward| sink(sink key)
    sink --> |kafka client| kafka(Kafka fixed partitioner)
    kafka --> |hash:order_id+item_id+item_name| rank(rank orderby:proctime)
    rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")

From cases A, B, C, D-1, and D-2 above it is not hard to see when the result goes wrong and when it does not: the key is the hash rule applied between one task and the next.

In case B, the disorder is introduced at the sink operator: the hash condition changes from the original order_id+item_id+item_name to order_id.
In case D-1, the disorder is introduced at the deduplication operator: the hash condition changes from the original order_id+item_id+item_name to order_id.

From this we can summarize a few rules of thumb (the EXPLAIN sketch after this list shows one way to inspect a job's hash exchanges):

  • When Flink hashes data between two operators, it guarantees the transmission order between those two operators for records that share the same hash value.
  • Flink cannot guarantee ordering across consecutive re-hashing steps: when the hash condition changes between operators, the data order may be broken.
  • When the hash condition goes from fewer fields to more fields, no ordering problem appears; when it goes from more fields to fewer fields, an ordering problem may appear.
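
A convenient way to check where these hash exchanges sit is to look at the optimized plan of the job. A minimal sketch using the widening query from this article (recent Flink versions accept EXPLAIN <query>; older versions use EXPLAIN PLAN FOR <query>):

EXPLAIN
SELECT o.order_id, i.item_id, d.item_name, d.item_price, o.order_time
FROM orders o
LEFT JOIN order_item i ON o.order_id = i.order_id
LEFT JOIN item_detail d ON i.item_id = d.item_id;

In the optimized plan, each Exchange node lists its distribution (for example hash[order_id] or hash[item_id]); comparing the keys of consecutive exchanges is exactly the check described in the rules above.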

Summary

Many teams build their first real-time job by reusing an existing offline job: the core logic stays the same, and the original Hive source tables are simply replaced with message-queue source tables. The results of such jobs are usually hard to reconcile with the offline numbers. Stream-batch unification is one of Flink's strengths, but for cases like the ones above, real-time results can still differ from offline results. So when writing Flink SQL, make sure you understand your data: know how it will flow through the job and what result each step produces. Only then will the logic behave as expected.

Copyright notice: this article was written by [Hackergin]. Please include a link to the original when reposting: https://yzsam.com/2022/161/202206092357365463.html