
Our product needs to push data to different customers. The original implementation pushed each piece of data to its customer over HTTP as soon as it was generated. Each piece of data is small and data is generated frequently, so HTTP connections were opened very often while each request carried very little business data, and actual network utilization was low. The goal is to improve network utilization and reduce system load.


A natural idea is to send multiple pieces of data together. There are a few key points to consider:

1. How to aggregate data: either send once enough items have accumulated, or send on a fixed time period. With the count-based approach, it may take a long time to accumulate enough items when data is sparse or arrives irregularly. An expiration time is then needed anyway, because customers may not accept long delays. Since time has to be used as a control regardless, I simply chose to send on a time period. The idea is: once a certain amount of time has passed since the last send, send all the data generated for that customer during that period.

2. How to decide that data is due: having chosen time-period sending, there must be a way to decide whether it is time to send. A very simple approach is polling: check every customer on each pass and send for any customer whose data has expired. Each pass takes O(N) time, which becomes expensive when there are many customers. Another way is to keep customers sorted by time. Look only at the customer with the earliest data and check its time; if it is due, send it and move on to the next one. Repeat until reaching a customer whose data is not yet due, then stop and wait a while before checking again. This requires a data structure that keeps entries sorted automatically on insertion; such structures (for example a heap or a balanced search tree) can generally do this in O(log n). Reads and writes on this structure follow a queue pattern; it is simply a queue that stays sorted.

3. Distinguishing customers: different customers have different receiving addresses. When sending to a particular customer, it should be easy to gather that customer's data, ideally getting exactly the data to send in a single lookup. A dictionary meets this requirement and reduces the cost of retrieving one customer's data to O(1) on average.

4. Data safety: if the program exits before the data has been sent successfully, what happens to the unsent data? It can either still be sent later or simply be discarded. Recovering unsent data after a restart requires synchronizing it somewhere else, for example by persisting it to disk. The data-safety requirements here are low and losing some data is acceptable, so received data is kept only in memory.
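To make the trade-off in point 2 concrete, here is a small sketch comparing the two checks. It is illustrative only. ExpiryCheck, DueByPolling and DueBySortedHead are hypothetical names, and customer identifiers are plain strings. The polling variant assumes a dictionary from customer to the time its pending data started. The sorted variant uses SortedList<DateTime, List<string>>, the same structure chosen below.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper contrasting the two expiry checks described in point 2.
public static class ExpiryCheck
{
    // Polling: O(N) per pass, every customer is inspected every time.
    public static List<string> DueByPolling(
        Dictionary<string, DateTime> firstPending, DateTime now, TimeSpan period)
    {
        var due = new List<string>();
        foreach (var (customer, since) in firstPending)
        {
            if (now - since >= period)
            {
                due.Add(customer);
            }
        }
        return due;
    }

    // Sorted by time: walk from the earliest entry and stop at the first one
    // that is not yet due, because every later entry is newer still.
    public static List<string> DueBySortedHead(
        SortedList<DateTime, List<string>> byTime, DateTime now, TimeSpan period)
    {
        var due = new List<string>();
        foreach (var entry in byTime) // SortedList enumerates in ascending key order
        {
            if (now - entry.Key < period)
            {
                break;
            }
            due.AddRange(entry.Value);
        }
        return due;
    }
}
```

The polling version touches every customer on every pass. The sorted version stops at the first entry that is not yet due, so its cost grows with the number of due entries rather than with the total number of customers.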


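The sortable data structure mentioned above can be implemented with SortedList<TKey,TValue>. Its key is the time, and its value is the list of identifiers of the customers that produced data at that time. Its reads and writes are not thread-safe, so synchronization has to be handled by hand; to keep things simple, a lock is used here. Note also that SortedList<TKey,TValue> keeps its entries in sorted arrays. Add is O(log n) when the new key sorts after all existing keys, as increasing timestamps usually do, but O(n) otherwise. RemoveAt(0) is O(n) because the remaining entries are shifted.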

To make each customer's data easy to access, a Dictionary<TKey,TValue> is used: the key is the customer identifier and the value is that customer's accumulated unsent data. Dictionary is not thread-safe either, so its reads and writes can share the same lock as the SortedList.

Here are their definitions:

    SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
    readonly object _lock = new object();

When inserting data, the SortedList is written first and then the Dictionary. The logic is straightforward:

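    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            // 1. SortedList: record the customer key under the current timestamp.
            if (!_queue.TryGetValue(now, out List<TKey>? keys))
                _queue.Add(now, new List<TKey> { key });
            else if (!keys!.Contains(key))
                keys.Add(key);

            // 2. Dictionary: accumulate the customer's unsent values.
            if (_data.TryGetValue(key, out List<TValue>? values))
                values!.Add(value);
            else
                _data.Add(key, new List<TValue> { value });
        }
    }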

Consumption uses a pull model. My first version read one item, processed it, and then removed it from the queue. That logic reads and writes the queue, so it has to hold the lock. Data processing is usually slow, though; here, for example, the data has to be sent over HTTP. Holding the lock during processing could block producers writing to the queue for a long time. So the implementation below extracts all the data that is ready to send while holding the lock, then releases the lock and processes the data outside it, which keeps queue reads and writes fast.
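The locking pattern described above can be shown in isolation. The sketch below is a hypothetical DrainingBuffer<T>, not part of the article's code. It swaps out the pending list while holding the lock. The slow send callback, which stands in for the HTTP call, runs only after the lock has been released.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal buffer: copy out under the lock, process outside it.
public class DrainingBuffer<T>
{
    private List<T> _items = new List<T>();
    private readonly object _lock = new object();

    public void Add(T item)
    {
        lock (_lock)
        {
            _items.Add(item);
        }
    }

    // Swap the list out while holding the lock; the caller then owns the old
    // list exclusively, so producers calling Add() are never blocked by it.
    public List<T> Drain()
    {
        lock (_lock)
        {
            var batch = _items;
            _items = new List<T>();
            return batch;
        }
    }
}

public static class Consumer
{
    // 'send' stands in for the slow HTTP call and runs with no lock held.
    public static int ProcessOnce<T>(DrainingBuffer<T> buffer, Action<List<T>> send)
    {
        var batch = buffer.Drain();
        if (batch.Count > 0)
        {
            send(batch);
        }
        return batch.Count;
    }
}
```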

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            int messageCount = 0;
            while (_queue.Count > 0)
            {
                var first = _queue.First(); // earliest time slot (System.Linq)
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                    break; // earliest slot is not due, so no later slot is either

                foreach (var key in first.Value)
                {
                    // Move the customer's data out of the dictionary; a key that
                    // was already flushed via an earlier slot is simply skipped.
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues!));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                    break;
            }
        }
        return result;
    }

This code is a bit longer, so here is the logic. Take the first entry in the queue and check whether its sending period has elapsed. _valueDequeueMillseconds is the period in milliseconds, presumably the 3000 passed to the constructor in the example below. If the period has not elapsed, stop and return what has been collected so far (an empty list if nothing was due). If it has elapsed, take the customer identifiers stored in that entry and look up each customer's unsent data. Add that data to the result list grouped by customer, remove those customers' data and the entry from the queue, and continue with the next entry. There is also a limit on the number of messages pulled, so the batch size can be tuned to the business. The limit is checked only after a whole time slot has been processed, so a single call may return somewhat more than maxNumberOfMessages.
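For readers who want to run the pieces together, here is one possible self-contained assembly of TimeSortedQueue. The article does not show the constructor, so its signature (taking the period in milliseconds) is an assumption. The optional Func<DateTime> clock is also an assumption, added only so the behavior can be checked deterministically; by default it uses DateTime.Now as the original does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch assembled from the fragments above. ASSUMPTIONS: constructor signature
// and the injectable clock (added only for deterministic testing).
public class TimeSortedQueue<TKey, TValue> where TKey : notnull
{
    private readonly SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
    private readonly Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
    private readonly object _lock = new object();
    private readonly int _valueDequeueMillseconds;
    private readonly Func<DateTime> _clock;

    public TimeSortedQueue(int valueDequeueMillseconds, Func<DateTime>? clock = null)
    {
        _valueDequeueMillseconds = valueDequeueMillseconds;
        _clock = clock ?? (() => DateTime.Now);
    }

    public void Publish(TKey key, TValue value)
    {
        DateTime now = _clock();
        lock (_lock)
        {
            // Record that 'key' has pending data as of 'now'.
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys.Contains(key)) keys.Add(key);
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            // Accumulate the value under its customer key.
            if (_data.TryGetValue(key, out List<TValue>? values))
                values.Add(value);
            else
                _data.Add(key, new List<TValue> { value });
        }
    }

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        var result = new List<(TKey, List<TValue>)>();
        DateTime now = _clock();
        lock (_lock)
        {
            int messageCount = 0;
            while (_queue.Count > 0)
            {
                var first = _queue.First();
                // Stop at the first slot whose sending period has not elapsed.
                if (now.Subtract(first.Key).TotalMilliseconds < _valueDequeueMillseconds)
                    break;

                foreach (var key in first.Value)
                {
                    // Skip keys already flushed through an earlier slot.
                    if (_data.Remove(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        messageCount += keyValues.Count;
                    }
                }
                _queue.RemoveAt(0);

                // Soft limit: checked only after a whole slot has been drained.
                if (messageCount >= maxNumberOfMessages)
                    break;
            }
        }
        return result;
    }
}
```

One consequence of writing the key into the SortedList on every Publish: a customer that publishes in several time slots is flushed when its earliest slot expires. Its later slots then find either nothing (and are skipped) or only data that arrived after that flush, which may therefore be sent before a full period has passed.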

Now let's see how to use this queue. This example simulates several producers and one consumer; in practice there can be any number of both:

    TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

    List<Task> publishTasks = new List<Task>();

    for (int i = 0; i < 4; i++)
    {
        var j = i;
        publishTasks.Add(Task.Factory.StartNew(() =>
        {
            int k = 0;
            while (true)
            {
                queue.Publish($"key_{k}", $"value_{j}_{k}");
                k = (k + 1) % 100;  // illustrative: cycle over 100 customer keys
                Thread.Sleep(10);   // illustrative throttle
            }
        }, TaskCreationOptions.LongRunning));
    }

    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            var list = queue.Pull(100);
            if (list.Count <= 0)
            {
                Thread.Sleep(100);  // nothing due yet; back off briefly
                continue;
            }

            foreach (var item in list)
                Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }, TaskCreationOptions.LongRunning);

    Task.WaitAll(publishTasks.ToArray());

That is a time-sorted queue built for this specific requirement.

A queue that can sort by anything

Naturally, if a queue can be sorted by time, it can be sorted by other data types too. Such a data structure fits many scenarios, such as queues sorted by weight, by priority, by age, or by bank deposit. It is a queue that can sort by anything.

Here is the main code (the complete code and examples are linked at the end of the article):

    public class SortedQueue<TSortKey, TKey, TValue>
        where TSortKey : notnull, IComparable
        where TKey : notnull
        where TValue : notnull
    {
        Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
        SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();
        readonly object _lock = new object();

        /// <summary>
        /// Create a new instance of SortedQueue
        /// </summary>
        public SortedQueue(int maxNumberOfMessageConsumedOnce)
        {
            // The excerpt does not show what the constructor does with this value.
        }

        /// <summary>
        /// Publish a message to queue
        /// </summary>
        /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
        /// <param name="key">The message key.</param>
        /// <param name="value">The message value.</param>
        public void Publish(TSortKey sortKey, TKey key, TValue value)
        {
            lock (_lock)
            {
                if (!_queue.TryGetValue(sortKey, out List<TKey>? keys))
                    _queue.Add(sortKey, new List<TKey> { key });
                else if (!keys!.Contains(key))
                    keys.Add(key);

                if (_data.TryGetValue(key, out List<TValue>? values))
                    values!.Add(value);
                else
                    _data.Add(key, new List<TValue> { value });
            }
        }

        /// <summary>
        /// Pull a batch of messages.
        /// </summary>
        /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
        /// <returns>The pulled messages, grouped by message key.</returns>
        public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
        {
            List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
            lock (_lock)
            {
                int messageCount = 0;
                while (_queue.Count > 0)
                {
                    var keys = _queue.First().Value; // smallest sort key first
                    foreach (var key in keys)
                    {
                        if (_data.TryGetValue(key, out List<TValue>? keyValues))
                        {
                            result.Add((key, keyValues!));
                            _data.Remove(key);
                            messageCount += keyValues!.Count;
                        }
                    }
                    _queue.RemoveAt(0);

                    if (messageCount >= maxNumberOfMessages)
                        break;
                }
            }
            return result;
        }
    }

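The logic is simple, so I won't belabor it; if you have questions, please leave a comment. The main difference from TimeSortedQueue is that Pull here does not wait for any period: it takes entries in sort-key order until the queue is empty or the message limit is reached.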
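SortedQueue always pulls the smallest sort key first, because SortedList<TKey,TValue> orders keys with the default comparer. Orderings such as "largest deposit first" or "highest priority first" need the reverse. One option is to give the SortedList a reversed IComparer<TKey> through its constructor. The sketch below shows this on a bare SortedList. PriorityDemo and DrainInOrder are hypothetical names, and applying the idea to SortedQueue would mean adding a comparer parameter that the code above does not have.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: the SortedList-based idea keyed by an int priority.
public static class PriorityDemo
{
    public static List<string> DrainInOrder(
        IEnumerable<(int priority, string customer)> items, bool highestFirst)
    {
        // SortedList sorts ascending by default; a reversed comparer puts the
        // largest key at index 0 instead.
        IComparer<int> comparer = highestFirst
            ? Comparer<int>.Create((a, b) => b.CompareTo(a))
            : Comparer<int>.Default;
        var queue = new SortedList<int, List<string>>(comparer);

        foreach (var (priority, customer) in items)
        {
            // Several customers may share one priority, so each slot holds a list.
            if (queue.TryGetValue(priority, out var customers))
                customers.Add(customer);
            else
                queue.Add(priority, new List<string> { customer });
        }

        var order = new List<string>();
        while (queue.Count > 0)
        {
            order.AddRange(queue.Values[0]); // index 0 is "smallest" per the comparer
            queue.RemoveAt(0);
        }
        return order;
    }
}
```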

More on data safety

Because this implementation keeps all pending data in memory, there is some risk of losing it. In my case there is another queue in front of this program, so even if the program crashes, only a small amount of unprocessed data is lost. The business can accept that, so it is not a problem here. If you are considering this approach, think carefully about your own scenario.

Let's look at two situations in which data can be lost:

First, the program restarts while data is still in the queue. In this case, as mentioned earlier, the data can be synchronized elsewhere, for example to Redis, a database, or disk. But network and disk I/O are slower, which often causes throughput to drop sharply. Keeping throughput acceptable then requires some sharding mechanism. Because distributed systems are unreliable, fault-tolerance mechanisms may also be needed. This quickly gets complicated; Kafka is a good reference.
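As a rough illustration of the simplest "write to disk" option, here is a hypothetical append-only log. Each message would be appended before it is published to the in-memory queue and replayed on startup. MessageLog, the tab-separated line format and the file path are all assumptions. The sketch has none of the sharding or fault tolerance mentioned above. Every Append is a separate file write, and that per-message I/O is where the throughput cost comes from.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical append-only log for surviving restarts. Not production-ready:
// no fsync control, no compaction, and tabs/newlines inside keys or values
// are not escaped. A real version must also drop entries once they are sent,
// otherwise a replay resends everything.
public class MessageLog
{
    private readonly string _path;
    private readonly object _lock = new object();

    public MessageLog(string path) => _path = path;

    // Called before Publish(): one line per message, "key<TAB>value".
    public void Append(string key, string value)
    {
        lock (_lock)
        {
            File.AppendAllText(_path, key + "\t" + value + Environment.NewLine);
        }
    }

    // Called on startup to rebuild the in-memory queue.
    public IEnumerable<(string Key, string Value)> Replay()
    {
        if (!File.Exists(_path)) yield break;
        foreach (var line in File.ReadLines(_path))
        {
            var parts = line.Split('\t', 2);
            if (parts.Length == 2) yield return (parts[0], parts[1]);
        }
    }
}
```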

Second, data processing fails. In this case the program can retry. But if the failure crashes the program after the data has already been removed from memory or other storage, the data is still lost. An ACK mechanism helps here. After processing succeeds, the consumer sends an ACK to the queue carrying the identifiers of the processed data, and the queue deletes the data based on those identifiers. Until then, the data remains available to consumers.
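Below is a minimal sketch of the ACK idea. To keep it short it uses a plain FIFO Queue<T> instead of the sorted queue. All names (AckQueue<T>, Ack, Requeue) are hypothetical. Pull parks each batch as "in flight" under an id instead of deleting it, and Ack deletes the batch for good. Requeue makes un-acknowledged batches consumable again, for example after a consumer restart.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical ACK sketch on a plain FIFO queue (not the article's SortedQueue).
public class AckQueue<T>
{
    private readonly Queue<T> _ready = new Queue<T>();
    private readonly Dictionary<long, List<T>> _inFlight = new Dictionary<long, List<T>>();
    private readonly object _lock = new object();
    private long _nextId;

    public void Publish(T item)
    {
        lock (_lock) _ready.Enqueue(item);
    }

    // Hands out up to 'max' items; they are parked, not deleted.
    public (long Id, List<T> Items) Pull(int max)
    {
        lock (_lock)
        {
            var items = new List<T>();
            while (items.Count < max && _ready.Count > 0)
                items.Add(_ready.Dequeue());
            long id = ++_nextId;
            if (items.Count > 0) _inFlight[id] = items;
            return (id, items);
        }
    }

    // Called by the consumer after the batch was processed successfully.
    public bool Ack(long id)
    {
        lock (_lock) return _inFlight.Remove(id);
    }

    // E.g. on consumer restart or timeout: put un-acked items back at the
    // end of the ready queue so they can be consumed again.
    public int Requeue()
    {
        lock (_lock)
        {
            int count = 0;
            foreach (var batch in _inFlight.Values)
            {
                foreach (var item in batch)
                {
                    _ready.Enqueue(item);
                    count++;
                }
            }
            _inFlight.Clear();
            return count;
        }
    }
}
```

This only helps if the queue's storage outlives the failing consumer, for example when the queue runs in a separate process or persists its in-flight batches. If both live in the same crashed process, the in-memory in-flight batches are lost as well.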

These problems do not all have to be solved completely; it depends on the business scenario. Persisting data to Redis may be enough for you. Or you may not need an ACK mechanism at all and can simply record which data has already been processed.

That covers the main content of this article. The complete code and examples are on GitHub:

