当前位置:网站首页>基于vertx-web-sstore-redis的改造实现vertx http应用的分布式session
基于vertx-web-sstore-redis的改造实现vertx http应用的分布式session
2022-07-05 20:58:00 【forwardMyLife】
1. vertx 自身基于redis的分布式session的不足
vertx 本身有redis的分布式session的实现。只需要引入如下依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-sstore-redis</artifactId>
<version>4.1.4</version>
</dependency>
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
Redis redis = new RedisClient(vertx,redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx,redis);
//SessionStore sessionStore = SessionStore.create(vertx);
router.routeWithRegex("/static.*").handler(StaticHandler.create());
router.routeWithRegex(".*service.*")
.handler(SessionHandler.create(redisSessionStore)).handler(ctx->{
if(ctx.request().path().contains("initSession")){
ctx.request().response().setStatusCode(401)
.putHeader("Content-Type","application/json").end((new JsonObject().put("msg","illegal request").encode()));
}else{
ctx.next();
}
})
只需要将原本的SessionStore换成RedisSessionStore即可。
其本身的Redis客户端是基于vertx实现的,对redis的读写是异步的,不会阻塞io线程。但是其目前的RedisStore实现有个很致命的问题,不能直接支持对自定义对象或者jdk原有的常用对象如ArrayList,HashMap做序列化和反序列化,除8大基本类型和其包装类外,其他的复杂对象都要实现其序列化接口。ClusterSerializable ,
package io.vertx.core.shareddata.impl;
import io.vertx.core.buffer.Buffer;
/** * Objects implementing this interface will be write to and read from a {@link Buffer} when respectively * stored and read from an {@link io.vertx.core.shareddata.AsyncMap}. * <p> * Implementations must have a public no-argument constructor. * * @author <a href="http://tfox.org">Tim Fox</a> */
public interface ClusterSerializable {
void writeToBuffer(Buffer buffer);
int readFromBuffer(int pos, Buffer buffer);
}
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package io.vertx.ext.web.sstore.impl;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public SharedDataSessionImpl() {
super();
}
public SharedDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
@Override
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
@Override
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
if (val != null) {
throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
}
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
复杂对象如果没有实现ClusterSerializable 接口,就会提示IllegalStateException。
2. 改写ShareDataSessionImpl
所以我们在原有的序列化类型上加上PB类型,复杂的对象我们都采取pb进行序列化和反序列化。
pb序列化工具类
package com.ly;
import com.ly.entity.Good;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ProtoStuffUtil {
private static final Map<Class<?>,Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();
private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
SerializeDeserializeWrapperObj.class;
private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);
private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();
static{
WRAPPER_CLASS_SET.add(List.class);
WRAPPER_CLASS_SET.add(Integer.class);
WRAPPER_CLASS_SET.add(Boolean.class);
WRAPPER_CLASS_SET.add(Character.class);
WRAPPER_CLASS_SET.add(Double.class);
WRAPPER_CLASS_SET.add(int.class);
WRAPPER_CLASS_SET.add(boolean.class);
WRAPPER_CLASS_SET.add(char.class);
WRAPPER_CLASS_SET.add(double.class);
WRAPPER_CLASS_SET.add(ArrayList.class);
WRAPPER_CLASS_SET.add(Set.class);
WRAPPER_CLASS_SET.add(Map.class);
WRAPPER_CLASS_SET.add(HashMap.class);
WRAPPER_CLASS_SET.add(Date.class);
}
public static <T> byte[] serializer(T o){
if(WRAPPER_CLASS_SET.contains(o.getClass())){
return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o),WRAPPER_SCHEMA, LinkedBuffer.allocate(1024));
}else{
return ProtostuffIOUtil.toByteArray(o,getSchema(o.getClass()), LinkedBuffer.allocate(1024));
}
}
public static <T> byte[] serializerV1(T o){
Schema<T> schema = getSchema(o.getClass());
return ProtostuffIOUtil.toByteArray(o,schema, LinkedBuffer.allocate(1024));
}
public static <T> Schema getSchema(Class<T> clazz){
if(SCHEMA_MAP.containsKey(clazz)){
return SCHEMA_MAP.get(clazz);
}else{
Schema<T> schema = RuntimeSchema.createFrom(clazz);
SCHEMA_MAP.put(clazz,schema);
return schema;
}
}
public static <T> T deserializer(byte[] bytes,Class<T> clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj<T> obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema<T> schema = getSchema(clazz);
T obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static Object deserializerToObj(byte[] bytes,Class clazz) {
if(WRAPPER_CLASS_SET.contains(clazz)){
SerializeDeserializeWrapperObj obj = new SerializeDeserializeWrapperObj<>();
ProtostuffIOUtil.mergeFrom(bytes, obj, WRAPPER_SCHEMA);
return obj.getData();
}else{
Schema schema = RuntimeSchema.createFrom(clazz);
Object obj = null;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
public static <T> T deserializerV1(byte[] bytes,Class<T> clazz) throws IllegalAccessException, InstantiationException {
Schema<T> schema = getSchema(clazz);
T obj = clazz.newInstance();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
private static class SerializeDeserializeWrapperObj<T> {
private T data;
public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
wrapper.setData(data);
return wrapper;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
}
改写ShareDataSessionImpl
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package com.ly.session;
import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/** * @author <a href="http://tfox.org">Tim Fox</a> */
public class ReactiveRedisSessionStore implements SessionStore {
private Redis redis;
private VertxContextPRNG random;
private long retryTimeout;
@Override
public SessionStore init(Vertx vertx, JsonObject options) {
Objects.requireNonNull(options, "options are required");
long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
Redis redis = Redis.createClient(vertx, new RedisOptions(options));
return init(vertx, timeout, redis);
}
public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
random = VertxContextPRNG.current(vertx);
this.retryTimeout = retryTimeout;
this.redis = Objects.requireNonNull(redis, "redis is required");
return this;
}
@Override
public long retryTimeout() {
return retryTimeout;
}
@Override
public Session createSession(long timeout) {
return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
}
@Override
public Session createSession(long timeout, int length) {
return new RedisShardDataSessionImpl(random, timeout, length);
}
public ReactiveRedisSessionStore() {
super();
}
@Override
public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
redis.send(cmd(GET).arg(id), resGet -> {
if (resGet.failed()) {
resultHandler.handle(Future.failedFuture(resGet.cause()));
return;
}
Response response = resGet.result();
if (response != null) {
RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
session.readFromBuffer(0, response.toBuffer());
// postpone expiration time, this cannot be done in a single frame with GET cmd
// redis.send(cmd(PEXPIRE).arg(id).arg(session.timeout()), resExpire -> {
// if (resExpire.failed()) {
// resultHandler.handle(Future.failedFuture(resExpire.cause()));
// } else {
// resultHandler.handle(Future.succeededFuture(session));
// }
// });
resultHandler.handle(Future.succeededFuture(session));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
redis.send(cmd(DEL).arg(id), res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
Buffer buffer = Buffer.buffer();
RedisShardDataSessionImpl sessionImpl = (RedisShardDataSessionImpl) session;
sessionImpl.writeToBuffer(buffer);
// submit with all session data & expiration TO in ms
Request rq = cmd(SET)
.arg(session.id()).arg(buffer)
.arg("PX").arg(session.timeout());
redis.send(rq, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture());
}
});
}
@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) {
}
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) {
}
@Override
public void close() {
}
public static class RedisShardDataSessionImpl extends AbstractSession{
private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final byte TYPE_LONG = 1;
private static final byte TYPE_INT = 2;
private static final byte TYPE_SHORT = 3;
private static final byte TYPE_BYTE = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_FLOAT = 6;
private static final byte TYPE_CHAR = 7;
private static final byte TYPE_BOOLEAN = 8;
private static final byte TYPE_STRING = 9;
private static final byte TYPE_BUFFER = 10;
private static final byte TYPE_BYTES = 11;
private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
private static final byte TYPE_PB_SERIALIZABLE = 14;
/** * Important note: This constructor (even though not referenced anywhere) is required for serialization purposes. Do * not remove. */
public RedisShardDataSessionImpl() {
super();
}
public RedisShardDataSessionImpl(VertxContextPRNG random) {
super(random);
}
public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
super(random, timeout, length);
}
public void writeToBuffer(Buffer buff) {
byte[] bytes = id().getBytes(UTF8);
buff.appendInt(bytes.length).appendBytes(bytes);
buff.appendLong(timeout());
buff.appendLong(lastAccessed());
buff.appendInt(version());
// use cache
Buffer dataBuf = writeDataToBuffer();
buff.appendBuffer(dataBuf);
}
public int readFromBuffer(int pos, Buffer buffer) {
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
pos += len;
setId(new String(bytes, UTF8));
setTimeout(buffer.getLong(pos));
pos += 8;
setLastAccessed(buffer.getLong(pos));
pos += 8;
setVersion(buffer.getInt(pos));
pos += 4;
pos = readDataFromBuffer(pos, buffer);
return pos;
}
private Buffer writeDataToBuffer() {
Buffer buffer = Buffer.buffer();
if (isEmpty()) {
buffer.appendInt(0);
} else {
final Map<String, Object> data = data();
buffer.appendInt(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
byte[] keyBytes = key.getBytes(UTF8);
buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
Object val = entry.getValue();
if (val instanceof Long) {
buffer.appendByte(TYPE_LONG).appendLong((long) val);
} else if (val instanceof Integer) {
buffer.appendByte(TYPE_INT).appendInt((int) val);
} else if (val instanceof Short) {
buffer.appendByte(TYPE_SHORT).appendShort((short) val);
} else if (val instanceof Byte) {
buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
} else if (val instanceof Double) {
buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
} else if (val instanceof Float) {
buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
} else if (val instanceof Character) {
buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
} else if (val instanceof Boolean) {
buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
} else if (val instanceof String) {
byte[] bytes = ((String) val).getBytes(UTF8);
buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof Buffer) {
Buffer buff = (Buffer) val;
buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
} else if (val instanceof byte[]) {
byte[] bytes = (byte[]) val;
buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
} else if (val instanceof ClusterSerializable) {
buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
((ClusterSerializable) val).writeToBuffer(buffer);
} else {
// 默认序列化成pb
buffer.appendByte(TYPE_PB_SERIALIZABLE);
String className = val.getClass().getName();
byte[] classNameBytes = className.getBytes(UTF8);
buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
byte[] serializedBytes = ProtoStuffUtil.serializer(val);
buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
}
}
}
return buffer;
}
private int readDataFromBuffer(int pos, Buffer buffer) {
try {
int entries = buffer.getInt(pos);
pos += 4;
if (entries > 0) {
final Map<String, Object> data = new ConcurrentHashMap<>(entries);
for (int i = 0; i < entries; i++) {
int keylen = buffer.getInt(pos);
pos += 4;
byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
pos += keylen;
String key = new String(keyBytes, UTF8);
byte type = buffer.getByte(pos++);
Object val;
switch (type) {
case TYPE_LONG:
val = buffer.getLong(pos);
pos += 8;
break;
case TYPE_INT:
val = buffer.getInt(pos);
pos += 4;
break;
case TYPE_SHORT:
val = buffer.getShort(pos);
pos += 2;
break;
case TYPE_BYTE:
val = buffer.getByte(pos);
pos++;
break;
case TYPE_FLOAT:
val = buffer.getFloat(pos);
pos += 4;
break;
case TYPE_DOUBLE:
val = buffer.getDouble(pos);
pos += 8;
break;
case TYPE_CHAR:
short s = buffer.getShort(pos);
pos += 2;
val = (char) s;
break;
case TYPE_BOOLEAN:
byte b = buffer.getByte(pos);
pos++;
val = b == 1;
break;
case TYPE_STRING:
int len = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + len);
val = new String(bytes, UTF8);
pos += len;
break;
case TYPE_BUFFER:
len = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + len);
val = Buffer.buffer(bytes);
pos += len;
break;
case TYPE_BYTES:
len = buffer.getInt(pos);
pos += 4;
val = buffer.getBytes(pos, pos + len);
pos += len;
break;
case TYPE_CLUSTER_SERIALIZABLE:
int classNameLen = buffer.getInt(pos);
pos += 4;
byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
String className = new String(classNameBytes, UTF8);
Class<?> clazz = Utils.getClassLoader().loadClass(className);
if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
throw new ClassCastException(new String(classNameBytes) + " is not assignable from ClusterSerializable");
}
ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
pos = obj.readFromBuffer(pos, buffer);
val = obj;
break;
case TYPE_PB_SERIALIZABLE:
classNameLen = buffer.getInt(pos);
pos += 4;
classNameBytes = buffer.getBytes(pos, pos + classNameLen);
pos += classNameLen;
className = new String(classNameBytes, UTF8);
Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
int serializeObjLength = buffer.getInt(pos);
pos+=4;
byte[] serializeObjBytes = buffer.getBytes(pos,pos+serializeObjLength);
pos+=serializeObjLength;
val = ProtoStuffUtil.deserializerToObj(serializeObjBytes,clazzByPb);
break;
default:
throw new IllegalStateException("Invalid serialized type: " + type);
}
data.put(key, val);
}
setData(data);
}
return pos;
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
throw new VertxException(e);
}
}
}
}
序列化时默认采用pb做序列化,反序列化时,如果类型是pb就采用pb做反序列化。
完整代码其看github
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx
边栏推荐
- Abnova maxpab mouse derived polyclonal antibody solution
- Duchefa cytokinin dihydrozeatin (DHZ) instructions
- 【案例】元素的显示与隐藏的运用--元素遮罩
- Which is the best online collaboration product? Microsoft loop, notion, flowus
- Abnova丨 MaxPab 小鼠源多克隆抗体解决方案
- 浅聊我和一些编程语言的缘分
- Viewrootimpl and windowmanagerservice notes
- 研学旅游实践教育的开展助力文旅产业发展
- 学习机器人无从下手?带你体会当下机器人热门研究方向有哪些
- Write an interface based on flask
猜你喜欢
显示屏DIN 4102-1 Class B1防火测试要求
Research and development efficiency improvement practice of large insurance groups with 10000 + code base and 3000 + R & D personnel
Abnova e (diii) (WNV) recombinant protein Chinese and English instructions
Duchefa丨S0188盐酸大观霉素五水合物中英文说明书
Abnova丨E (DIII) (WNV) 重组蛋白 中英文说明书
Abnova total RNA Purification Kit for cultured cells Chinese and English instructions
Abnova fluorescent dye 620-m streptavidin scheme
leetcode:1755. 最接近目标值的子序列和
2.<tag-哈希表, 字符串>补充: 剑指 Offer 50. 第一个只出现一次的字符 dbc
CLion配置visual studio(msvc)和JOM多核编译
随机推荐
树莓派4B上ncnn转换出来的模型调用时总是崩溃(Segment Fault)的原因
matplotlib绘图润色(如何形成高质量的图,例如设如何置字体等)
Duchefa丨MS培养基含维生素说明书
hdu2377Bus Pass(构建更复杂的图+spfa)
2. < tag hash table, string> supplement: Sword finger offer 50 The first character DBC that appears only once
Popular science | does poor English affect the NPDP exam?
启牛2980有没有用?开户安全吗、
How to open an account online for futures? Is it safe?
Duchefa s0188 Chinese and English instructions of spectinomycin hydrochloride pentahydrate
POJ 3414 pots (bfs+ clues)
Mathematical analysis_ Notes_ Chapter 9: curve integral and surface integral
挖财商学院给的证券账户安全吗?可以开户吗?
Viewrootimpl and windowmanagerservice notes
研学旅游实践教育的开展助力文旅产业发展
线程池的使用
解析五育融合之下的steam教育模式
Maker education infiltrating the transformation of maker spirit and culture
Web Service简单入门示例
ts 之 泛型
Abnova丨血液总核酸纯化试剂盒预装相关说明书