当前位置:网站首页>Redis -- lattice connects to redis cluster
Redis -- lattice connects to redis cluster
2022-07-01 08:51:00 【FlyLikeButterfly】
Lettuce Connect redis Clusters use cluster specific classes , image RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection wait ;
Lettuce Yes redis cluster Support for :
- Support all Cluster command ;
- Routing node based on key hash slot ;
- High level abstraction of cluster commands ;
- Execute commands on multiple cluster nodes ;
- Handle MOVED and ASK Redirect ;
- Through slots and ip The port is directly connected to the cluster node ;
- SSL And authentication ;
- Periodic and adaptive cluster topology updates ;
- Publish subscribe ;
At least one cluster node can be connected when starting , It can automatically topology all nodes of the cluster ; You can also use ReadFrom Set the read data source , Same as master-slave mode ;
although redis Its own multi key command requirements key Must be in the same slot , but Lettuce Some commands are optimized , Multi key commands can be executed across slots , By decomposing the operation commands for different slot keys into multiple commands , A single command starts with fork/join Run concurrently , Finally, merge the results and return ;
Commands that can span slots are :
- DEL: Delete key , Returns the deleted quantity ;
- EXISTS: Count the number of keys that exist across slots ;
- MGET: Get the values of all given keys , Order returns in the order of the keys ;
- MSET: Batch save key value pairs , Always returns OK;
- TOUCH: Change the last access time of a given key , Returns the number of keys changed ;
- UNLINK: Delete the key and reclaim memory in a different thread , Returns the deleted quantity ;
Provide cross slot commands api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;
The commands that can be executed on multiple cluster nodes are :
- CLIENT SETNAME: Set the name of the client on all known cluster nodes , Always returns OK;
- KEYS: Back to all master Stored on key;
- DBSIZE: Back to all master Stored on key The number of ;
- FLUSHALL: Empty master All the data on , Always returns OK;
- FLUSHDB: Empty master All the data on , Always returns OK;
- RANDOMKEY: From random master Return random on key;
- SCAN: according to ReadFrom Set the key space for scanning the entire cluster ;
- SCRIPT FLUSH: Delete all scripts from the script cache of all cluster nodes ;
- SCRIPT LOAD: Load on all cluster nodes lua Script ;
- SCRIPT KILL: Kill the script on all cluster nodes ;( Even if the script does not run, the call will not fail )
- SHUTDOWN: Save dataset synchronization to disk , Then shut down all nodes of the cluster ;
About publishing subscriptions :
Publish and subscribe to ordinary user space ,redis The cluster will send to each node , Publishers and subscribers do not need to be on the same node , Normal subscription publishing messages can be reconnected when the cluster topology changes . For key space Events , It will only be sent to its own node , It will not spread to other nodes , To subscribe to key space events, you can subscribe to appropriate multiple nodes , Or use RedisClusterClient Information dissemination and NodeSelection API Get a collection of managed connections ;
Be careful : Because of master-slave synchronization , Keys will be copied to multiple slave nodes , In particular, key expiration events , Expiration events will be generated on both master and slave nodes , If the subscription is from the node , You may receive multiple identical expiration Events ; The subscription is through NodeSelection API Or a single node calls subscribe(…) Emitted , The subscription is invalid for the new node ;
test Demo:(redis edition 7.0.2,Lettuce edition 6.1.8)
Cluster nodes : virtual machine 192.168.1.31, port 9001-9006, Cluster node has been set notify-keyspace-events AK;
/**
* 2022 year 6 month 23 The morning of 9:41:47
*/
package testlettuce;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.SslOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.Executions;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands;
import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection;
import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
/**
* @author XWF
*
*/
public class TestLettuceCluster {
/**
* @param args
*/
public static void main(String[] args) {
List<RedisURI> nodeList = new ArrayList<>();
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build());
nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build());
RedisClusterClient clusterClient = RedisClusterClient.create(nodeList);
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))// Set the adaptive topology refresh timeout , Refresh once per timeout , Default 30s;
.closeStaleConnections(false)// Whether to close invalid connections when refreshing topology , Default true,isPeriodicRefreshEnabled() by true Effective when ;
.dynamicRefreshSources(true)// Discover new nodes from the topology , And take the new node as the source node of the topology , Dynamic refresh can discover all nodes and calculate the number of each client , Set up false Then only the initial node is the source and the number of clients is calculated ;
.enableAllAdaptiveRefreshTriggers()// Enable all triggers to adaptively refresh the topology , Off by default ;
.enablePeriodicRefresh(Duration.ofSeconds(5L))// Enable scheduled topology refresh and set the cycle ;
.refreshTriggersReconnectAttempts(3)// Long connection reconnect attempt n Only topology refresh
.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.autoReconnect(true)// Turn automatic reconnection on or off when a connection is lost , Default true;
.cancelCommandsOnReconnectFailure(true)// Allow to cancel the queue command when reconnection fails , Default false;
.decodeBufferPolicy(DecodeBufferPolicies.always())// Set the policy of discarding decoding buffer , To reclaim memory ;always: Discard after decoding , Maximum memory efficiency ;alwaysSome: After decoding, part of it is discarded ;ratio(n) Discard based on ratio ,n/(1+n), Usually use 1-10 Corresponding 50%-90%;
.disconnectedBehavior(DisconnectedBehavior.DEFAULT)// Set the calling behavior of the command when the connection is disconnected , Reconnection is enabled by default ;DEFAULT: When enabled, receive commands during reconnection , When disabled, the command is rejected during reconnection ;ACCEPT_COMMANDS: Receive commands during reconnection ;REJECT_COMMANDS: Command rejected during reconnection ;
// .maxRedirects(5)// When keys are migrated from one node to another , Cluster redirection times , Default 5;
// .nodeFilter(nodeFilter)// Set node filter
// .pingBeforeActivateConnection(true)// Set before activating the connection PING, Default true;
// .protocolVersion(ProtocolVersion.RESP3)// Set protocol version , Default RESP3;
// .publishOnScheduler(false)// Use a dedicated scheduler to send a response signal , Default false, When enabled, data signals will be sent using the multithreading of the service ;
// .requestQueueSize(requestQueueSize)// Set the size of each connection request queue ;
// .scriptCharset(scriptCharset)// Set up Lua The script code is byte[] Character set for , Default StandardCharsets.UTF_8;
// .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())// Set the properties of low-level socket
// .sslOptions(SslOptions.builder().build())// Set up ssl attribute
// .suspendReconnectOnProtocolFailure(false)// Pause reconnection when protocol failure occurs during reconnection (SSL verification , Before connection failure PING), The default value is false;
// .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))// Set the timeout to cancel and terminate the command ;
.topologyRefreshOptions(clusterTopologyRefreshOptions)// Set topology update settings
.validateClusterNodeMembership(true)// Before allowing connections to cluster nodes , Verify the cluster node membership , The default value is true;
.build();
clusterClient.setDefaultTimeout(Duration.ofSeconds(5L));
clusterClient.setOptions(clusterClientOptions);
StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect();
clusterConn.setReadFrom(ReadFrom.ANY);// Set which nodes to read data from ;
RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync();
clusterCmd.set("a", "A");
clusterCmd.set("b", "B");
clusterCmd.set("c", "C");
clusterCmd.set("d", "D");
System.out.println("get a=" + clusterCmd.get("a"));
System.out.println("get b=" + clusterCmd.get("b"));
System.out.println("get c=" + clusterCmd.get("c"));
System.out.println("get d=" + clusterCmd.get("d"));
// Cross slot command
Map<String, String> kvmap = new HashMap<>();
kvmap.put("a", "AA");
kvmap.put("b", "BB");
kvmap.put("c", "CC");
kvmap.put("d", "DD");
clusterCmd.mset(kvmap);//Lettuce Optimized , Cross slot commands that support some commands ;
System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d"));
// Select some node operations
NodeSelection<String, String> replicas = clusterCmd.replicas();
NodeSelectionCommands<String, String> replicaseCmd = replicas.commands();
Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL);
executions.forEach(s -> {System.out.println(s.getKeys());});
// Subscribe to published messages
StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub();
pubSubConn.addListener(new RedisPubSubListener<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println("[message]ch:" + channel + ",msg:" + message);
}
@Override
public void message(String pattern, String channel, String message) {
}
@Override
public void subscribed(String channel, long count) {
System.out.println("[subscribed]ch:" + channel);
}
@Override
public void psubscribed(String pattern, long count) {
}
@Override
public void unsubscribed(String channel, long count) {
}
@Override
public void punsubscribed(String pattern, long count) {
}
});
pubSubConn.sync().subscribe("TEST_Ch");//( The callback uses blocking calls internally or lettuce Sync api call , Asynchronous subscription is required )
clusterCmd.publish("TEST_Ch", "MSGMSGMSG");
// Responsive subscription , Can monitor ChannelMessage and PatternMessage, Use chain filtering to process calculation and other operations
RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive();
pubsubReactive.subscribe("TEST_Ch2").subscribe();
pubsubReactive.observeChannels()
.filter(chmsg -> {return chmsg.getMessage().contains("tom");})
.doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());})
.subscribe();
clusterCmd.publish("TEST_Ch2", "send to jerry");
clusterCmd.publish("TEST_Ch", "tom MSG");
clusterCmd.publish("TEST_Ch2", "this is tom");
//keySpaceEvent event
StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub();
clusterPubSubConn.setNodeMessagePropagation(true);// Enable disable node message propagation to this listener, For example, key event notifications that can only be notified in this node ;
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {
@Override
public void unsubscribed(String channel, long count) {
System.out.println("unsubscribed_ch:" + channel);
}
@Override
public void subscribed(String channel, long count) {
System.out.println("subscribed_ch:" + channel);
}
@Override
public void punsubscribed(String pattern, long count) {
System.out.println("punsubscribed_pattern:" + pattern);
}
@Override
public void psubscribed(String pattern, long count) {
System.out.println("psubscribed_pattern:" + pattern);
}
@Override
public void message(String pattern, String channel, String message) {
System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message);
}
@Override
public void message(String channel, String message) {
System.out.println("message_ch:" + channel + " msg:" + message);
}
};
clusterPubSubConn.addListener(listener);
PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all();
NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands();
clusterCmd.setex("a", 1, "A");
pubsubAsyncCmd.psubscribe("[email protected]__:*");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
Running results :
边栏推荐
- It is designed with high bandwidth, which is almost processed into an open circuit?
- It technology ebook collection
- Software Engineer Interview Question brushing website and experience method
- Nacos - 服务发现
- 小鸟识别APP
- Programming with C language: calculate with formula: e ≈ 1+1/1+ 1/2! …+ 1/n!, Accuracy is 10-6
- FreeRTOS学习简易笔记
- 足球篮球体育比赛比分直播平台源码/app开发建设项目
- Model and view of QT
- 如何做好固定资产管理?易点易动提供智能化方案
猜你喜欢
Brief introduction to AES
Computer tips
Public network cluster intercom +gps visual tracking | help the logistics industry with intelligent management and scheduling
《微机原理》-绪论
1.jetson与摄像头的对接
ARM v7的体系结构A、R、M区别,分别应用在什么领域?
Introduction to R language
Matlab [function derivation]
足球篮球体育比赛比分直播平台源码/app开发建设项目
Matlab tips (16) consistency verification of matrix eigenvector eigenvalue solution -- analytic hierarchy process
随机推荐
JCL and slf4j
Centos7 shell script one click installation of JDK, Mongo, Kafka, FTP, PostgreSQL, PostGIS, pgrouting
win7 pyinstaller打包exe 后报错 DLL load failed while importing _socket:参数错误
MD文档中插入数学公式,Typora中插入数学公式
Brief introduction to AES
如何高效拉齐团队认知
3. Detailed explanation of Modbus communication protocol
Jeecg restart alarm 40001
Vscode customize the color of each area
Matlab [function derivation]
如何一站式高效管理固定资产?
How can enterprises and developers take the lead in the outbreak of cloud native landing?
中考体育项目满分标准(深圳、安徽、湖北)
一文纵览主流 NFT 市场平台版税、服务费设计
明明设计的是高带宽,差点加工成开路?
用C语言编程:用公式计算:e≈1+1/1!+1/2! …+1/n!,精度为10-6
Shell script - positional parameters (command line parameters)
Only in China! Alicloud container service enters the Forrester leader quadrant
集团公司固定资产管理的痛点和解决方案
[untitled]