需求
产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。
分析
一个很自然的想法就是将多条数据一起发送,这里有几个关键点:
1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。
2、数据到期判断方法:既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。
3、区分客户:不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便的聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。
4、数据的安全性问题:如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。
实现
上文提到可排序的数据结构,可以使用SortedList<TKey,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。
对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。
下边是它们的定义:
SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();
插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:
public void Publish(TKey key, TValue value)
{
DateTime now = DateTime.Now;
lock (_lock)
{
if (_queue.TryGetValue(now, out List<TKey>? keys))
{
if (!keys!.Contains(key))
{
keys.Add(key);
}
}
else
{
_queue.Add(now, new List<TKey> { key });
}
if (_data.TryGetValue(key, out List<TValue>? values))
{
values.Add(value);
}
else
{
_data.Add(key, new List<TValue> { value });
}
}
}
对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加锁。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。
public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
{
List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
DateTime now = DateTime.Now;
lock (_lock)
{
int messageCount = 0;
while (true)
{
if (!_queue.Any())
{
break;
}
var first = _queue.First();
var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
if (diffMillseconds < _valueDequeueMillseconds)
{
break;
}
var keys = first.Value;
foreach (var key in keys)
{
if (_data.TryGetValue(key, out List<TValue>? keyValues))
{
result.Add((key, keyValues));
_data.Remove(key);
messageCount += keyValues!.Count;
}
}
_queue.RemoveAt(0);
if (messageCount >= maxNumberOfMessages)
{
break;
}
}
}
return result;
}
这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。
再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:
TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);
List<Task> publishTasks = new List<Task>();
for (int i = 0; i < 4; i++)
{
var j = i;
publishTasks.Add(Task.Factory.StartNew(() =>
{
int k = 0;
while (true)
{
queue.Publish($"key_{k}", $"value_{j}_{k}");
Thread.Sleep(15);
k++;
}
}, TaskCreationOptions.LongRunning));
}
Task.Factory.StartNew(() =>
{
while (true)
{
var list = queue.Pull(100);
if (list.Count <= 0)
{
Thread.Sleep(100);
continue;
}
foreach (var item in list)
{
Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
}
}
}, TaskCreationOptions.LongRunning);
Task.WaitAll(publishTasks.ToArray());
以上就是针对这个特定需求实现的一个按照时间进行排序的队列。
万物皆可排序的队列
我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。
我这里把主要代码贴出来(完整代码和示例请看文末):
public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();
readonly object _lock = new object();
/// <summary>
/// Create a new instance of SortedQueue
/// </summary>
public SortedQueue(int maxNumberOfMessageConsumedOnce)
{
}
/// <summary>
/// Publish a message to queue
/// </summary>
/// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
/// <param name="key">The message key.</param>
/// <param name="value">The message value.</param>
public void Publish(TSortKey sortKey, TKey key, TValue value)
{
lock (_lock)
{
if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
{
keys.Add(key);
}
else
{
_queue.Add(sortKey, new List<TKey> { key });
}
if (_data.TryGetValue(key, out List<TValue>? values))
{
values.Add(value);
}
else
{
_data.Add(key, new List<TValue> { value });
}
}
}
/// <summary>
/// Pull a batch of messages.
/// </summary>
/// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
/// <returns></returns>
public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
{
List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
lock (_lock)
{
int messageCount = 0;
while (true)
{
if (!_queue.Any())
{
break;
}
var keys = _queue.First().Value;
foreach (var key in keys)
{
if (_data.TryGetValue(key, out List<TValue>? keyValues))
{
result.Add((key, keyValues));
_data.Remove(key);
messageCount += keyValues!.Count;
}
}
_queue.RemoveAt(0);
if (messageCount >= maxNumberOfMessages)
{
break;
}
}
}
return result;
}
}
代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。
再说数据安全
因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。
来看看数据丢失可能发生的两种情况:
一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考Kafka。
二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。
这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。
以上就是本文的主要内容了,完整代码和示例请访问Github:https://github.com/bosima/dotnet-demo/tree/main/CSharp-SortedList
C#实现一个万物皆可排序的队列的更多相关文章
- 20190814 On Java8 第三章 万物皆对象
第三章 万物皆对象 对象创建 数据存储 有5个不同的地方可以存储数据: 寄存器 (Registers) 最快的存储区域,位于CPU内部 .无法直接控制. 栈内存(Stack) 存在于常规内存 RAM ...
- Go语言: 万物皆异步
来源:https://www.jianshu.com/p/62c0cd107da3 同步和异步.阻塞和非阻塞 首先要明确的是,同步(Synchronous)和异步(Asynchronous),阻塞(B ...
- 万物皆有始有终: Hawk5即日起停止升级迭代
从即日起,Hawk将停止升级工作,其版本号将停留在5. https://github.com/ferventdesert/Hawk Hawk已经开发和维护6年时间了,它曾经承载了开发者很多的期待.背后 ...
- js实现随机选取[10,100)中的10个整数,存入一个数组,并排序。 另考虑(10,100]和[10,100]两种情况。
1.js实现随机选取[10,100)中的10个整数,存入一个数组,并排序. <!DOCTYPE html> <html lang="en"> <hea ...
- Javascript万物皆对象?
在javascript的世界里,有这么一句话,万物皆对象. 但是这个对象,应该怎么理解呢? exm........??,难道值类型也是对象?!! 当然,不是. 准确地讲是对于“引用类型”而言. 那,在 ...
- js sort方法根据数组中对象的某一个属性值进行排序(实用方法)
js sort方法根据数组中对象的某一个属性值进行排序 sort方法接收一个函数作为参数,这里嵌套一层函数用来接收对象属性名,其他部分代码与正常使用sort方法相同. var arr = [ {nam ...
- 手写面试编程题- 数组去重 深拷贝 获取文本节点 设置奇数偶数背景色 JS中检测变量为string类型的方法 第6题闭包 将两个数组合并为一个数组 怎样添加、移除、移动、复制、创建和查找节点? 继承 对一个数组实现随机排序 让元素水平 垂直居中的三种方式 通过jQuery的extend方法实现深拷贝
第1题==>实现数组去重 通过 new Set(数组名) // var arr = [12, 12, 3, 4, 5, 4, 5, 6, 6]; // var newarr1 = new Set ...
- 一个用JS数组实现的队列
一个用JS数组实现的队列 /*一个用数组实现的队列*/ function Queue(){ this.dataStore = [];//存放队列的数组,初始化为空 this.enqueue = enq ...
- PHP多维数组根据其中一个字段的值排序
平时简单的一维数组或者简单的数组排序这里就不多作介绍,这里主要是针对平时做项目中的可能遇到的情况,根据多维数组中的其中一个排序.用到的php函数是:array_multisort. 思路:获取其中你需 ...
- 算法课上机实验(一个简单的GUI排序算法比较程序)
(在家里的电脑上Linux Deepin截的图,屏幕大一点的话,deepin用着还挺不错的说) 这个应该是大二的算法课程上机实验时做的一个小程序,也是我的第一个GUI小程序,实现什么的都记不清了,只记 ...
随机推荐
- JAVASE02-Unit04: 集合框架 、 集合操作 —— 线性表
Unit04: 集合框架 . 集合操作 -- 线性表 操作集合元素相关方法 package day04; import java.util.ArrayList; import java.util.Co ...
- 20145330Java程序设计第三次实验
20145330<Java程序设计>第三次实验报告 实验三 敏捷开发与XP实践 实验内容 1.使用git上传代码 2.使用git实现代码开发实践 3.实现代码的重载 实验步骤 使用git上 ...
- flash 动画数据导出 到 coco2d-js ,cocos2d-x 问题的记录
1:必须搞清flash坐标系 和 cocos2d 的坐标系的差异2:对于cocos2d系列坐标系的深入理解: 以前我们常认为 coco2d-x的X,Y是相对坐标系,相对于父节点的X,Y的坐标,这种说法 ...
- linux(centos)搭建svn
1.yum install subversion 2.输入rpm -ql subversion查看安装位置 输入 svn --help可以查看svn的使用方法 3.创建svn版本库目录 mkdir - ...
- UIMenuController搭配UIPasteboard,执行拷贝-黏贴操作-b
一.基本概念 UIKit框架中,可以直接执行拷贝黏贴操作的有:UITextView.UITextField和UIWebView,其他控件需要实现相关方法. 关于UIPasteboard ·黏贴板是ap ...
- 百度之星2017初赛A轮 1001 小C的倍数问题
小C的倍数问题 Accepts: 1990 Submissions: 4931 Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 32768/3 ...
- SQL获取当前时间月份为两位数
--获取当前时间月份为两位数 )),) --获取当前时间上月月份为两位数 , )),)
- C++编程 - tuple、any容器
C++编程 - tuple.any容器 flyfish 2014-10-29 一 tuple tuple是固定大小的容器,每一个元素类型能够不同 作用1 替换struct struct t1 { in ...
- C++11之std::future和std::promise
为什么C++11引入std::future和std::promise?C++11创建了线程以后,我们不能直接从thread.join()得到结果,必须定义一个变量,在线程执行时,对这个变量赋值,然后执 ...
- 学习Java必看书籍和步骤(转载)
原地址:http://blog.csdn.net/yongjian1092/article/details/7372678 Java语言基础 谈到Java语言基础学习的书籍,大家肯定会推荐Bruce ...