Adapting vertx-web-sstore-redis to implement distributed sessions for Vert.x HTTP applications
2022-07-05 20:58:00 【forwardMyLife】
1. Shortcomings of Vert.x's Redis-based distributed session
Vert.x ships with its own Redis-based distributed session implementation. You only need to add the following dependency:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-sstore-redis</artifactId>
<version>4.1.4</version>
</dependency>
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
// Create the client through the public factory method
Redis redis = Redis.createClient(vertx, redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx, redis);
// SessionStore sessionStore = SessionStore.create(vertx);
router.routeWithRegex("/static.*").handler(StaticHandler.create());
router.routeWithRegex(".*service.*")
    .handler(SessionHandler.create(redisSessionStore))
    .handler(ctx -> {
        if (ctx.request().path().contains("initSession")) {
            // Answer 401 to any request whose path contains "initSession"
            ctx.response().setStatusCode(401)
                .putHeader("Content-Type", "application/json")
                .end(new JsonObject().put("msg", "illegal request").encode());
        } else {
            ctx.next();
        }
    });
Simply replace the original SessionStore with RedisSessionStore.
Its Redis client is built on Vert.x, so reads and writes to Redis are asynchronous and do not block the I/O threads. However, the current RedisSessionStore implementation has a serious problem: it cannot directly serialize or deserialize custom objects or common JDK objects such as ArrayList and HashMap. Apart from the 8 primitive types and their wrappers (plus String, Buffer and byte[], as the listing below shows), every other complex object must implement the serialization interface ClusterSerializable:
package io.vertx.core.shareddata.impl;
import io.vertx.core.buffer.Buffer;
/**
 * Objects implementing this interface will be write to and read from a {@link Buffer} when respectively
 * stored and read from an {@link io.vertx.core.shareddata.AsyncMap}.
 * <p>
 * Implementations must have a public no-argument constructor.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public interface ClusterSerializable {
void writeToBuffer(Buffer buffer);
int readFromBuffer(int pos, Buffer buffer);
}
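To make the contract concrete, here is a minimal sketch of a session value that follows the same pattern: write the fields in order, then read them back from a given position and return the next position. It rests on several assumptions: to stay runnable without Vert.x on the classpath it uses java.nio.ByteBuffer in place of io.vertx.core.buffer.Buffer, and the UserInfo class, its fields and its method names are hypothetical. A real session value would implement ClusterSerializable and call Buffer.appendInt / Buffer.getInt in the same order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UserInfo {
    // Hypothetical session value. The public no-argument constructor matters:
    // the reader creates an empty instance reflectively and then fills it.
    private String name;
    private int age;

    public UserInfo() {}

    public UserInfo(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public int getAge() { return age; }

    // Mirrors writeToBuffer: length-prefixed UTF-8 name, then a 4-byte age.
    public void writeTo(ByteBuffer out) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        out.putInt(nameBytes.length).put(nameBytes).putInt(age);
    }

    // Mirrors readFromBuffer: starts at pos and returns the position just past
    // the last byte consumed, so the caller can continue with the next entry.
    public int readFrom(int pos, ByteBuffer buffer) {
        int len = buffer.getInt(pos);
        pos += 4;
        byte[] nameBytes = new byte[len];
        for (int i = 0; i < len; i++) {
            nameBytes[i] = buffer.get(pos + i);
        }
        pos += len;
        name = new String(nameBytes, StandardCharsets.UTF_8);
        age = buffer.getInt(pos);
        return pos + 4;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        new UserInfo("alice", 30).writeTo(buffer);

        UserInfo copy = new UserInfo();
        int next = copy.readFrom(0, buffer);
        System.out.println(copy.getName() + " " + copy.getAge() + " next=" + next);
    }
}
```

Returning the next position is what lets the session implementation read several entries back to back from one buffer.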
/*
 * Copyright 2014 Red Hat, Inc.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Apache License v2.0 which accompanies this distribution.
 *
 * The Eclipse Public License is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * The Apache License v2.0 is available at
 * http://www.opensource.org/licenses/apache2.0.php
 *
 * You may elect to redistribute this code under either of these licenses.
 */
package io.vertx.ext.web.sstore.impl;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
/**
 * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do
 * not remove.
 */
public SharedDataSessionImpl() {
super();
}
public SharedDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
@Override
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
@Override
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
if (val != null) {
throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
}
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
If a complex object stored in the session does not implement the ClusterSerializable interface, writing the session throws an IllegalStateException.
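The failure is easy to reproduce outside Vert.x. The sketch below is a simplified, stdlib-only imitation of writeDataToBuffer from the listing above: it knows only Integer and String (reusing the tag values 2 and 9) and rejects everything else, which is what happens to an ArrayList placed in a session. The TaggedWriter class and the writeValue method are hypothetical names, and java.nio.ByteBuffer stands in for the Vert.x Buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

public class TaggedWriter {
    private static final byte TYPE_INT = 2;
    private static final byte TYPE_STRING = 9;

    // Writes one value as a tag byte followed by its payload.
    public static void writeValue(ByteBuffer out, Object val) {
        if (val instanceof Integer) {
            out.put(TYPE_INT).putInt((Integer) val);
        } else if (val instanceof String) {
            byte[] bytes = ((String) val).getBytes(StandardCharsets.UTF_8);
            out.put(TYPE_STRING).putInt(bytes.length).put(bytes);
        } else if (val != null) {
            // No tag exists for this type, so the write is refused.
            throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
        }
    }

    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(64);
        writeValue(out, 42);
        writeValue(out, "abc");
        System.out.println("bytes written: " + out.position());
        try {
            writeValue(out, new ArrayList<String>());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the format is a closed set of tags, supporting arbitrary objects means adding a new tag with a generic serializer behind it, which is the approach taken in the next section.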
2. Rewriting SharedDataSessionImpl
To fix this we add a PB type: every complex object that is not otherwise supported is serialized and deserialized with pb (Protostuff).
The pb serialization utility class:
package com.ly;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ProtoStuffUtil {
private static final Map<Class<?>,Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();
private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
SerializeDeserializeWrapperObj.class;
private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);
private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();
static{
WRAPPER_CLASS_SET.add(List.class);
WRAPPER_CLASS_SET.add(Integer.class);
WRAPPER_CLASS_SET.add(Boolean.class);
WRAPPER_CLASS_SET.add(Character.class);
WRAPPER_CLASS_SET.add(Double.class);
WRAPPER_CLASS_SET.add(int.class);
WRAPPER_CLASS_SET.add(boolean.class);
WRAPPER_CLASS_SET.add(char.class);
WRAPPER_CLASS_SET.add(double.class);
WRAPPER_CLASS_SET.add(ArrayList.class);
WRAPPER_CLASS_SET.add(Set.class);
WRAPPER_CLASS_SET.add(Map.class);
WRAPPER_CLASS_SET.add(HashMap.class);
WRAPPER_CLASS_SET.add(Date.class);
}
public static <T> byte[] serializer(T o){
if(WRAPPER_CLASS_SET.contains(o.getClass())){
return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o),WRAPPER_SCHEMA, LinkedBuffer.allocate(1024));
}else{
return ProtostuffIOUtil.toByteArray(o,getSchema(o.getClass()), LinkedBuffer.allocate(1024));
}
}
public static <T> byte[] serializerV1(T o){
Schema<T> schema = getSchema(o.getClass());
return ProtostuffIOUtil.toByteArray(o,schema, LinkedBuffer.allocate(1024));
}
public static <T> Schema getSchema(Class<T> clazz){
if(SCHEMA_MAP.containsKey(clazz)){
return SCHEMA_MAP.get(clazz);
}else{
Schema<T> schema = RuntimeSchema.createFrom(clazz);
SCHEMA_MAP.put(clazz,schema);
return schema;
}
}
public static <T> T deserializer(byte[] bytes,Class<T> clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema<T> schema = getSchema(clazz);
T obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static Object deserializerToObj(byte[] bytes,Class clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
// Reuse the cached schema instead of rebuilding it on every call
Schema schema = getSchema(clazz);
Object obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static <T> T deserializerV1(byte[] bytes,Class<T> clazz) throws IllegalAccessException, InstantiationException {
Schema<T> schema = getSchema(clazz);
T obj = clazz.newInstance();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
private static class SerializeDeserializeWrapperObj<T> {
private T data;
public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
wrapper.setData(data);
return wrapper;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
}
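getSchema above caches one schema per class using a containsKey / get / put sequence. As a hedged alternative, the same cache can be sketched with ConcurrentHashMap.computeIfAbsent, which performs the check and the insertion as a single atomic step. The SchemaCache class below is hypothetical and uses only the JDK: the factory function stands in for an expensive builder such as RuntimeSchema.createFrom, and the type parameter S stands in for the schema type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SchemaCache<S> {
    private final Map<Class<?>, S> cache = new ConcurrentHashMap<>();
    private final Function<Class<?>, S> factory;

    // factory is an assumption: it represents the costly schema construction.
    public SchemaCache(Function<Class<?>, S> factory) {
        this.factory = factory;
    }

    // Builds the schema on first request and reuses it afterwards;
    // computeIfAbsent applies the factory at most once per class.
    public S get(Class<?> clazz) {
        return cache.computeIfAbsent(clazz, factory);
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        SchemaCache<String> schemas = new SchemaCache<>(clazz -> {
            builds.incrementAndGet();
            return "schema:" + clazz.getSimpleName();
        });
        schemas.get(String.class);
        schemas.get(String.class);
        schemas.get(Integer.class);
        System.out.println(builds.get() + " schemas built");
    }
}
```

Whichever form is used, every serialization and deserialization path should go through the cache so that a schema is built only once per class.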
The rewritten SharedDataSessionImpl, nested here as RedisShardDataSessionImpl inside a custom session store:
/*
 * Copyright 2014 Red Hat, Inc.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Apache License v2.0 which accompanies this distribution.
 *
 * The Eclipse Public License is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * The Apache License v2.0 is available at
 * http://www.opensource.org/licenses/apache2.0.php
 *
 * You may elect to redistribute this code under either of these licenses.
 */
package com.ly.session;
import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/**
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class ReactiveRedisSessionStore implements SessionStore {
private Redis redis;
private VertxContextPRNG random;
private long retryTimeout;
@Override
public SessionStore init(Vertx vertx, JsonObject options) {
Objects.requireNonNull(options, "options are required");
long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
Redis redis = Redis.createClient(vertx, new RedisOptions(options));
return init(vertx, timeout, redis);
}
public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
random = VertxContextPRNG.current(vertx);
this.retryTimeout = retryTimeout;
this.redis = Objects.requireNonNull(redis, "redis is required");
return this;
}
@Override
public long retryTimeout() {
return retryTimeout;
}
@Override
public Session createSession(long timeout) {
return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
}
@Override
public Session createSession(long timeout, int length) {
return new RedisShardDataSessionImpl(random, timeout, length);
}
public ReactiveRedisSessionStore() {
super();
}
@Override
public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
redis.send(cmd(GET).arg(id), resGet -> {
if (resGet.failed()) {
resultHandler.handle(Future.failedFuture(resGet.cause()));
return;
}
Response response = resGet.result();
if (response != null) {
RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
session.readFromBuffer(0, response.toBuffer());
// postpone expiration time, this cannot be done in a single frame with GET cmd
// redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
// if (resExpire.failed()) {
// resultHandler.handle(Future.failedFuture(resExpire.cause()));
// } else {
// resultHandler.handle(Future.succeededFuture(session));
// }
// });
resultHandler.handle(Future.succeededFuture(session));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
redis.send(cmd(DEL).arg(id), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
Buffer buffer = Buffer.buffer();
RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;
sessionImpl.writeToBuffer(buffer);
// submit with all session data & expiration TO in ms
Request rq = cmd(SET)
.arg(session.id()).arg(buffer)
.arg("PX").arg(session.timeout());
redis.send(rq, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
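@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) {
// Not implemented in this store; fail fast so callers are not left waiting
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("clear is not supported")));
}
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) {
// Not implemented in this store; fail fast so callers are not left waiting
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("size is not supported")));
}
@Override
public void close() {
// Release the Redis client held by this store
if (redis != null) {
redis.close();
}
}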
public static class RedisShardDataSessionImpl extends AbstractSession{
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
private static final byte TYPE_PB_SERIALIZABLE = 14;
/**
 * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do
 * not remove.
 */
public RedisShardDataSessionImpl() {
super();
}
public RedisShardDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
// Default serialization to pb
buffer.appendByte(TYPE_PB_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
byte[] serializedBytes = ProtoStuffUtil.serializer(val);
buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
case TYPE_PB_SERIALIZABLE:
classNameLen = buffer.getInt(pos);
pos += 4;
classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
className = new String(classNameBytes, UTF8);
Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
int serializeObjLength = buffer.getInt(pos);
pos+=4;
byte[] serializeObjBytes = buffer.getBytes(pos,pos+serializeObjLength);
pos+=serializeObjLength;
val = ProtoStuffUtil.deserializerToObj(serializeObjBytes,clazzByPb);
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
}
When serializing, any value that matches none of the built-in types falls back to pb. When deserializing, an entry tagged with the pb type is deserialized with pb.
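The framing of such a fallback entry (tag 14, class name, payload length, payload) can be sketched independently of Vert.x and Protostuff. In this stdlib-only illustration, JDK serialization stands in for ProtoStuffUtil and java.nio.ByteBuffer stands in for the Vert.x Buffer; the FallbackFrame class and its methods are hypothetical. Only the byte layout follows the listing above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FallbackFrame {
    private static final byte TYPE_PB_SERIALIZABLE = 14;

    // Layout: tag | class-name length | class name | payload length | payload
    public static void write(ByteBuffer out, Object val) {
        byte[] name = val.getClass().getName().getBytes(StandardCharsets.UTF_8);
        byte[] payload = serialize(val);
        out.put(TYPE_PB_SERIALIZABLE)
           .putInt(name.length).put(name)
           .putInt(payload.length).put(payload);
    }

    // Reads one frame starting at the buffer's current position.
    public static Object read(ByteBuffer in) {
        byte tag = in.get();
        if (tag != TYPE_PB_SERIALIZABLE) {
            throw new IllegalStateException("Invalid serialized type: " + tag);
        }
        byte[] name = new byte[in.getInt()];
        in.get(name);
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        try {
            // The stored class name tells the reader which type to rebuild.
            Class<?> clazz = Class.forName(new String(name, StandardCharsets.UTF_8));
            return clazz.cast(deserialize(payload));
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Stand-in for ProtoStuffUtil.serializer (assumption: JDK serialization).
    private static byte[] serialize(Object val) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(val);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    // Stand-in for ProtoStuffUtil.deserializerToObj.
    private static Object deserialize(byte[] payload) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        write(buffer, new ArrayList<>(Arrays.asList("a", "b")));
        buffer.flip();
        List<?> restored = (List<?>) read(buffer);
        System.out.println(restored);
    }
}
```

Writing the payload length before the payload is what allows the reader to skip exactly the right number of bytes and move on to the next session entry.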

The complete code is available on GitHub:
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx