Adapting vertx-web-sstore-redis to implement distributed sessions for Vert.x HTTP applications
2022-07-05 20:58:00 【forwardMyLife】
1. Shortcomings of Vert.x's Redis-based distributed session
Vert.x ships with a Redis-based distributed session implementation. You only need to add the following dependency:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-sstore-redis</artifactId>
<version>4.1.4</version>
</dependency>
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
// Create the client through the public factory instead of instantiating the internal RedisClient class
Redis redis = Redis.createClient(vertx, redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx, redis);
//SessionStore sessionStore = SessionStore.create(vertx);
router.routeWithRegex("/static.*").handler(StaticHandler.create());
router.routeWithRegex(".*service.*")
    .handler(SessionHandler.create(redisSessionStore)).handler(ctx -> {
        if (ctx.request().path().contains("initSession")) {
            // Reject initSession requests with a 401 JSON response
            ctx.response().setStatusCode(401)
                .putHeader("Content-Type", "application/json")
                .end(new JsonObject().put("msg", "illegal request").encode());
        } else {
            ctx.next();
        }
    });
Simply replace the original SessionStore with RedisSessionStore.
Its Redis client is built on Vert.x, so reads and writes to Redis are asynchronous and do not block the I/O threads. However, the current Redis session store has a serious limitation: it cannot directly serialize and deserialize custom objects or common JDK types such as ArrayList and HashMap. Apart from the eight primitive types and their wrapper classes (plus String, Buffer and byte[], as the listing below shows), every other complex object must implement the serialization interface ClusterSerializable. The interface and the SharedDataSessionImpl class that relies on it are reproduced below:
package io.vertx.core.shareddata.impl;
import io.vertx.core.buffer.Buffer;
/** * Objects implementing this interface will be write to and read from a {@link Buffer} when respectively * stored and read from an {@link io.vertx.core.shareddata.AsyncMap}. * <p> * Implementations must have a public no-argument constructor. * * @author <a href="http://tfox.org">Tim Fox</a> */
public interface ClusterSerializable {
void writeToBuffer(Buffer buffer);
int readFromBuffer(int pos, Buffer buffer);
}
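To make the contract concrete, here is a minimal sketch of a session value that follows the same "write yourself, then read yourself back from a position" pattern. So that it runs on the JDK alone, it uses `java.nio.ByteBuffer` and a stand-in interface named `PositionalSerializable`; both are assumptions for illustration, and real code would implement `ClusterSerializable` against Vert.x's `Buffer`. `CartItem` is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for ClusterSerializable (assumption): same shape, but on java.nio.ByteBuffer.
interface PositionalSerializable {
    void writeToBuffer(ByteBuffer buffer);
    int readFromBuffer(int pos, ByteBuffer buffer);
}

// Hypothetical session value; note the public no-argument constructor the contract asks for.
class CartItem implements PositionalSerializable {
    String name;
    int quantity;

    public CartItem() { }

    public CartItem(String name, int quantity) {
        this.name = name;
        this.quantity = quantity;
    }

    @Override
    public void writeToBuffer(ByteBuffer buffer) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        // Length-prefixed string followed by the fixed-size int field.
        buffer.putInt(nameBytes.length).put(nameBytes).putInt(quantity);
    }

    @Override
    public int readFromBuffer(int pos, ByteBuffer buffer) {
        int len = buffer.getInt(pos);
        pos += 4;
        byte[] nameBytes = new byte[len];
        for (int i = 0; i < len; i++) {
            nameBytes[i] = buffer.get(pos + i); // absolute read, buffer position is untouched
        }
        pos += len;
        name = new String(nameBytes, StandardCharsets.UTF_8);
        quantity = buffer.getInt(pos);
        return pos + 4; // index of the first byte after this object
    }
}
```

Returning the new position is what lets the session implementation continue reading the next entry from the same buffer.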
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package io.vertx.ext.web.sstore.impl;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public SharedDataSessionImpl() {
super();
}
public SharedDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
@Override
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
@Override
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
if (val != null) {
throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
}
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
If a complex object does not implement the ClusterSerializable interface, writeDataToBuffer throws an IllegalStateException.
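The failure is easy to reproduce in isolation. The sketch below is not the Vert.x code: it is a cut-down, JDK-only imitation of the type-tagged encoding (only three of the type tags, `java.nio.ByteBuffer` instead of Vert.x's `Buffer`), and `TaggedValueWriter` is a hypothetical name. It shows that a supported value is written as a tag byte plus payload, while an `ArrayList` falls through to the final branch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

class TaggedValueWriter {
    static final byte TYPE_LONG = 1;
    static final byte TYPE_INT = 2;
    static final byte TYPE_STRING = 9;

    // Encodes one value as [type tag][payload], mirroring the shape of writeDataToBuffer.
    static byte[] encode(Object val) {
        ByteBuffer out;
        if (val instanceof Long) {
            out = ByteBuffer.allocate(1 + 8).put(TYPE_LONG).putLong((Long) val);
        } else if (val instanceof Integer) {
            out = ByteBuffer.allocate(1 + 4).put(TYPE_INT).putInt((Integer) val);
        } else if (val instanceof String) {
            byte[] utf8 = ((String) val).getBytes(StandardCharsets.UTF_8);
            out = ByteBuffer.allocate(1 + 4 + utf8.length)
                .put(TYPE_STRING).putInt(utf8.length).put(utf8);
        } else {
            // No branch knows how to write this value, so the whole session write fails.
            throw new IllegalStateException(
                "Invalid type for data in session: " + (val == null ? "null" : val.getClass()));
        }
        return out.array();
    }

    public static void main(String[] args) {
        System.out.println(encode(42L).length); // 1 tag byte + 8 payload bytes
        try {
            encode(new ArrayList<String>());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```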
2. Rewriting SharedDataSessionImpl
So we add a PB type: every complex object that the existing branches cannot handle is serialized and deserialized with pb, using the Protostuff library.
The pb serialization utility class:
package com.ly;
import com.ly.entity.Good;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ProtoStuffUtil {
private static final Map<Class<?>,Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();
private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
SerializeDeserializeWrapperObj.class;
private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);
private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();
static{
WRAPPER_CLASS_SET.add(List.class);
WRAPPER_CLASS_SET.add(Integer.class);
WRAPPER_CLASS_SET.add(Boolean.class);
WRAPPER_CLASS_SET.add(Character.class);
WRAPPER_CLASS_SET.add(Double.class);
WRAPPER_CLASS_SET.add(int.class);
WRAPPER_CLASS_SET.add(boolean.class);
WRAPPER_CLASS_SET.add(char.class);
WRAPPER_CLASS_SET.add(double.class);
WRAPPER_CLASS_SET.add(ArrayList.class);
WRAPPER_CLASS_SET.add(Set.class);
WRAPPER_CLASS_SET.add(Map.class);
WRAPPER_CLASS_SET.add(HashMap.class);
WRAPPER_CLASS_SET.add(Date.class);
}
public static <T> byte[] serializer(T o){
if(WRAPPER_CLASS_SET.contains(o.getClass())){
return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o),WRAPPER_SCHEMA, LinkedBuffer.allocate(1024));
}else{
return ProtostuffIOUtil.toByteArray(o,getSchema(o.getClass()), LinkedBuffer.allocate(1024));
}
}
public static <T> byte[] serializerV1(T o){
Schema<T> schema = getSchema(o.getClass());
return ProtostuffIOUtil.toByteArray(o,schema, LinkedBuffer.allocate(1024));
}
public static <T> Schema getSchema(Class<T> clazz){
if(SCHEMA_MAP.containsKey(clazz)){
return SCHEMA_MAP.get(clazz);
}else{
Schema<T> schema = RuntimeSchema.createFrom(clazz);
SCHEMA_MAP.put(clazz,schema);
return schema;
}
}
public static <T> T deserializer(byte[] bytes,Class<T> clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema<T> schema = getSchema(clazz);
T obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static Object deserializerToObj(byte[] bytes,Class clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
// Reuse the cached schema instead of rebuilding it on every call
Schema schema = getSchema(clazz);
Object obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static <T> T deserializerV1(byte[] bytes,Class<T> clazz) throws IllegalAccessException, InstantiationException {
Schema<T> schema = getSchema(clazz);
T obj = clazz.newInstance();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
private static class SerializeDeserializeWrapperObj<T> {
private T data;
public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
wrapper.setData(data);
return wrapper;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
}
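One detail of `WRAPPER_CLASS_SET` is worth keeping in mind. `serializer` looks up `o.getClass()`, which is always a concrete class, so the interface entries (`List.class`, `Set.class`, `Map.class`) can only match when a caller passes that interface explicitly to `deserializer`. Collection implementations that are not listed, for example `LinkedList`, take the non-wrapper branch. The JDK-only sketch below illustrates the lookup behaviour; `WrapperLookupDemo` and `needsWrapper` are hypothetical names and not part of the original utility.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class WrapperLookupDemo {
    private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();

    static {
        WRAPPER_CLASS_SET.add(List.class);
        WRAPPER_CLASS_SET.add(ArrayList.class);
        WRAPPER_CLASS_SET.add(Map.class);
        WRAPPER_CLASS_SET.add(HashMap.class);
    }

    // Same test as ProtoStuffUtil.serializer: an exact match on the runtime class.
    static boolean needsWrapper(Object o) {
        return WRAPPER_CLASS_SET.contains(o.getClass());
    }

    public static void main(String[] args) {
        System.out.println(needsWrapper(new ArrayList<String>()));  // true
        System.out.println(needsWrapper(new LinkedList<String>())); // false
        System.out.println(needsWrapper(Collections.emptyList()));  // false
    }
}
```

If session values may hold other collection types, one option is to extend the set with those concrete classes, since the class name written by the session implementation is also the concrete runtime class.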
The rewritten session store, with the modified session implementation as a nested class:
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package com.ly.session;
import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class ReactiveRedisSessionStore implements SessionStore {
private Redis redis;
private VertxContextPRNG random;
private long retryTimeout;
@Override
public SessionStore init(Vertx vertx, JsonObject options) {
Objects.requireNonNull(options, "options are required");
long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
Redis redis = Redis.createClient(vertx, new RedisOptions(options));
return init(vertx, timeout, redis);
}
public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
random = VertxContextPRNG.current(vertx);
this.retryTimeout = retryTimeout;
this.redis = Objects.requireNonNull(redis, "redis is required");
return this;
}
@Override
public long retryTimeout() {
return retryTimeout;
}
@Override
public Session createSession(long timeout) {
return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
}
@Override
public Session createSession(long timeout, int length) {
return new RedisShardDataSessionImpl(random, timeout, length);
}
public ReactiveRedisSessionStore() {
super();
}
@Override
public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
redis.send(cmd(GET).arg(id), resGet -> {
if (resGet.failed()) {
resultHandler.handle(Future.failedFuture(resGet.cause()));
return;
}
Response response = resGet.result();
if (response != null) {
RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
session.readFromBuffer(0, response.toBuffer());
// postpone expiration time, this cannot be done in a single frame with GET cmd
// redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
// if (resExpire.failed()) {
// resultHandler.handle(Future.failedFuture(resExpire.cause()));
// } else {
// resultHandler.handle(Future.succeededFuture(session));
// }
// });
resultHandler.handle(Future.succeededFuture(session));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
redis.send(cmd(DEL).arg(id), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
Buffer buffer = Buffer.buffer();
RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;
sessionImpl.writeToBuffer(buffer);
// submit with all session data & expiration TO in ms
Request rq = cmd(SET)
.arg(session.id()).arg(buffer)
.arg("PX").arg(session.timeout());
redis.send(rq, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
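@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) {
// Not implemented in this store; fail explicitly so the caller's handler is always invoked
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("clear is not implemented")));
}
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) {
// Not implemented in this store; fail explicitly so the caller's handler is always invoked
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("size is not implemented")));
}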
@Override
public void close() {
}
public static class RedisShardDataSessionImpl extends AbstractSession{
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
private static final byte TYPE_PB_SERIALIZABLE = 14;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public RedisShardDataSessionImpl() {
super();
}
public RedisShardDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
// Default serialization to pb
buffer.appendByte(TYPE_PB_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
byte[] serializedBytes = ProtoStuffUtil.serializer(val);
buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
case TYPE_PB_SERIALIZABLE:
classNameLen = buffer.getInt(pos);
pos += 4;
classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
className = new String(classNameBytes, UTF8);
Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
int serializeObjLength = buffer.getInt(pos);
pos+=4;
byte[] serializeObjBytes = buffer.getBytes(pos,pos+serializeObjLength);
pos+=serializeObjLength;
val = ProtoStuffUtil.deserializerToObj(serializeObjBytes,clazzByPb);
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
}
On write, any value not covered by the earlier branches is serialized with pb by default. On read, entries tagged with the pb type are deserialized with pb.
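The framing used for the new type is: one tag byte, the length-prefixed class name, then the length-prefixed payload. The sketch below reproduces that layout with `java.nio.ByteBuffer` so that it runs on the JDK alone. As a clearly labeled assumption, JDK serialization stands in for `ProtoStuffUtil` here; `PbFrameDemo`, `toBytes`, `fromBytes`, `writeFrame` and `readFrame` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PbFrameDemo {
    static final byte TYPE_PB_SERIALIZABLE = 14;

    // Stand-in for ProtoStuffUtil.serializer (assumption): JDK serialization.
    static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for ProtoStuffUtil.deserializerToObj (assumption): JDK deserialization.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Layout: [tag][classNameLen][className][payloadLen][payload]
    static ByteBuffer writeFrame(Object val) {
        byte[] className = val.getClass().getName().getBytes(StandardCharsets.UTF_8);
        byte[] payload = toBytes(val);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + className.length + 4 + payload.length);
        buf.put(TYPE_PB_SERIALIZABLE)
            .putInt(className.length).put(className)
            .putInt(payload.length).put(payload);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    static Object readFrame(ByteBuffer buf) {
        byte type = buf.get();
        if (type != TYPE_PB_SERIALIZABLE) {
            throw new IllegalStateException("Invalid serialized type: " + type);
        }
        // The article's code loads this class and passes it to the deserializer;
        // the JDK stand-in only needs to skip over it.
        byte[] className = new byte[buf.getInt()];
        buf.get(className);
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return fromBytes(payload);
    }

    public static void main(String[] args) {
        List<String> cart = new ArrayList<>(Arrays.asList("book", "pen"));
        System.out.println(readFrame(writeFrame(cart)));
    }
}
```

Writing the payload length explicitly is what allows the reader to advance past an opaque serialized blob and continue with the next session entry.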

See the complete code on GitHub:
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx