当前位置:网站首页>Redis -- lattice connects to redis cluster
Redis -- lattice connects to redis cluster
2022-07-01 08:51:00 【FlyLikeButterfly】
Lettuce Connect redis Clusters use cluster specific classes , image RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection wait ;
Lettuce Yes redis cluster Support for :
- Support all Cluster command ;
- Routing node based on key hash slot ;
- High level abstraction of cluster commands ;
- Execute commands on multiple cluster nodes ;
- Handle MOVED and ASK Redirect ;
- Through slots and ip The port is directly connected to the cluster node ;
- SSL And authentication ;
- Periodic and adaptive cluster topology updates ;
- Publish subscribe ;
At least one cluster node can be connected when starting , It can automatically topology all nodes of the cluster ; You can also use ReadFrom Set the read data source , Same as master-slave mode ;
although redis Its own multi key command requirements key Must be in the same slot , but Lettuce Some commands are optimized , Multi key commands can be executed across slots , By decomposing the operation commands for different slot keys into multiple commands , A single command starts with fork/join Run concurrently , Finally, merge the results and return ;
Commands that can span slots are :
- DEL: Delete key , Returns the deleted quantity ;
- EXISTS: Count the number of keys that exist across slots ;
- MGET: Get the values of all given keys , Order returns in the order of the keys ;
- MSET: Batch save key value pairs , Always returns OK;
- TOUCH: Change the last access time of a given key , Returns the number of keys changed ;
- UNLINK: Delete the key and reclaim memory in a different thread , Returns the deleted quantity ;
Provide cross slot commands api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;
The commands that can be executed on multiple cluster nodes are :
- CLIENT SETNAME: Set the name of the client on all known cluster nodes , Always returns OK;
- KEYS: Back to all master Stored on key;
- DBSIZE: Back to all master Stored on key The number of ;
- FLUSHALL: Empty master All the data on , Always returns OK;
- FLUSHDB: Empty master All the data on , Always returns OK;
- RANDOMKEY: From random master Return random on key;
- SCAN: according to ReadFrom Set the key space for scanning the entire cluster ;
- SCRIPT FLUSH: Delete all scripts from the script cache of all cluster nodes ;
- SCRIPT LOAD: Load on all cluster nodes lua Script ;
- SCRIPT KILL: Kill the script on all cluster nodes ;( Even if the script does not run, the call will not fail )
- SHUTDOWN: Save dataset synchronization to disk , Then shut down all nodes of the cluster ;
About publishing subscriptions :
Publish and subscribe to ordinary user space ,redis The cluster will send to each node , Publishers and subscribers do not need to be on the same node , Normal subscription publishing messages can be reconnected when the cluster topology changes . For key space Events , It will only be sent to its own node , It will not spread to other nodes , To subscribe to key space events, you can subscribe to appropriate multiple nodes , Or use RedisClusterClient Information dissemination and NodeSelection API Get a collection of managed connections ;
Be careful : Because of master-slave synchronization , Keys will be copied to multiple slave nodes , In particular, key expiration events , Expiration events will be generated on both master and slave nodes , If the subscription is from the node , You may receive multiple identical expiration Events ; The subscription is through NodeSelection API Or a single node calls subscribe(…) Emitted , The subscription is invalid for the new node ;
test Demo:(redis edition 7.0.2,Lettuce edition 6.1.8)
Cluster nodes : virtual machine 192.168.1.31, port 9001-9006, Cluster node has been set notify-keyspace-events AK;
/**
* 2022 year 6 month 23 The morning of 9:41:47
*/
package testlettuce;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.SslOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.Executions;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands;
import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection;
import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
/**
* @author XWF
*
*/
public class TestLettuceCluster {
/**
* @param args
*/
public static void main(String[] args) {
List<RedisURI> nodeList = new ArrayList<>();
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build());
RedisClusterClient clusterClient = RedisClusterClient.create(nodeList);
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))// Set the adaptive topology refresh timeout , Refresh once per timeout , Default 30s;
.closeStaleConnections(false)// Whether to close invalid connections when refreshing topology , Default true,isPeriodicRefreshEnabled() by true Effective when ;
.dynamicRefreshSources(true)// Discover new nodes from the topology , And take the new node as the source node of the topology , Dynamic refresh can discover all nodes and calculate the number of each client , Set up false Then only the initial node is the source and the number of clients is calculated ;
.enableAllAdaptiveRefreshTriggers()// Enable all triggers to adaptively refresh the topology , Off by default ;
.enablePeriodicRefresh(Duration.ofSeconds(5L))// Enable scheduled topology refresh and set the cycle ;
.refreshTriggersReconnectAttempts(3)// Long connection reconnect attempt n Only topology refresh
.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.autoReconnect(true)// Turn automatic reconnection on or off when a connection is lost , Default true;
.cancelCommandsOnReconnectFailure(true)// Allow to cancel the queue command when reconnection fails , Default false;
.decodeBufferPolicy(DecodeBufferPolicies.always())// Set the policy of discarding decoding buffer , To reclaim memory ;always: Discard after decoding , Maximum memory efficiency ;alwaysSome: After decoding, part of it is discarded ;ratio(n) Discard based on ratio ,n/(1+n), Usually use 1-10 Corresponding 50%-90%;
.disconnectedBehavior(DisconnectedBehavior.DEFAULT)// Set the calling behavior of the command when the connection is disconnected , Reconnection is enabled by default ;DEFAULT: When enabled, receive commands during reconnection , When disabled, the command is rejected during reconnection ;ACCEPT_COMMANDS: Receive commands during reconnection ;REJECT_COMMANDS: Command rejected during reconnection ;
// .maxRedirects(5)// When keys are migrated from one node to another , Cluster redirection times , Default 5;
// .nodeFilter(nodeFilter)// Set node filter
// .pingBeforeActivateConnection(true)// Set before activating the connection PING, Default true;
// .protocolVersion(ProtocolVersion.RESP3)// Set protocol version , Default RESP3;
// .publishOnScheduler(false)// Use a dedicated scheduler to send a response signal , Default false, When enabled, data signals will be sent using the multithreading of the service ;
// .requestQueueSize(requestQueueSize)// Set the size of each connection request queue ;
// .scriptCharset(scriptCharset)// Set up Lua The script code is byte[] Character set for , Default StandardCharsets.UTF_8;
// .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())// Set the properties of low-level socket
// .sslOptions(SslOptions.builder().build())// Set up ssl attribute
// .suspendReconnectOnProtocolFailure(false)// Pause reconnection when protocol failure occurs during reconnection (SSL verification , Before connection failure PING), The default value is false;
// .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))// Set the timeout to cancel and terminate the command ;
.topologyRefreshOptions(clusterTopologyRefreshOptions)// Set topology update settings
.validateClusterNodeMembership(true)// Before allowing connections to cluster nodes , Verify the cluster node membership , The default value is true;
.build();
clusterClient.setDefaultTimeout(Duration.ofSeconds(5L));
clusterClient.setOptions(clusterClientOptions);
StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect();
clusterConn.setReadFrom(ReadFrom.ANY);// Set which nodes to read data from ;
RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync();
clusterCmd.set("a", "A");
clusterCmd.set("b", "B");
clusterCmd.set("c", "C");
clusterCmd.set("d", "D");
System.out.println("get a=" + clusterCmd.get("a"));
System.out.println("get b=" + clusterCmd.get("b"));
System.out.println("get c=" + clusterCmd.get("c"));
System.out.println("get d=" + clusterCmd.get("d"));
// Cross slot command
Map<String, String> kvmap = new HashMap<>();
kvmap.put("a", "AA");
kvmap.put("b", "BB");
kvmap.put("c", "CC");
kvmap.put("d", "DD");
clusterCmd.mset(kvmap);//Lettuce Optimized , Cross slot commands that support some commands ;
System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d"));
// Select some node operations
NodeSelection<String, String> replicas = clusterCmd.replicas();
NodeSelectionCommands<String, String> replicaseCmd = replicas.commands();
Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL);
executions.forEach(s -> {System.out.println(s.getKeys());});
// Subscribe to published messages
StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub();
pubSubConn.addListener(new RedisPubSubListener<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println("[message]ch:" + channel + ",msg:" + message);
}
@Override
public void message(String pattern, String channel, String message) {
}
@Override
public void subscribed(String channel, long count) {
System.out.println("[subscribed]ch:" + channel);
}
@Override
public void psubscribed(String pattern, long count) {
}
@Override
public void unsubscribed(String channel, long count) {
}
@Override
public void punsubscribed(String pattern, long count) {
}
});
pubSubConn.sync().subscribe("TEST_Ch");//( The callback uses blocking calls internally or lettuce Sync api call , Asynchronous subscription is required )
clusterCmd.publish("TEST_Ch", "MSGMSGMSG");
// Responsive subscription , Can monitor ChannelMessage and PatternMessage, Use chain filtering to process calculation and other operations
RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive();
pubsubReactive.subscribe("TEST_Ch2").subscribe();
pubsubReactive.observeChannels()
.filter(chmsg -> {return chmsg.getMessage().contains("tom");})
.doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());})
.subscribe();
clusterCmd.publish("TEST_Ch2", "send to jerry");
clusterCmd.publish("TEST_Ch", "tom MSG");
clusterCmd.publish("TEST_Ch2", "this is tom");
//keySpaceEvent event
StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub();
clusterPubSubConn.setNodeMessagePropagation(true);// Enable disable node message propagation to this listener, For example, key event notifications that can only be notified in this node ;
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {
@Override
public void unsubscribed(String channel, long count) {
System.out.println("unsubscribed_ch:" + channel);
}
@Override
public void subscribed(String channel, long count) {
System.out.println("subscribed_ch:" + channel);
}
@Override
public void punsubscribed(String pattern, long count) {
System.out.println("punsubscribed_pattern:" + pattern);
}
@Override
public void psubscribed(String pattern, long count) {
System.out.println("psubscribed_pattern:" + pattern);
}
@Override
public void message(String pattern, String channel, String message) {
System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message);
}
@Override
public void message(String channel, String message) {
System.out.println("message_ch:" + channel + " msg:" + message);
}
};
clusterPubSubConn.addListener(listener);
PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all();
NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands();
clusterCmd.setex("a", 1, "A");
pubsubAsyncCmd.psubscribe("[email protected]__:*");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
Running results :

