PasteCluster集群组件
$ dotnet add package PasteCluster源码查阅: https://gitee.com/pastecode/paste-cluster
1.需要集群部署的服务,需要有主从的集群服务
2.如果是定时的任务,建议使用PasteTask任务调度器实现
3.组件适用于.net8.0以上的框架
4.集群对外的消息使用Channel队列发出
1.先引入IHttpClientFactory,示例:context.Services.AddHttpClient();
2.添加对组件的应用 示例:<PackageReference Include="PasteCluster" Version="1.0.0" />
3.使用单例注入,示例:context.Services.AddSingleton<PasteClusterHandler>();
4.配置配置文件,示例:context.Services.Configure<PasteSloveConfig>(Configuration.GetSection("ClusterConfig"));
"ClusterConfig": {
"SloveToken": "zxcvfr43dr56hgt5",//集群密钥,可自定义,防止其他集群乱入!
"ClusterHost": "",//已有的节点地址链路,示例http://192.110.0.3:80;http://192.110.0.7:80
"CurrentHost": ""//当前节点的访问地址是多少 示例http://192.110.0.6:80
}
以上是基础配置,需要自定义更多的请查阅PasteSloveConfig的属性
5.如果使用了Route路由,注意本组件需要/api/cluster/的路由转发配置
1.初始化集群组件,写入必要的信息,比如集群中还有谁,自己的HOST是多少
2.定义一个HostedService用于接收集群产生的事件
示例代码:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PasteCluster;
using TestCluster.redismodel;
namespace TestCluster
{
/// <summary>
/// 系统初始化
/// </summary>
public class StartHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private PasteClusterHandler _cluster_handler;
private IAppCache _appCache;
private ILogger<StartHostedService> _logger;
/// <summary>
///
/// </summary>
/// <param name="serviceProvider"></param>
/// <param name="pasteClusterHandler"></param>
/// <param name="appCache"></param>
/// <param name="logger"></param>
public StartHostedService(IServiceProvider serviceProvider, PasteClusterHandler pasteClusterHandler, IAppCache appCache, ILogger<StartHostedService> logger)
{
_serviceProvider = serviceProvider;
_cluster_handler = pasteClusterHandler;
_appCache = appCache;
_logger = logger;
}
private const string node_list_cache_key = "cache:last:nodes";
//private System.Timers.Timer _timer;
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="System.NotImplementedException"></exception>
public Task StartAsync(CancellationToken cancellationToken)
{
//可以从他处读取集群节点列表,然后把列表写入到系统中
var reads = _appCache.Get<List<PasteNodeModel>>(node_list_cache_key);
if (reads != null && reads.Count > 0)
{
foreach (var _node in reads)
{
_cluster_handler.AddNodeToList(_node).Wait();
//await _cluster_handler.AddNodeToList(_node);
}
}
//如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中
// -e ClusterConfig:CurrentHost="http://192.168.1.100"
//如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
//_cluster_handler.Register("http://192.168.1.100",0,0);
//启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)
_cluster_handler.StartCluster();
//读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息
ReadClusterChannel();
//_timer = new System.Timers.Timer();
//hit_index = new Random().Next(1,100);
//_timer.Interval = 1000;
//_timer.AutoReset = true;
//_timer.Elapsed += _timer_Elapsed;
//_timer.Start();
return Task.CompletedTask;
}
//private int hit_index = 5;
//测试产生数据进行交互
//private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
//{
// try
// {
// hit_index--;
// if (hit_index == 0)
// {
// _cluster_handler.PushMsgToNode($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
// hit_index = new Random().Next(1, 20);
// }
// }
// catch (Exception exl)
// {
// _logger.LogException(exl);
// }
//}
/// <summary>
/// 处理集群产生的数据
/// </summary>
private async Task ReadClusterChannel()
{
try
{
var _info = await _cluster_handler.ChannelCluster.Reader.ReadAsync();
if (_info != null && _info != default)
{
switch (_info.msg_type)
{
case 1:
{
//当前节点成为master _info.body ==PasteMasterResponse.JsonString
}
break;
case 2:
{
//当前节点不再是master _info.body ==PasteMasterResponse.JsonString
}
break;
case 3:
{
//有节点加入 _info.body == PasteNodeModel.JsonString
}
break;
case 4:
{
if (_cluster_handler.CurrentIsMaster)
{
var _node = System.Text.Json.JsonSerializer.Deserialize<PasteNodeModel>(_info.body);
if (_node != null && _node != default)
{
if (!String.IsNullOrEmpty(_node.host))
{
await _appCache.HashDeleteAsync(node_list_cache_key, _node.host);
}
}
}
}
break;
default:
{
//业务代码 // _info.body是什么消息格式要基于业务而定
}
break;
}
}
}
catch (Exception exl)
{
_logger.LogException(exl);
await Task.Delay(1000);
}
finally
{
ReadClusterChannel();
}
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="System.NotImplementedException"></exception>
public Task StopAsync(CancellationToken cancellationToken)
{
_cluster_handler.Dispose();
return Task.CompletedTask;
}
}
}
3.在入口函数中启动这个HostedService
context.Services.AddHostedService<StartHostedService>();
1.如何知道当前运行情况
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/status
2.获取所有节点信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/nodes
3.手动注入当前节点IP信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/joinin?host=http://192.168.1.5
4.如何查找当前节点?
有一种模式,就是知道所有集群的IP,需要找到A是A,那么有前提
a.先要把节点的信息写入到组件,所有的节点
b.给当前节点设定一个code,可以使用SetNodeCode
c.后者使用环境信息 -e ClusterConfig:CurrentCode="xxxxxuuuu323423"
d.StartCluster()后,组件会遍历节点,查找哪个节点是自己!主要他们的链接要能通!
5.如何防止多Master的情况出现
a.在每次交互的时候,会附带当前节点的信息,比如master是谁,有多少各节点等,通过这个信息进行判断各自节点的信息是否一致,不一致的时候触发信息同步!
b.master冲突的时候,会检查哪一个节点的选举时间更早,更早的为主,时间采用时间戳ms模式,如果时间戳一致,则再对比对应的Id信息
c.节点数据交互的时候,会进行master和count的消息确定
6.消息完整性?
a.消息分成2部分,一部分是组件内部消息,另一部分是涉及业务的部分
b.内部消息,其实在配置的时间周期内会重新发送重新检查,以便维护集群节点的正确性
c.业务消息,一般是节点发送消息给master,在发送失败的时候,会压入队列,等待选举完成后再解压队列发送,这里有疑问的是,业务消息是否也有可以丢弃的消息,比如如下场景:
x.已知有N个node节点,作为在线客服的websocket的服务端,内部链接又使用site分隔用户
xx.如果要获得某一个站点site有多少人在线,要么遍历所有的node节点,然后统计数据,要么所有节点有信息的时候告知某一个点,很显然在这个需求中,各node节点有在线数变更的时候主动通知是最合适的
xxx.比如node1告知node1:site1:30,node2告知node2:site1:25 信息归总到master中,master中本地缓存这个数据,当有变动的时候,推送给所有客服节点即可!
xxxx.这里要处理的就是,某一个节点宕机的情况,比如node5不可用了,如何告知master他掉了!
1.Master的选举逻辑
a.当前组件集群支持从1到N,这个数量理论上不是无上限的,要基于配置文件中的最长交互时间决定
b.当当前没有Master的时候,会从节点队列中问询谁是master,被问询的如果也没有master信息,则被问询方直接成为master,并进入master的选举环节
c.节点交互会重置最后交互时间,主要在于master和cluster
d.组件会定期检查这个交互时间是否过期,如果过期了,会尝试健康检查,如果不通,则进入选举模式
2.选举原则
a.谁先启动谁为主
b.谁被告知消息不符,谁放弃,进行master查找
1.当前组件的代码还没有经过严格测试,我估计还有问题的地方在于
a.时效性的问题,虽然可以修改检查的间隔时间,但是这个时间也是有间隙的
b.选举间隔,产生太多消息的话?消息重复的话!压缩的消息是否对业务有影响
c.如果同时多个节点掉线,那么这个选举的时间会更长
d.当前集群模式将在后续更新到PasteSpider的集群策略中,替换到目前的版本!
1.如果没有注入节点,则默认为master
1.PushMsgToMaster的msg_type没有跟随赋值的问题!
1.添加对xml的支持,也就是文档
2.集群的消息,修正2和修正5 5表示自己不再是master了
3.准备启用新的版本号yy.MM.dd.version
1.消息添加from_api,表示这个消息是从其他节点过来的,因为节点之间的通讯是api形式,这样可以避免消息回环!(如果自己是master,消息都是来自channel,这时候就要区分来源了)
2.mag_type划分0|>10表示业务的类型,1为当前节点选举为master,可以在这个之后处理一些master的事宜,其他的类型留空
3.添加发送消息的模式,可以直接对所有节点发送消息,因为选举完成后,每一个节点上存储的节点消息都是同样的
4.节点消息有host name id group 这几个除了host是严格排重的,其他的没有排重的说法,需要基于业务自己处理,比如我有一个业务,某些linux使用某个name的节点进行处理任务
5.节点列表居然没有自己!!! master的时候 判断是否有自己,如果没有,则执行scan
==自实现模式==
1.可以加入一种自己查找模式,就是在各自节点启动的时候,向某一个节点加入自己的信息,然后读取信息,然后有人先写入自己的Host信息,则组建这个集群了!
2.Hashset模式,由master进行管理(这里主要是指remove的时候,需要业务上进行处理) field使用host value使用PasteNodeModel的信息
3.当前这种模式在于需要有公共的缓存对象,比如redis,还有需要处理msg_type==4的时候的数据,用于移除节点信息
1.消息交互之间,添加来源信息,添加来源的当前配置信息,这样用于同步集群消息,比如node_count master_host等,通过这个可以判断是否出现了多个集群的情况
2.消息交互采用最后消息时间戳的形式,所有有附带from_node这样可以更加灵活的知道某一个节点是否离线太久了
3.离线判断采用相互模式,就是master监听对所有节点的最小交互时间,每个节点检查自己对master的最后交互时间,从而处理node离线或者master离线的事宜
4.内部采用2个Channel,一个是集群内部使用的,其实是为了提高响应速度,毕竟修改成队列模式了,还有一个就是和业务有关的队列了!无论是Master或者Node都需要消费这个队列来处理任务!有点类似EventBus
5.选举采用先进原则,不采用他们说的单偶模式,采用vote_time和id双重排序,谁先谁就是master,确定后快速确认一遍,然后群发,这样可以极大的提高选举的效率和防止出现多个小集群的情况
6.发送消息模式,可以基于节点的id/name/group等模式发送
1.修复master的节点总数小于分支的节点总数,分支没有减少的Bug!
2.修复由于定时器重叠引起的互锁的问题!
4.PasteTask项目引入PasteCluster插件,实现集群部署!
5.修复集群分层现象,就是master是一个,但是不同节点有不一样的集群信息,master都存在
6.节点上报是有先后的,那就有了
a.1
b.2 1
c.3 2 1
d.4 3 2 1
e.5.4.3.2.1
7.scan的时候需要查询非自己的节点 如果全部不能访问,则自己是master
8.添加状态互斥代码,选举中状态等!减少不必要的问询!
9.查找master的时候,全部全部遍历查找,如果有多少,个选定最小那个!提升选举速度!可以极大的减少由于多次触发选举造成的流量混乱
10.master的信息,在特定的情况下,按需发送,不一定群发!
11.获取到master后,需要主动通知下,被通知的,如果不是master,则需要把master信息返回
12.为了防止游离的发生,被ask后,通知master群发master信息,用于进一步广播master信息
1.新的发现,master居然不在nodes里面!!这是如何发生的?
2.当自己成为master后,检查自己是否在列表,如果不在,则添加进去
3.添加log的日志记录
1.修改之前的async Task代码 改成async Task 更加安全!