当前位置:网站首页>The transformation based on vertx web sstore redis to realize the distributed session of vertx HTTP application
The transformation based on vertx web sstore redis to realize the distributed session of vertx HTTP application
2022-07-05 20:58:00 【forwardMyLife】
1. vertx Based on redis Distributed session Deficiency
vertx It has redis Distributed session The implementation of the . Only the following dependencies need to be introduced
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-sstore-redis</artifactId>
<version>4.1.4</version>
</dependency>
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
Redis redis = new RedisClient(vertx,redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx,redis);
//SessionStore sessionStore = SessionStore.create(vertx);
router.routeWithRegex("/static.*").handler(StaticHandler.create());
router.routeWithRegex(".*service.*")
.handler(SessionHandler.create(redisSessionStore)).handler(ctx->{
if(ctx.request().path().contains("initSession")){
ctx.request().response().setStatusCode(401)
.putHeader("Content-Type","application/json").end((new JsonObject().put("msg","illegal request").encode()));
}else{
ctx.next();
}
})
Just put the original SessionStore Switch to RedisSessionStore that will do .
Its own Redis The client is based on vertx Realized , Yes redis The reading and writing of is asynchronous , It won't block io Threads . But its current RedisStore Implementation has a fatal problem , It cannot directly support the customization of objects or jdk Existing common objects such as ArrayList,HashMap Do serialization and deserialization , except 8 Large basic types and their packaging , Other complex objects must implement their serialization interfaces .ClusterSerializable ,
package io.vertx.core.shareddata.impl;
import io.vertx.core.buffer.Buffer;
/** * Objects implementing this interface will be write to and read from a {@link Buffer} when respectively * stored and read from an {@link io.vertx.core.shareddata.AsyncMap}. * <p> * Implementations must have a public no-argument constructor. * * @author <a href="http://tfox.org">Tim Fox</a> */
public interface ClusterSerializable {
void writeToBuffer(Buffer buffer);
int readFromBuffer(int pos, Buffer buffer);
}
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package io.vertx.ext.web.sstore.impl;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public SharedDataSessionImpl() {
super();
}
public SharedDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
@Override
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
@Override
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
if (val != null) {
throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
}
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
If complex objects are not implemented ClusterSerializable Interface , Will prompt IllegalStateException.
2. rewrite ShareDataSessionImpl
So we add PB type , We all take complex objects pb Serialization and deserialization .
pb Serialization tool class
package com.ly;
import com.ly.entity.Good;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ProtoStuffUtil {
private static final Map<Class<?>,Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();
private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
SerializeDeserializeWrapperObj.class;
private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);
private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();
static{
WRAPPER_CLASS_SET.add(List.class);
WRAPPER_CLASS_SET.add(Integer.class);
WRAPPER_CLASS_SET.add(Boolean.class);
WRAPPER_CLASS_SET.add(Character.class);
WRAPPER_CLASS_SET.add(Double.class);
WRAPPER_CLASS_SET.add(int.class);
WRAPPER_CLASS_SET.add(boolean.class);
WRAPPER_CLASS_SET.add(char.class);
WRAPPER_CLASS_SET.add(double.class);
WRAPPER_CLASS_SET.add(ArrayList.class);
WRAPPER_CLASS_SET.add(Set.class);
WRAPPER_CLASS_SET.add(Map.class);
WRAPPER_CLASS_SET.add(HashMap.class);
WRAPPER_CLASS_SET.add(Date.class);
}
public static <T> byte[] serializer(T o){
if(WRAPPER_CLASS_SET.contains(o.getClass())){
return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o),WRAPPER_SCHEMA, LinkedBuffer.allocate(1024));
}else{
return ProtostuffIOUtil.toByteArray(o,getSchema(o.getClass()), LinkedBuffer.allocate(1024));
}
}
public static <T> byte[] serializerV1(T o){
Schema<T> schema = getSchema(o.getClass());
return ProtostuffIOUtil.toByteArray(o,schema, LinkedBuffer.allocate(1024));
}
public static <T> Schema getSchema(Class<T> clazz){
if(SCHEMA_MAP.containsKey(clazz)){
return SCHEMA_MAP.get(clazz);
}else{
Schema<T> schema = RuntimeSchema.createFrom(clazz);
SCHEMA_MAP.put(clazz,schema);
return schema;
}
}
public static <T> T deserializer(byte[] bytes,Class<T> clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema<T> schema = getSchema(clazz);
T obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static Object deserializerToObj(byte[] bytes,Class clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema schema = RuntimeSchema.createFrom(clazz);
Object obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static <T> T deserializerV1(byte[] bytes,Class<T> clazz) throws IllegalAccessException, InstantiationException {
Schema<T> schema = getSchema(clazz);
T obj = clazz.newInstance();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
private static class SerializeDeserializeWrapperObj<T> {
private T data;
public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
wrapper.setData(data);
return wrapper;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
}
rewrite ShareDataSessionImpl
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package com.ly.session;
import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class ReactiveRedisSessionStore implements SessionStore {
private Redis redis;
private VertxContextPRNG random;
private long retryTimeout;
@Override
public SessionStore init(Vertx vertx, JsonObject options) {
Objects.requireNonNull(options, "options are required");
long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
Redis redis = Redis.createClient(vertx, new RedisOptions(options));
return init(vertx, timeout, redis);
}
public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
random = VertxContextPRNG.current(vertx);
this.retryTimeout = retryTimeout;
this.redis = Objects.requireNonNull(redis, "redis is required");
return this;
}
@Override
public long retryTimeout() {
return retryTimeout;
}
@Override
public Session createSession(long timeout) {
return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
}
@Override
public Session createSession(long timeout, int length) {
return new RedisShardDataSessionImpl(random, timeout, length);
}
public ReactiveRedisSessionStore() {
super();
}
@Override
public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
redis.send(cmd(GET).arg(id), resGet -> {
if (resGet.failed()) {
resultHandler.handle(Future.failedFuture(resGet.cause()));
return;
}
Response response = resGet.result();
if (response != null) {
RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
session.readFromBuffer(0, response.toBuffer());
// postpone expiration time, this cannot be done in a single frame with GET cmd
// redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
// if (resExpire.failed()) {
// resultHandler.handle(Future.failedFuture(resExpire.cause()));
// } else {
// resultHandler.handle(Future.succeededFuture(session));
// }
// });
resultHandler.handle(Future.succeededFuture(session));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
redis.send(cmd(DEL).arg(id), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
Buffer buffer = Buffer.buffer();
RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;
sessionImpl.writeToBuffer(buffer);
// submit with all session data & expiration TO in ms
Request rq = cmd(SET)
.arg(session.id()).arg(buffer)
.arg("PX").arg(session.timeout());
redis.send(rq, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) {
}
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) {
}
@Override
public void close() {
}
public static class RedisShardDataSessionImpl extends AbstractSession{
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
private static final byte TYPE_PB_SERIALIZABLE = 14;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public RedisShardDataSessionImpl() {
super();
}
public RedisShardDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
// Default serialization to pb
buffer.appendByte(TYPE_PB_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
byte[] serializedBytes = ProtoStuffUtil.serializer(val);
buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
case TYPE_PB_SERIALIZABLE:
classNameLen = buffer.getInt(pos);
pos += 4;
classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
className = new String(classNameBytes, UTF8);
Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
int serializeObjLength = buffer.getInt(pos);
pos+=4;
byte[] serializeObjBytes = buffer.getBytes(pos,pos+serializeObjLength);
pos+=serializeObjLength;
val = ProtoStuffUtil.deserializerToObj(serializeObjBytes,clazzByPb);
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
}
When serializing, the default is pb Do serialization , When deserializing , If the type is pb Just use pb Do deserialization .

Look at the complete code github
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx
边栏推荐
- How to renew NPDP? Here comes the operation guide!
- 解析创客教育的知识迁移和分享精神
- Is it necessary for bazel to learn
- 研學旅遊實踐教育的開展助力文旅產業發展
- EN 438-7建筑覆盖物装饰用层压板材产品—CE认证
- 当Steam教育进入个性化信息技术课程
- Sequence alignment
- When steam education enters personalized information technology courses
- CareerCup它1.8 串移包括问题
- Is Kai Niu 2980 useful? Is it safe to open an account
猜你喜欢

How to make ERP inventory accounts of chemical enterprises more accurate

实现浏览页面时校验用户是否已经完成登录的功能

Using webassembly to operate excel on the browser side

Abnova maxpab mouse derived polyclonal antibody solution

MySQL InnoDB架构原理

Open source SPL eliminates tens of thousands of database intermediate tables

Abnova e (diii) (WNV) recombinant protein Chinese and English instructions

研学旅游实践教育的开展助力文旅产业发展

显示屏DIN 4102-1 Class B1防火测试要求

Duchefa MS medium contains vitamin instructions
随机推荐
渗透创客精神文化转化的创客教育
Duchefa丨S0188盐酸大观霉素五水合物中英文说明书
AITM2-0002 12s或60s垂直燃烧试验
Using webassembly to operate excel on the browser side
示波器探头对信号源阻抗的影响
基于AVFoundation实现视频录制的两种方式
Binary search
Influence of oscilloscope probe on signal source impedance
教你自己训练的pytorch模型转caffe(三)
The Chinese Academy of Management Sciences gathered industry experts, and Fu Qiang won the title of "top ten youth" of think tank experts
学习机器人无从下手?带你体会当下机器人热门研究方向有哪些
Is it safe to open a stock account by mobile phone? My home is relatively remote. Is there a better way to open an account?
poj 3414 Pots (bfs+线索)
浅聊我和一些编程语言的缘分
Phpstudy Xiaopi's MySQL Click to start and quickly flash back. It has been solved
Pytorch实战——MNIST数据集手写数字识别
Abnova DNA marker high quality control test program
How to open an account online for futures? Is it safe?
100 cases of shell programming
基于flask写一个接口