Ribbon learning notes II
2022-07-29 05:57:00 【People with different feelings】
Maintaining the service instance list
The service instance list is maintained mainly in the BaseLoadBalancer class, which extends the abstract class AbstractLoadBalancer. BaseLoadBalancer does two things:
1. It maintains the list of all service instances and the list of instances in the "up" state.
2. It uses IPing to set the status of the instances in the list.
The service instance lists
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
Adding service instances
BaseLoadBalancer provides three methods for adding servers: two overloads of addServers() and the single-instance addServer(), shown below:
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
void addServers(Object[] newServers) {
if ((newServers != null) && (newServers.length > 0)) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
for (Object server : newServers) {
if (server != null) {
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
newList.add((Server) server);
}
}
}
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
public void addServer(Server newServer) {
if (newServer != null) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.add(newServer);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error adding newServer {}", name, newServer.getHost(), e);
}
}
}
All three methods build a new list and pass it to setServersList(), which assigns both the all-servers list and the up-servers list.
public void setServersList(List lsrv) {
// Get write lock
Lock writeLock = allServerLock.writeLock();
logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
// Servers that are new in this update (used later for connection priming)
ArrayList<Server> newServers = new ArrayList<Server>();
// Lock
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
// Skip null entries
continue;
}
if (server instanceof String) {
// A String entry is treated as a server id, so wrap it in a Server object
server = new Server((String) server);
}
if (server instanceof Server) {
// Collect every entry as a Server object in the local variable allServers
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
// Check whether the service instance list has changed; if it has, notify the registered change listeners
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// Discard the previous contents: replace allServerList, then rebuild upServerList
allServerList = allServers;
if (canSkipPing()) {
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
} else if (listChanged) {
// The list has changed, so call forceQuickPing() to re-check the service instances
forceQuickPing();
}
} finally {
writeLock.unlock();
}
}
public void forceQuickPing() {
if (canSkipPing()) {
return;
}
logger.debug("LoadBalancer [{}]: forceQuickPing invoking", name);
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
}
}
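The flow of setServersList() can be condensed into a small standalone sketch: clean the incoming list, compare it with the current one, notify listeners on a change, then swap the reference under the write lock. This is not Ribbon code; ServerListHolderSketch, ChangeListener and the use of plain strings for servers are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the load balancer's list handling; not a Ribbon class.
class ServerListHolderSketch {
    /** Callback invoked with the old and new lists when the contents differ. */
    interface ChangeListener {
        void changed(List<String> oldList, List<String> newList);
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<ChangeListener> listeners = new CopyOnWriteArrayList<ChangeListener>();
    private volatile List<String> allServers = Collections.emptyList();

    void addListener(ChangeListener listener) {
        listeners.add(listener);
    }

    /** Replaces the whole list under the write lock; returns true if it changed. */
    boolean setServers(List<String> incoming) {
        lock.writeLock().lock();
        try {
            List<String> cleaned = new ArrayList<String>();
            for (String s : incoming) {
                if (s != null) { // skip null entries
                    cleaned.add(s);
                }
            }
            boolean changed = !allServers.equals(cleaned);
            if (changed) {
                List<String> oldCopy = Collections.unmodifiableList(new ArrayList<String>(allServers));
                List<String> newCopy = Collections.unmodifiableList(new ArrayList<String>(cleaned));
                for (ChangeListener l : listeners) {
                    l.changed(oldCopy, newCopy);
                }
            }
            allServers = cleaned; // swap the reference instead of mutating in place
            return changed;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Copy the current list, append one entry, then delegate to setServers. */
    boolean addServer(String server) {
        List<String> copy = new ArrayList<String>(allServers);
        copy.add(server);
        return setServers(copy);
    }

    List<String> getServers() {
        return Collections.unmodifiableList(allServers);
    }
}
```

Because the list is replaced rather than edited in place, readers that already hold the old reference keep seeing a consistent snapshot.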
The inner class Pinger ultimately calls pingerStrategy.pingServers() to check the availability of each service instance, then uses the results to rebuild upServerList.
class Pinger {
private final IPingStrategy pingerStrategy;
public Pinger(IPingStrategy pingerStrategy) {
this.pingerStrategy = pingerStrategy;
}
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
// The readLock should be free unless an addServer operation is going on...
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
// By default the strategy is the inner class SerialPingStrategy
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
}
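The compareAndSet check at the top of runPinger() ensures that only one ping round runs at a time, and the finally block releases the flag even when the round fails. A minimal sketch of that guard in isolation follows; SinglePingGuardSketch and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a "one ping round at a time" guard.
class SinglePingGuardSketch {
    private final AtomicBoolean pingInProgress = new AtomicBoolean(false);
    private final AtomicInteger completedRounds = new AtomicInteger();

    /** Runs the round unless another one is active; returns whether it ran. */
    boolean runOnce(Runnable pingRound) {
        if (!pingInProgress.compareAndSet(false, true)) {
            return false; // another caller is already pinging, nothing to do
        }
        try {
            pingRound.run();
            completedRounds.incrementAndGet();
            return true;
        } finally {
            pingInProgress.set(false); // always release, even if the round throws
        }
    }

    int completedRounds() {
        return completedRounds.get();
    }
}
```

A caller that loses the race simply returns; it does not wait, which suits a periodic task because the next tick will try again.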
The inner class SerialPingStrategy pings the servers one at a time:
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
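To make the "default answer is DEAD" behavior concrete, here is a standalone sketch of a serial ping loop together with the step that derives the new up list from the results. Probe stands in for IPing and servers are plain strings; both are assumptions for illustration rather than Ribbon types.

```java
import java.util.ArrayList;
import java.util.List;

class SerialPingSketch {
    /** Hypothetical probe; plays the role of IPing.isAlive(server). */
    interface Probe {
        boolean isAlive(String server) throws Exception;
    }

    /** Pings servers one by one; a null probe or an exception counts as DEAD. */
    static boolean[] pingAll(Probe probe, String[] servers) {
        boolean[] results = new boolean[servers.length]; // elements default to false (DEAD)
        for (int i = 0; i < servers.length; i++) {
            try {
                if (probe != null) {
                    results[i] = probe.isAlive(servers[i]);
                }
            } catch (Exception e) {
                results[i] = false; // a failing probe leaves the server marked DEAD
            }
        }
        return results;
    }

    /** Builds the new "up" list from the ping results. */
    static List<String> upList(String[] servers, boolean[] results) {
        List<String> up = new ArrayList<String>();
        for (int i = 0; i < servers.length; i++) {
            if (results[i]) {
                up.add(servers[i]);
            }
        }
        return up;
    }
}
```

Since the pings run sequentially, one slow server delays the whole round; that is the trade-off of a serial strategy.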
Taking a service instance offline
public void markServerDown(Server server) {
if (server == null || !server.isAlive()) {
return;
}
logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
server.setAlive(false);
// forceQuickPing();
notifyServerStatusChangeListener(singleton(server));
}
Trigger offline notification .
private void notifyServerStatusChangeListener(final Collection<Server> changedServers) {
if (changedServers != null && !changedServers.isEmpty() && !serverStatusListeners.isEmpty()) {
for (ServerStatusChangeListener listener : serverStatusListeners) {
try {
listener.serverStatusChanged(changedServers);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server status change listener", name, e);
}
}
}
}
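The two methods above combine into a simple pattern: ignore servers that are already down, flip the flag, then notify each listener while isolating listener failures. A standalone sketch, with hypothetical names (MarkDownSketch, Node, StatusListener) standing in for the Ribbon types:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MarkDownSketch {
    /** Hypothetical server record with a mutable alive flag. */
    static class Node {
        final String id;
        volatile boolean alive = true;

        Node(String id) {
            this.id = id;
        }
    }

    interface StatusListener {
        void statusChanged(Collection<Node> changed);
    }

    private final List<StatusListener> listeners = new CopyOnWriteArrayList<StatusListener>();

    void addListener(StatusListener listener) {
        listeners.add(listener);
    }

    /** Returns true only when the node actually went from alive to dead. */
    boolean markDown(Node node) {
        if (node == null || !node.alive) {
            return false; // nothing to do for null or already-dead nodes
        }
        node.alive = false;
        Collection<Node> changed = Collections.singleton(node);
        for (StatusListener listener : listeners) {
            try {
                listener.statusChanged(changed);
            } catch (RuntimeException e) {
                // one failing listener must not prevent the others from being notified
            }
        }
        return true;
    }
}
```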
The service instance list object: ServerList
public interface ServerList<T extends Server> {
// Get the initial list of service instances
public List<T> getInitialListOfServers();
// Get the updated list of service instances
public List<T> getUpdatedListOfServers();
}
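As an illustration of the two-method contract, the sketch below backs both methods with a fixed list parsed from a comma-separated string. Source is a hypothetical interface shaped like the one above, and the "host:port" string format is an assumption; a real implementation would return Server objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FixedServerListSketch {
    /** Hypothetical two-method contract, shaped like the interface above. */
    interface Source {
        List<String> getInitialListOfServers();

        List<String> getUpdatedListOfServers();
    }

    /** Creates a source whose initial and updated lists are the same fixed list. */
    static Source fromCsv(final String csv) {
        final List<String> parsed = parse(csv);
        return new Source() {
            public List<String> getInitialListOfServers() {
                return parsed;
            }

            public List<String> getUpdatedListOfServers() {
                return parsed;
            }
        };
    }

    /** Splits on commas, trims whitespace, and drops empty entries. */
    static List<String> parse(String csv) {
        List<String> out = new ArrayList<String>();
        if (csv != null) {
            for (String part : csv.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    out.add(trimmed);
                }
            }
        }
        return Collections.unmodifiableList(out);
    }
}
```

A dynamic source would differ only in getUpdatedListOfServers(), which would query a registry on every call.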
Default configuration
//EurekaRibbonClientConfiguration.java
@Configuration(proxyBeanMethods = false)
public class EurekaRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
// If a custom ServerList is configured for this service, use it
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
// Otherwise use DiscoveryEnabledNIWSServerList as the service instance list
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
}
Getting the list of service instances
obtainServersViaDiscovery() is the core method of the DiscoveryEnabledNIWSServerList class. It uses an EurekaClient object to talk to the Eureka server and fetch the list of service instances.
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
// Get the EurekaClient
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// Iterate over the comma-separated vipAddresses and
// use eurekaClient to get the InstanceInfo list for each one
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
// Iterate over the InstanceInfo list
for (InstanceInfo ii : listOfInstanceInfo) {
// Keep only instances whose status is UP
if (ii.getStatus().equals(InstanceStatus.UP)) {
// Whether to override the port; disabled by default
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// Apply the override port to a copy of the instance info
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
// Create a DiscoveryEnabledServer object and add it to serverList
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
// Create a DiscoveryEnabledServer object
protected DiscoveryEnabledServer createServer(final InstanceInfo instanceInfo, boolean useSecurePort, boolean useIpAddr) {
DiscoveryEnabledServer server = new DiscoveryEnabledServer(instanceInfo, useSecurePort, useIpAddr);
// Get availabilty zone for this instance.
EurekaClientConfig clientConfig = eurekaClientProvider.get().getEurekaClientConfig();
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
String instanceZone = InstanceInfo.getZone(availZones, instanceInfo);
server.setZone(instanceZone);
return server;
}
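The lookup logic in obtainServersViaDiscovery() boils down to: walk the comma-separated vipAddresses in order, keep only UP instances, and optionally stop at the first address that yields any server. The sketch below reproduces that logic against an in-memory map; the map stands in for the EurekaClient lookup, and VipLookupSketch and Instance are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class VipLookupSketch {
    /** Hypothetical registry entry: a host and whether its status is UP. */
    static class Instance {
        final String host;
        final boolean up;

        Instance(String host, boolean up) {
            this.host = host;
            this.up = up;
        }
    }

    /**
     * Resolves hosts for a comma-separated list of vip addresses.
     * The registry map plays the role of the discovery client lookup.
     */
    static List<String> resolve(String vipAddresses,
                                Map<String, List<Instance>> registry,
                                boolean prioritizeFirstVip) {
        List<String> result = new ArrayList<String>();
        if (vipAddresses == null) {
            return result;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> instances = registry.get(vip);
            if (instances != null) {
                for (Instance instance : instances) {
                    if (instance.up) { // only UP instances are eligible
                        result.add(instance.host);
                    }
                }
            }
            if (!result.isEmpty() && prioritizeFirstVip) {
                break; // the current vip address has servers, so skip the remaining ones
            }
        }
        return result;
    }
}
```

With prioritization enabled, later vip addresses act as fallbacks that are consulted only when the earlier ones have no UP instance.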
The service instance list updater: ServerListUpdater
Developers may bring instances online or take them offline at any time, and network failures or hardware faults can also change an instance's state, so the service instance list changes constantly. To keep the list current, Netflix provides the ServerListUpdater interface. It has two implementations: EurekaNotificationServerListUpdater and PollingServerListUpdater.
public interface ServerListUpdater {
public interface UpdateAction {
void doUpdate();
}
void start(UpdateAction updateAction);
void stop();
String getLastUpdate();
long getDurationSinceLastUpdateMs();
int getNumberMissedCycles();
int getCoreThreads();
}
Default configuration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({
HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
}
The start() method of PollingServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Proceed only if the updater was not already started
final Runnable wrapperRunnable = new Runnable() {
// The task that the scheduler runs periodically
@Override
public void run() {
if (!isActive.get()) {
// The updater has been stopped, so cancel the scheduled task and exit
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
// Still active: run the update
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// Schedule the task with a fixed delay between runs
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable, // the task to run
initialDelayMs, // initial delay, default 1s
refreshIntervalMs, // refresh interval, default 30s
TimeUnit.MILLISECONDS // time unit: milliseconds
);
} else {
logger.info("Already active, no-op");
}
}
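The same start-once, poll-with-fixed-delay pattern can be tried out with nothing but the JDK. The sketch below is an approximation under stated assumptions: PollingUpdaterSketch, its constructor parameters and the daemon-thread executor are all hypothetical, and a plain Runnable replaces UpdateAction.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingUpdaterSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "polling-updater-sketch");
                t.setDaemon(true); // do not keep the JVM alive for the poller
                return t;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> future;
    private volatile long lastUpdated = System.currentTimeMillis();

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    /** Starts polling; returns false if the updater was already started. */
    synchronized boolean start(final Runnable updateAction) {
        if (!active.compareAndSet(false, true)) {
            return false; // already active, no-op
        }
        future = executor.scheduleWithFixedDelay(() -> {
            if (!active.get()) {
                return; // stopped in the meantime
            }
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                // swallow the failure so that later cycles still run
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Stops polling and releases the executor thread. */
    synchronized void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    long millisSinceLastUpdate() {
        return System.currentTimeMillis() - lastUpdated;
    }
}
```

Catching exceptions inside the task matters here: with scheduleWithFixedDelay, an exception that escapes the task suppresses all subsequent executions.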
Service instance heartbeat detection
A scheduled task pings the servers every pingIntervalSeconds seconds, so that the client-side load balancer keeps an up-to-date view of service status. The body of this scheduled task is implemented by the PingTask class.
//BaseLoadBalancer.java
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
// Run one ping round immediately during initialization
forceQuickPing();
}
PingTask in turn delegates to the runPinger() method of the inner class Pinger.
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
public void forceQuickPing() {
if (canSkipPing()) {
return;
}
logger.debug("LoadBalancer [{}]: forceQuickPing invoking", name);
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
}
}
The IPing interface
The actual heartbeat check on a service instance is performed through the IPing interface.
public interface IPing {
public boolean isAlive(Server server);
}
In a Spring Boot project, DummyPing is used when no Eureka service discovery client is configured; it always returns true. When a Eureka service discovery client is configured, NIWSDiscoveryPing is used, which decides availability from the instance status obtained through Eureka. There are other implementations as well: NoOpPing always returns true; PingConstant holds a configurable boolean value that isAlive() always returns; PingUrl sends a request to a configured URL to decide whether the service is available.
NIWSDiscoveryPing
public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
BaseLoadBalancer lb = null;
public NIWSDiscoveryPing() {
}
public BaseLoadBalancer getLb() {
return lb;
}
public void setLb(BaseLoadBalancer lb) {
this.lb = lb;
}
public boolean isAlive(Server server) {
boolean isAlive = true;
// If the server was obtained through Eureka service discovery, use its instance status to decide whether it is available.
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
@Override
public void initWithNiwsConfig(
IClientConfig clientConfig) {
}
}
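Two of the behaviors described above are easy to compare side by side: a constant answer in the style of PingConstant, and a status-based answer that, like the isAlive() method above, treats a server as alive unless a known status says otherwise. The sketch is standalone; Ping, Instance and the string status values are hypothetical simplifications.

```java
class PingChoicesSketch {
    /** Hypothetical instance view: an id plus a status that may be unknown (null). */
    static class Instance {
        final String id;
        final String status; // for example "UP" or "DOWN"; null when unknown

        Instance(String id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    /** Hypothetical single-method contract in the shape of IPing. */
    interface Ping {
        boolean isAlive(Instance instance);
    }

    /** Always answers with the configured value, whatever the instance is. */
    static Ping constant(final boolean value) {
        return instance -> value;
    }

    /** Alive by default; dead only when a known status is something other than UP. */
    static Ping statusBased() {
        return instance -> instance == null
                || instance.status == null
                || "UP".equals(instance.status);
    }
}
```

Note the optimistic default of the status-based variant: missing information counts as alive, so a server is excluded only on positive evidence that it is not UP.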
Global configuration or per-client configuration
Global configuration means that what the service consumer configures applies to all service providers. Per-client configuration means that the service consumer's settings apply to one service provider only and not to the others.
Per-client properties
- <clientName>.ribbon.NFLoadBalancerClassName: the load balancer class; must implement the ILoadBalancer interface.
- <clientName>.ribbon.NFLoadBalancerRuleClassName: the load balancing rule; must implement the IRule interface.
- <clientName>.ribbon.NFLoadBalancerPingClassName: the heartbeat check; must implement the IPing interface.
- <clientName>.ribbon.NIWSServerListClassName: the service instance list class; must implement the ServerList interface.
- <clientName>.ribbon.NIWSServerListFilterClassName: the service instance list filter class; must implement the ServerListFilter interface.
Spring Cloud also provides the @RibbonClient and @RibbonClients annotations. Use @RibbonClient to specify a configuration class for a single microservice, and @RibbonClients to configure several microservices.
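As a hedged illustration of the properties above, the fragment below configures a single client. The client name user-service and the host list are hypothetical; the class names are implementations that ship with Ribbon's load balancer module, chosen here only as examples.

```properties
# Hypothetical client name "user-service"; replace it with the id of the service you call.
user-service.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule
user-service.ribbon.NFLoadBalancerPingClassName=com.netflix.loadbalancer.PingUrl
user-service.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
# Static server list read by ConfigurationBasedServerList (example hosts).
user-service.ribbon.listOfServers=localhost:8081,localhost:8082
```

Because every key is prefixed with the client name, these settings affect calls to user-service only; other clients keep their defaults.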