当前位置:网站首页>分布式(一致性协议)之领导人选举( DotNext.Net.Cluster 实现Raft 选举 )
分布式(一致性协议)之领导人选举( DotNext.Net.Cluster 实现Raft 选举 )
2022-07-06 09:36:00 【dotNET跨平台】
分布式(一致性协议)之领导人选举( DotNext.Net.Cluster 实现Raft 选举 )
继分布式锁之后的又一高可用技术爽文之分布式领导选举 或者说 分布式一致性协议的实现
分布式选举是实现高可用的必备技术,想实现主从,就必须得有选举的策略,有主从才会有一个真正的管理端进行资源的协调分配。
首先需要明确的是一致性算法的目标是什么,主要面对的问题是在只使用单个服务器时由于发生错误导致数据丢失等事情发生。解决这个问题的思路也很简单,就是备份,集群,多个服务器,将操作重复到多个机器上就不怕单个机器出错了。但随之而来的就是,数据不一致、乱序等问题,一致性算法想要做到的是即使有结点出错,对外仍是一个完整的可以正常工作的整体。
选举算法
实现一致性协议(选举)的主要算法有两种
1. Raft
2. Paxos
相当于 Paxos 来讲, Raft协议相对来讲简单一点。但是,Raft 实现起来也不是很容易,如果有朋友试图想去实现可以参考,这个地方
地址 :https://zinglix.xyz/2020/06/25/raft/
我个人也是 简单的理解了一下。
Raft 是一个非拜占庭的一致性算法,即所有通信是正确的而非伪造的。N 个结点的情况下(N为奇数)可以最多容忍 (N−1)/2个结点故障
为啥需要单独的选举算法
我曾经试图实现一个WEB服务功能,我不能保证这个服务的高可用,我又不想用其他的现有服务,我就想让服务自己本身能支持高可用。
当时,因为自己认知的问题,并没有短时间找到一个可用的方案,不过现在有了。
要是当时有,可能就是别样风景了,不好说。
分布式选举的大致算法
简单的来讲就是找到一个领头的,假设有一个leader key,redis里,谁抢到了,谁就是leader,也是可以实现的。这种分布式锁实现的领导选举也可以适用于简单的项目中,并且,支持单个服务主机。
想用分布式选举算法,机器最少得2台以上,或者是概念上的两台。
Raft中主要有三个角色 Leader(领导人)、Follower (跟随者)和 Candidate(候选人),当某台机器成为了领导人,就会成为主要对外对接人,然后把对接的事情同步给下边的跟随者或者候选人同步信息。
因为对外只能有一个主服务,起到协调管理的作用。
这样,就会把相应的指令(日志)分配各个客户端,起到数据一致性的作用 ,这样,当领导人废了,下个人接任,还能继续起到作用。
Raft 选举的实例
我找了很多.Net 实现的Raft,很多只能说是个玩具,不能用于生产。
不过幸好,还真有生产级别的。
那就是 DotNext.Net.Cluster 和 DotNext.AspNetCore.Cluster (支持http ) 。
主要是基于 DotNext 中的组件。
DotNext.Net.Cluster 和 DotNext.AspNetCore.Cluster
1. DotNext.Net.Cluster包含集群编程模型、Raft 算法的传输无关实现、Raft 的 TCP 和 UDP 传输绑定、HyParView membersip 协议的传输无关实现,用于基于 Gossip 的消息传递
2. DotNext.AspNetCore.Cluster是基于DotNext.Net.Cluster库的 Raft 和 HyParView 算法的具体实现,用于构建 ASP.NET Core 应用程序
支持的功能列表
支持的功能列表:
1. 网络传输:TCP、UDP、HTTP 1.1、HTTP/2、HTTP/3
2. TLS 支持:TCP、HTTP 1.1、HTTP/2、HTTP/3
3. 支持日志压缩的高性能、通用Persistent Write-Ahead Log
4. 跨集群节点复制日志条目
5. 与 ASP.NET Core 框架紧密集成
6. 对 Docker/LXC/Windows 容器友好
7. 一切都是可扩展的
• 7.1 自定义预写日志
• 7.2 自定义网络传输
• 7.3 集群成员发现
基于 DotNext.Net.Cluster 的TCP 选举实例
其实他也是支持http的,当然,更多姿势,得大佬自己去挖掘了。
项目大致结构
细心的小伙伴就会发现,这个是个.Net 6的项目,因为它的nuget包,只支持.Net 6的。
有需要的可以自己改改。
项目是参考源示例,改了一下,有需要的朋友直接去看官方案例
项目重点
Install-Package DotNext.Net.Cluster -Version 4.6.0
DataModifier.cs
internal sealed class DataModifier : BackgroundService
{
private readonly IRaftCluster cluster;
private readonly ISupplier<long> valueProvider;
public DataModifier(IRaftCluster cluster, ISupplier<long> provider)
{
this.cluster = cluster;
valueProvider = provider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken).ConfigureAwait(false);
var leadershipToken = cluster.LeadershipToken;
TitleInfo.Show(!leadershipToken.IsCancellationRequested);
if (!leadershipToken.IsCancellationRequested)
{
var newValue = valueProvider.Invoke() + 500L;
Console.WriteLine("保存领导节点生成的值 {0}", newValue);
var source = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, leadershipToken);
try
{
var entry = new Int64LogEntry { Content = newValue, Term = cluster.Term };
await cluster.ReplicateAsync(entry, source.Token);
}
catch (Exception e)
{
Console.WriteLine("未知异常 {0}", e);
}
finally
{
source?.Dispose();
}
}
}
}
}
这个应该是核心服务,会与其他客户端进行通信和具体的选举,以及日志的传输
Program.cs
class Program
{
static async Task Main(string[] args)
{
await UseTcpTransport(Path.Combine(AppContext.BaseDirectory, "raftConfig"));
}
static Task UseTcpTransport(string path)
{
//获取所有配置
var jsonConfiguration = new ConfigurationBuilder().SetBasePath(Environment.CurrentDirectory).AddJsonFile("appsettings.json", optional: true, reloadOnChange: true).Build();
var NodeInfo = new NodeInfo();
jsonConfiguration.Bind("NodeInfo", NodeInfo);
Console.WriteLine($"MainNode:{NodeInfo.MainNode}");
TitleInfo.Node = NodeInfo.MainNode;
var configuration = new RaftCluster.TcpConfiguration(IPEndPoint.Parse(NodeInfo.MainNode))
{
RequestTimeout = TimeSpan.FromMilliseconds(140),
LowerElectionTimeout = 150,
UpperElectionTimeout = 300,
TransmissionBlockSize = 4096,
ColdStart = false,
};
//加载全部地址
//线上环境自己重写服务
var builder = configuration.UseInMemoryConfigurationStorage().CreateActiveConfigurationBuilder();
foreach (var item in NodeInfo.Nodes)
{
var address = IPEndPoint.Parse(item);
builder.Add(ClusterMemberId.FromEndPoint(address), address);
}
builder.Build();
TitleInfo.Show();
return UseConfiguration(configuration, path);
}
static async Task UseConfiguration(RaftCluster.NodeConfiguration config, string? persistentStorage)
{
var loggerFactory = new LoggerFactory();
var loggerOptions = new ConsoleLoggerOptions
{
LogToStandardErrorThreshold = LogLevel.Warning
};
loggerFactory.AddProvider(new ConsoleLoggerProvider(new FakeOptionsMonitor<ConsoleLoggerOptions>(loggerOptions)));
config.LoggerFactory = loggerFactory;
using var cluster = new RaftCluster(config);
cluster.LeaderChanged += ClusterConfigurator.LeaderChanged;
var modifier = default(DataModifier?);
if (!string.IsNullOrEmpty(persistentStorage))
{
var state = new SimplePersistentState(persistentStorage, new AppEventSource());
cluster.AuditTrail = state;
modifier = new DataModifier(cluster, state);
}
await cluster.StartAsync(CancellationToken.None);
await (modifier?.StartAsync(CancellationToken.None) ?? Task.CompletedTask);
//控制台等待取消
using var handler = new CancelKeyPressHandler();
Console.CancelKeyPress += handler.Handler;
await handler.WaitAsync();
Console.CancelKeyPress -= handler.Handler;
//停止服务
await (modifier?.StopAsync(CancellationToken.None) ?? Task.CompletedTask);
await cluster.StopAsync(CancellationToken.None);
}
}
总体来说,项目还是很简单的。
我把客户端的地址给配置了
appsettings.json
这个结构应该很容易理解,一个是当前端的地址,一个是所有节点的地址,当然也包含当前地址。
{
"NodeInfo": {
"MainNode": "127.0.0.1:6001" ,
"Nodes": [
"127.0.0.1:6001",
"127.0.0.1:6002",
"127.0.0.1:6003"
]
}
}
运行方式
我自己是把Bin目录复制三份,每份的 appsettings.json 修改下,然后,双击 RaftDemo.exe 就运行起来了。
注意
如果起用一个节点没个卵用,最少得两个节点。
运行效果
总结
这个库是可以用在生产环境的,所以,还是值得研究一下下的。
代码地址
https://github.com/kesshei/RaftDemo.git
https://gitee.com/kesshei/RaftDemo.git
参考文档
https://zinglix.xyz/2020/06/25/raft/
https://github.com/dotnet/dotNext/tree/master/src/cluster
阅
一键三连呦!,感谢大佬的支持,您的支持就是我的动力!
另
坚持更了大概一个月,也有几个铁粉了,会持续更,但是连续更太累了(按天更吃不消哦)。
感谢大佬的支持。
边栏推荐
- Garbage first of JVM garbage collector
- Interpretation of Flink source code (II): Interpretation of jobgraph source code
- C# WinForm系列-Button简单使用
- Coursera cannot play video
- Total / statistics function of MySQL
- 信息与网络安全期末复习(完整版)
- C#版Selenium操作Chrome全屏模式显示(F11)
- Flink源码解读(三):ExecutionGraph源码解读
- Deploy flask project based on LNMP
- About selenium starting Chrome browser flash back
猜你喜欢
自动答题 之 Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。
集成开发管理平台
Huawei certified cloud computing hica
虚拟机启动提示Probing EDD (edd=off to disable)错误
Solr appears write Lock, solrexception: could not get leader props in the log
PySpark算子处理空间数据全解析(4): 先说说空间运算
【逆向初级】独树一帜
Interpretation of Flink source code (III): Interpretation of executiongraph source code
JVM garbage collector part 1
信息与网络安全期末复习(基于老师给的重点)
随机推荐
PySpark算子处理空间数据全解析(4): 先说说空间运算
Program counter of JVM runtime data area
Flink源码解读(二):JobGraph源码解读
04个人研发的产品及推广-数据推送工具
Introduction to spring trick of ByteDance: senior students, senior students, senior students, and the author "brocade bag"
Flink analysis (I): basic concept analysis
Learn the wisdom of investment Masters
JVM垃圾回收概述
How uipath determines that an object is null
Garbage first of JVM garbage collector
mysql的列的数据类型详解
自动化运维利器-Ansible-Playbook
CTF逆向入门题——掷骰子
06个人研发的产品及推广-代码统计工具
Junit单元测试
Error: Publish of Process project to Orchestrator failed. The operation has timed out.
[VNCTF 2022]ezmath wp
Redis quick start
网络分层概念及基本知识
CentOS7上Redis安装