Requirements
The product needs to push data to different customers. The original implementation pushed each piece of data to the customer over HTTP as soon as it was generated. Because each piece of data is small and data is generated frequently, this established HTTP connections very often while each request carried very little business data, so actual network utilization was low. The goal is to improve network utilization and reduce the load on the system.
Analysis
A natural idea is to send multiple pieces of data together. There are a few key points:
1. Aggregation logic for multiple pieces of data: either send once enough data has accumulated, or send on a time period. With the first approach, when data is sparse or generated at an unstable rate, it may take a long time to accumulate enough, so an expiration time is still needed because customers may not accept long delays. Since time has to be used for control anyway, I simply chose to send by time period. The idea is: once a certain length of time has passed since the last send, send all the data generated for that customer during the period.
2. Deciding when data is due: having chosen to send by time period, there must be a way to tell whether it is time to send. A very simple idea is polling: go through all customers and send the data of whoever is due. This is O(N) per pass, which wastes too much time when there are many customers. There is another way: if customers are kept sorted by time, take the earliest customer and check its time. If it is due, send it and move on to the next one, until a customer is found whose time is not yet due; then stop and wait a while before checking again. This needs a data structure that keeps itself sorted as data is written; inserting into such a structure can generally be done in O(log n). In principle it is read and written like a queue, just a sortable one.
3. Distinguishing customers: different customers have different receiving addresses. When sending to a specific customer, it should be easy to gather that customer's data, ideally by fetching exactly the data to send. A dictionary meets this requirement, reducing the cost of retrieving a customer's data to O(1).
4. Data safety: if the program exits before data has been sent successfully, what happens to the unsent data? Should it still be sent, or simply discarded? To recover unsent data after a restart, the data must be synchronized somewhere else, for example persisted to disk. Because the data safety requirements here are not high and losing some data is acceptable, data waiting to be sent is simply kept in memory.
Implementation
The sortable data structure mentioned above can be a SortedList<TKey,TValue>: the key is the time, and the value is the list of identifiers of customers that generated data at that time. Its read and write operations are not thread safe, so access has to be synchronized; here a simple lock is used. Note that SortedList<TKey,TValue> is backed by arrays: adding a key that sorts last, as increasing timestamps do, is O(log n), but inserting elsewhere or removing the first entry is O(n).
For easy access to each customer's data, a Dictionary<TKey,TValue> is used: the key is the customer identifier, and the value is that customer's accumulated unsent data. Reading and writing it is not thread safe either, so it is accessed under the same lock as the SortedList.
Here are their definitions:
SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();
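To make the "sorted on write" behavior concrete, here is a minimal sketch that is not part of the original code. It assumes string customer identifiers and fixed timestamps purely for illustration; the class name SortedListDemo and the customer names are hypothetical. It inserts entries out of order and then drains them from index 0, which is how the queue always reaches the earliest time first.

```csharp
using System;
using System.Collections.Generic;

public static class SortedListDemo
{
    // Returns the customer identifiers in the order a time-sorted queue would visit them.
    public static List<string> DrainInTimeOrder()
    {
        var queue = new SortedList<DateTime, List<string>>();
        var t0 = new DateTime(2022, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        // Entries are added out of order; SortedList keeps them sorted by key.
        queue.Add(t0.AddSeconds(2), new List<string> { "customer_c" });
        queue.Add(t0, new List<string> { "customer_a" });
        queue.Add(t0.AddSeconds(1), new List<string> { "customer_b" });

        var order = new List<string>();
        while (queue.Count > 0)
        {
            // The earliest entry is always at index 0.
            order.AddRange(queue.Values[0]);
            queue.RemoveAt(0);
        }
        return order;
    }
}
```

Calling SortedListDemo.DrainInTimeOrder() yields customer_a, customer_b, customer_c regardless of insertion order.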
When inserting data, write to the SortedList first and then to the Dictionary. The code logic is fairly simple:
public void Publish(TKey key, TValue value)
{
    DateTime now = DateTime.Now;
    lock (_lock)
    {
        // Record that this customer produced data at this time.
        if (_queue.TryGetValue(now, out List<TKey>? keys))
        {
            if (!keys!.Contains(key))
            {
                keys.Add(key);
            }
        }
        else
        {
            _queue.Add(now, new List<TKey> { key });
        }

        // Append the value to the customer's unsent data.
        if (_data.TryGetValue(key, out List<TValue>? values))
        {
            values.Add(value);
        }
        else
        {
            _data.Add(key, new List<TValue> { value });
        }
    }
}
Data is consumed in pull mode. My first version read one piece of data, processed it, and then deleted it from the queue. That logic reads and writes the queue, so it must hold the lock. Data processing is usually slow (here the data is sent over HTTP), so holding the lock during processing could block writers to the queue for a long time. The implementation here therefore extracts all the data that is due for sending, releases the lock, and processes the data outside the lock, which gives the queue better read and write performance.
public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
{
    List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
    DateTime now = DateTime.Now;
    lock (_lock)
    {
        int messageCount = 0;
        while (true)
        {
            if (!_queue.Any())
            {
                break;
            }

            // The earliest entry is first; stop as soon as it is not yet due.
            var first = _queue.First();
            var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
            if (diffMillseconds < _valueDequeueMillseconds)
            {
                break;
            }

            // Move each customer's accumulated data into the result.
            var keys = first.Value;
            foreach (var key in keys)
            {
                if (_data.TryGetValue(key, out List<TValue>? keyValues))
                {
                    result.Add((key, keyValues));
                    _data.Remove(key);
                    messageCount += keyValues!.Count;
                }
            }

            _queue.RemoveAt(0);

            // The limit is checked per queue entry, so it can be exceeded slightly.
            if (messageCount >= maxNumberOfMessages)
            {
                break;
            }
        }
    }
    return result;
}
This code is longer, so let me walk through the logic. Take the first entry in the queue and check whether its time has reached the sending period. If not, exit immediately, and the method returns an empty list. If it has, take the customer identifiers stored in that entry, fetch each customer's unsent data by identifier, add the data to the return list grouped by customer, and remove those customers and their data from the queue. This repeats for the following entries until one is not yet due, and then the list is returned. There is also a limit on the number of pieces of data pulled at once, which can be tuned to suit the business.
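The article does not show the HTTP sending step itself, so the following is only a sketch of how a pulled batch might be sent outside the lock, one request per customer. The class name BatchSender, the resolveUrl lookup from customer identifier to receiving address, and the JSON array body format are all assumptions made for illustration; string keys and values are assumed as well.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class BatchSender
{
    // Serializes one customer's accumulated values as a JSON array (assumed body format).
    public static string BuildBody(List<string> values) => JsonSerializer.Serialize(values);

    // Sends each customer's batch with a single HTTP POST.
    // resolveUrl is a hypothetical lookup from customer identifier to receiving address.
    public static async Task SendAsync(
        HttpClient client,
        List<(string key, List<string> value)> batches,
        Func<string, Uri> resolveUrl)
    {
        foreach (var (key, values) in batches)
        {
            using var content = new StringContent(BuildBody(values), Encoding.UTF8, "application/json");
            using var response = await client.PostAsync(resolveUrl(key), content);
            // Throws on a non-success status so the caller can decide whether to retry.
            response.EnsureSuccessStatusCode();
        }
    }
}
```

Because Pull has already released the lock when this runs, a slow customer endpoint delays only the consumer, not the producers calling Publish.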
Now let's see how to use this queue. Here we simulate multiple producers and one consumer; in practice there can be any number of producers and consumers:
// Data becomes due for sending 3000 ms after it was published.
TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

// Four producers, each publishing one value every 15 ms.
List<Task> publishTasks = new List<Task>();
for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

// One consumer that pulls due data and backs off briefly when nothing is due.
Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }
        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now:mmss.fff}:{item.key}, {string.Join(",", item.value)}");
        }
    }
}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());
The above is a time-sorted queue built for this specific requirement.
A queue that can sort by anything
It is easy to see that if we can sort by time, we can sort by other data types as well. This data structure applies to many scenarios: queues sorted by weight, by priority, by age, by bank balance, and so on. It is a queue that can sort by anything.
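As a small illustration of sorting by something other than time, here is a sketch that orders messages by an integer priority, highest first. It is not taken from the article's repository: the class name PriorityOrderDemo, the message names, and the descending comparer passed to the SortedList are all assumptions for the example.

```csharp
using System;
using System.Collections.Generic;

public static class PriorityOrderDemo
{
    // Returns the messages in the order they would be consumed, highest priority first.
    public static List<string> DrainHighestFirst()
    {
        // A descending comparer puts the largest priority at index 0.
        var descending = Comparer<int>.Create((a, b) => b.CompareTo(a));
        var queue = new SortedList<int, List<string>>(descending);

        Enqueue(queue, 1, "low");
        Enqueue(queue, 9, "urgent");
        Enqueue(queue, 5, "normal");
        Enqueue(queue, 9, "urgent_2"); // same sort key, appended to the same entry

        var order = new List<string>();
        while (queue.Count > 0)
        {
            order.AddRange(queue.Values[0]);
            queue.RemoveAt(0);
        }
        return order;
    }

    // Messages that share a priority are kept together in insertion order.
    private static void Enqueue(SortedList<int, List<string>> queue, int priority, string message)
    {
        if (queue.TryGetValue(priority, out var bucket))
        {
            bucket.Add(message);
        }
        else
        {
            queue.Add(priority, new List<string> { message });
        }
    }
}
```

Without the comparer the same code drains in ascending order, which is what the time-sorted queue relies on.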
The main code is posted here (see the end of the article for the complete code and examples):
public class SortedQueue<TSortKey, TKey, TValue>
    where TSortKey : notnull, IComparable
    where TKey : notnull
    where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();
    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns>The pulled messages, grouped by message key.</returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                // Entries are consumed in sort-key order, smallest first.
                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }

                _queue.RemoveAt(0);
                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }
        return result;
    }
}
The code logic is fairly simple, so I won't go into detail. If you have any questions, please leave a comment.
More on data safety
In this implementation all data waiting to be processed lives in memory, so losing data is a real risk. In my case there is another queue upstream of my program, so even if the program crashes only a small amount of unprocessed data is lost, which the business can accept. If you are interested in this approach, consider your own application scenario carefully.
Let's look at two situations in which data can be lost:
First, the program restarts while data is still in the queue. In this case, as mentioned earlier, the data can be synchronized elsewhere, for example written to Redis, a database, or disk. But because network I/O and disk I/O are slow, this often causes a sharp drop in throughput. To maintain throughput, some sharding mechanism has to be introduced, and because distributed systems are unreliable, fault-tolerance mechanisms may be needed too. This gets complicated; Kafka is a useful reference.
Second, data processing fails. In this case the program can retry. But if the exception crashes the program after the data has already been removed from memory or other storage, the data is still lost. Here an ACK mechanism can help: after processing succeeds, the consumer sends an ACK to the queue carrying the identifier of the processed data, and the queue deletes the data by that identifier. Until then, consumers can still consume the data again.
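The ACK idea can be sketched as follows. This is a simplified, in-memory illustration rather than the article's implementation: the class AckQueue and its members TryPull, Ack, and RequeueUnacked are hypothetical names, and the message identifier is just an incrementing counter. A pulled message is moved to an in-flight set instead of being deleted, and it is only dropped once it has been acknowledged.

```csharp
using System;
using System.Collections.Generic;

public class AckQueue<TValue>
{
    private readonly Queue<(long Id, TValue Value)> _pending = new Queue<(long Id, TValue Value)>();
    private readonly Dictionary<long, TValue> _inFlight = new Dictionary<long, TValue>();
    private readonly object _lock = new object();
    private long _nextId;

    public void Publish(TValue value)
    {
        lock (_lock)
        {
            _pending.Enqueue((++_nextId, value));
        }
    }

    // Hands out a message but keeps a copy until it is acknowledged.
    public bool TryPull(out long id, out TValue value)
    {
        lock (_lock)
        {
            if (_pending.Count == 0)
            {
                id = 0;
                value = default!;
                return false;
            }
            (id, value) = _pending.Dequeue();
            _inFlight[id] = value;
            return true;
        }
    }

    // Deletes the message once the consumer reports success.
    public bool Ack(long id)
    {
        lock (_lock)
        {
            return _inFlight.Remove(id);
        }
    }

    // Makes every unacknowledged message available for consumption again.
    // The order in which they are put back is not guaranteed in this sketch.
    public int RequeueUnacked()
    {
        lock (_lock)
        {
            int count = _inFlight.Count;
            foreach (var pair in _inFlight)
            {
                _pending.Enqueue((pair.Key, pair.Value));
            }
            _inFlight.Clear();
            return count;
        }
    }
}
```

Since everything here is still in memory, this only protects against a failed or abandoned processing attempt; surviving a crash of the queue process itself would additionally require persisting the pending and in-flight data.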
These problems do not all have to be solved completely; it depends on the business scenario. Persisting data to Redis may be enough for you, or you may not need an ACK mechanism at all and can simply record what you have already processed.
That is the main content of this article. For the complete code and examples, please visit GitHub: https://github.com/bosima/dotnet-demo/tree/main/CSharp-SortedList