边栏推荐
- Shell脚本-数组定义以及获取数组元素
- MySQL8.0学习记录17 -Create Table
- V79.01 Hongmeng kernel source code analysis (user mode locking) | how to use the fast lock futex (Part 1) | hundreds of blogs analyze the openharmony source code
- FreeRTOS learning easy notes
- Shell script case in statement
- Matlab [function derivation]
- 集团公司固定资产管理的痛点和解决方案
- yolov5训练可视化指标的含义
- Jetson Nano 安装TensorFlow GPU及问题解决
- 《微机原理》——微处理器内部及外部结构
猜你喜欢

Public network cluster intercom +gps visual tracking | help the logistics industry with intelligent management and scheduling

Nacos - service discovery

MD文档中插入数学公式,Typora中插入数学公式

Performance improvement 2-3 times! The second generation Kunlun core server of Baidu AI Cloud was launched

如何做好固定资产管理?易点易动提供智能化方案

截图小妙招

钓鱼识别app

Computer tips

19Mn6 German standard pressure vessel steel plate 19Mn6 Wugang fixed binding 19Mn6 chemical composition

Glitch Free时钟切换技术
随机推荐
中小企业固定资产管理办法哪种好?
避免按钮重复点击的小工具bimianchongfu.queren()
Nacos - 配置管理
《微机原理》-绪论
Redis——Lettuce连接redis集群
JCL 和 SLF4J
R语言观察日志(part24)--初始化设置
Shell script echo command escape character
Share 7 books I read in the first half of 2022
嵌入式工程师常见面试题2-MCU_STM32
Full mark standard for sports items in the high school entrance examination (Shenzhen, Anhui and Hubei)
【MFC开发(16)】树形控件Tree Control
Shell script -while loop explanation
《微机原理》——微处理器内部及外部结构
Glitch Free时钟切换技术
1. Connection between Jetson and camera
小鸟识别APP
易点易动助力企业设备高效管理,提升设备利用率
Matlab tips (23) matrix analysis -- simulated annealing
jeecg 重启报40001