当前位置:网站首页>基于vertx-web-sstore-redis的改造实现vertx http应用的分布式session
基于vertx-web-sstore-redis的改造实现vertx http应用的分布式session
2022-07-05 20:58:00 【forwardMyLife】
1. vertx 自身基于redis的分布式session的不足
vertx 本身有redis的分布式session的实现。只需要引入如下依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-sstore-redis</artifactId>
<version>4.1.4</version>
</dependency>
// Configure the Redis endpoint and obtain a client through the public factory method rather than
// instantiating a concrete client class directly.
RedisOptions redisOptions = new RedisOptions();
redisOptions.addConnectionString("redis://192.168.72.8:6379");
Redis redis = Redis.createClient(vertx, redisOptions);
RedisSessionStore redisSessionStore = RedisSessionStore.create(vertx, redis);
// In-memory alternative: SessionStore sessionStore = SessionStore.create(vertx);

// Static resources are served without any session handling.
router.routeWithRegex("/static.*").handler(StaticHandler.create());

// Every "service" route first goes through the Redis-backed session handler.
router.routeWithRegex(".*service.*")
    .handler(SessionHandler.create(redisSessionStore))
    .handler(ctx -> {
      if (ctx.request().path().contains("initSession")) {
        // Requests whose path contains "initSession" are rejected with a JSON 401 body.
        ctx.request().response().setStatusCode(401)
            .putHeader("Content-Type", "application/json")
            .end(new JsonObject().put("msg", "illegal request").encode());
      } else {
        ctx.next();
      }
    });
只需要将原本的SessionStore换成RedisSessionStore即可。
其本身的Redis客户端是基于vertx实现的,对redis的读写是异步的,不会阻塞io线程。但是其目前的RedisSessionStore实现有个很致命的问题:不能直接对自定义对象或者jdk自带的常用对象(如ArrayList、HashMap)做序列化和反序列化。除8种基本类型的包装类以及String、Buffer、byte[]之外,其他复杂对象都必须实现其序列化接口ClusterSerializable。该接口的定义以及SharedDataSessionImpl中的序列化实现如下:
package io.vertx.core.shareddata.impl;
import io.vertx.core.buffer.Buffer;
/**
 * Objects implementing this interface will be written to and read from a {@link Buffer} when
 * respectively stored and read from an {@link io.vertx.core.shareddata.AsyncMap}.
 * <p>
 * Implementations must have a public no-argument constructor.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public interface ClusterSerializable {
/**
 * Appends the serialized form of this object to the given buffer.
 *
 * @param buffer the buffer to append to
 */
void writeToBuffer(Buffer buffer);
/**
 * Restores the state of this object from the given buffer.
 *
 * @param pos the offset in {@code buffer} at which this object's data starts
 * @param buffer the buffer to read from
 * @return the offset immediately after the data consumed by this object
 */
int readFromBuffer(int pos, Buffer buffer);
}
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package io.vertx.ext.web.sstore.impl;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Session that can serialize itself into a {@link Buffer} and restore itself from one.
 * <p>
 * Layout: length-prefixed UTF-8 id, timeout (long), last-accessed (long), version (int), then the
 * attribute count (int) followed by one record per attribute. Each record is a length-prefixed
 * UTF-8 key, a one-byte type tag and the tag-specific payload.
 * <p>
 * Only boxed primitives, {@link String}, {@link Buffer}, {@code byte[]} and
 * {@link ClusterSerializable} values are supported; any other non-null value makes serialization
 * fail with an {@link IllegalStateException}.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class SharedDataSessionImpl extends AbstractSession implements ClusterSerializable, Shareable {

  private static final Charset UTF8 = StandardCharsets.UTF_8;

  // One-byte tags written in front of every attribute value to identify its encoding.
  private static final byte TYPE_LONG = 1;
  private static final byte TYPE_INT = 2;
  private static final byte TYPE_SHORT = 3;
  private static final byte TYPE_BYTE = 4;
  private static final byte TYPE_DOUBLE = 5;
  private static final byte TYPE_FLOAT = 6;
  private static final byte TYPE_CHAR = 7;
  private static final byte TYPE_BOOLEAN = 8;
  private static final byte TYPE_STRING = 9;
  private static final byte TYPE_BUFFER = 10;
  private static final byte TYPE_BYTES = 11;
  private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;

  /**
   * Important note: This constructor (even though not referenced anywhere) is required for
   * serialization purposes. Do not remove.
   */
  public SharedDataSessionImpl() {
    super();
  }

  public SharedDataSessionImpl(VertxContextPRNG random) {
    super(random);
  }

  public SharedDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
    super(random, timeout, length);
  }

  /**
   * Appends the session header (id, timeout, last-accessed, version) followed by all attributes.
   *
   * @param buff the buffer to append to
   */
  @Override
  public void writeToBuffer(Buffer buff) {
    byte[] idBytes = id().getBytes(UTF8);
    buff.appendInt(idBytes.length).appendBytes(idBytes);
    buff.appendLong(timeout());
    buff.appendLong(lastAccessed());
    buff.appendInt(version());
    buff.appendBuffer(writeDataToBuffer());
  }

  /**
   * Restores the session header and attributes written by {@link #writeToBuffer(Buffer)}.
   *
   * @param pos the offset at which the session data starts
   * @param buffer the buffer to read from
   * @return the offset immediately after the consumed data
   */
  @Override
  public int readFromBuffer(int pos, Buffer buffer) {
    int idLen = buffer.getInt(pos);
    pos += 4;
    byte[] idBytes = buffer.getBytes(pos, pos + idLen);
    pos += idLen;
    setId(new String(idBytes, UTF8));
    setTimeout(buffer.getLong(pos));
    pos += 8;
    setLastAccessed(buffer.getLong(pos));
    pos += 8;
    setVersion(buffer.getInt(pos));
    pos += 4;
    return readDataFromBuffer(pos, buffer);
  }

  /**
   * Serializes the attribute map: entry count, then key / type tag / payload for every entry.
   *
   * @return a new buffer holding the encoded attributes
   * @throws IllegalStateException if a non-null value has an unsupported type
   */
  private Buffer writeDataToBuffer() {
    Buffer buffer = Buffer.buffer();
    if (isEmpty()) {
      buffer.appendInt(0);
      return buffer;
    }
    final Map<String, Object> data = data();
    buffer.appendInt(data.size());
    for (Map.Entry<String, Object> entry : data.entrySet()) {
      byte[] keyBytes = entry.getKey().getBytes(UTF8);
      buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
      Object val = entry.getValue();
      if (val instanceof Long) {
        buffer.appendByte(TYPE_LONG).appendLong((long) val);
      } else if (val instanceof Integer) {
        buffer.appendByte(TYPE_INT).appendInt((int) val);
      } else if (val instanceof Short) {
        buffer.appendByte(TYPE_SHORT).appendShort((short) val);
      } else if (val instanceof Byte) {
        buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
      } else if (val instanceof Double) {
        buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
      } else if (val instanceof Float) {
        buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
      } else if (val instanceof Character) {
        // A char is stored as its 16-bit code unit.
        buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
      } else if (val instanceof Boolean) {
        buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
      } else if (val instanceof String) {
        byte[] bytes = ((String) val).getBytes(UTF8);
        buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
      } else if (val instanceof Buffer) {
        Buffer buff = (Buffer) val;
        buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
      } else if (val instanceof byte[]) {
        byte[] bytes = (byte[]) val;
        buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
      } else if (val instanceof ClusterSerializable) {
        // The concrete class name is stored so the reader can instantiate it reflectively.
        buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
        byte[] classNameBytes = val.getClass().getName().getBytes(UTF8);
        buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
        ((ClusterSerializable) val).writeToBuffer(buffer);
      } else if (val != null) {
        throw new IllegalStateException("Invalid type for data in session: " + val.getClass());
      }
    }
    return buffer;
  }

  /**
   * Decodes the attribute section produced by {@link #writeDataToBuffer()} and installs it via
   * {@code setData}. When the entry count is zero the current data is left untouched.
   *
   * @param pos the offset of the entry count
   * @param buffer the buffer to read from
   * @return the offset immediately after the last attribute
   * @throws IllegalStateException if an unknown type tag is encountered
   * @throws VertxException if a {@link ClusterSerializable} value cannot be instantiated
   */
  private int readDataFromBuffer(int pos, Buffer buffer) {
    try {
      int entries = buffer.getInt(pos);
      pos += 4;
      if (entries > 0) {
        final Map<String, Object> data = new ConcurrentHashMap<>(entries);
        for (int i = 0; i < entries; i++) {
          int keylen = buffer.getInt(pos);
          pos += 4;
          byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
          pos += keylen;
          String key = new String(keyBytes, UTF8);
          byte type = buffer.getByte(pos++);
          Object val;
          switch (type) {
            case TYPE_LONG:
              val = buffer.getLong(pos);
              pos += 8;
              break;
            case TYPE_INT:
              val = buffer.getInt(pos);
              pos += 4;
              break;
            case TYPE_SHORT:
              val = buffer.getShort(pos);
              pos += 2;
              break;
            case TYPE_BYTE:
              val = buffer.getByte(pos);
              pos++;
              break;
            case TYPE_FLOAT:
              val = buffer.getFloat(pos);
              pos += 4;
              break;
            case TYPE_DOUBLE:
              val = buffer.getDouble(pos);
              pos += 8;
              break;
            case TYPE_CHAR: {
              short s = buffer.getShort(pos);
              pos += 2;
              val = (char) s;
              break;
            }
            case TYPE_BOOLEAN: {
              byte b = buffer.getByte(pos);
              pos++;
              val = b == 1;
              break;
            }
            case TYPE_STRING: {
              int len = buffer.getInt(pos);
              pos += 4;
              val = new String(buffer.getBytes(pos, pos + len), UTF8);
              pos += len;
              break;
            }
            case TYPE_BUFFER: {
              int len = buffer.getInt(pos);
              pos += 4;
              val = Buffer.buffer(buffer.getBytes(pos, pos + len));
              pos += len;
              break;
            }
            case TYPE_BYTES: {
              int len = buffer.getInt(pos);
              pos += 4;
              val = buffer.getBytes(pos, pos + len);
              pos += len;
              break;
            }
            case TYPE_CLUSTER_SERIALIZABLE: {
              int classNameLen = buffer.getInt(pos);
              pos += 4;
              byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
              pos += classNameLen;
              String className = new String(classNameBytes, UTF8);
              Class<?> clazz = Utils.getClassLoader().loadClass(className);
              if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
                // Reuse the UTF-8 decoded name; decoding again with the platform charset could
                // garble the message.
                throw new ClassCastException(className + " is not assignable from ClusterSerializable");
              }
              ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
              pos = obj.readFromBuffer(pos, buffer);
              val = obj;
              break;
            }
            default:
              throw new IllegalStateException("Invalid serialized type: " + type);
          }
          data.put(key, val);
        }
        setData(data);
      }
      return pos;
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
        | InvocationTargetException | InstantiationException e) {
      throw new VertxException(e);
    }
  }
}
复杂对象如果没有实现ClusterSerializable接口,在序列化session时就会抛出IllegalStateException(Invalid type for data in session)。
2. 改写SharedDataSessionImpl
所以我们在原有的序列化类型基础上增加一种PB类型(TYPE_PB_SERIALIZABLE),复杂对象统一采用protostuff(下文简称pb)进行序列化和反序列化。
pb序列化工具类
package com.ly;
import com.ly.entity.Good;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Protostuff-based helpers for turning arbitrary objects into byte arrays and back.
 * <p>
 * Types listed in {@link #WRAPPER_CLASS_SET} are not serialized with a schema of their own; they
 * are placed in a single-field wrapper object and the wrapper's schema is used instead. All other
 * types use a runtime schema that is created once per class and cached.
 * <p>
 * NOTE(review): the wrapper decision is an exact class match. On the serializing side the lookup
 * key is {@code o.getClass()}, which is never an interface or a primitive type, so the interface
 * and primitive entries can only match on the deserializing side.
 */
public class ProtoStuffUtil {

  /** Cache of runtime schemas, keyed by the class they describe. */
  private static final Map<Class<?>, Schema<?>> SCHEMA_MAP = new ConcurrentHashMap<>();

  private static final Class<SerializeDeserializeWrapperObj> SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS =
      SerializeDeserializeWrapperObj.class;

  private static final Schema<SerializeDeserializeWrapperObj> WRAPPER_SCHEMA =
      RuntimeSchema.createFrom(SERIALIZE_DESERIALIZE_WRAPPER_OBJ_CLASS);

  /** Classes that are (de)serialized through {@link SerializeDeserializeWrapperObj}. */
  private static final Set<Class<?>> WRAPPER_CLASS_SET = new HashSet<>();

  static {
    WRAPPER_CLASS_SET.add(List.class);
    WRAPPER_CLASS_SET.add(Integer.class);
    WRAPPER_CLASS_SET.add(Boolean.class);
    WRAPPER_CLASS_SET.add(Character.class);
    WRAPPER_CLASS_SET.add(Double.class);
    WRAPPER_CLASS_SET.add(int.class);
    WRAPPER_CLASS_SET.add(boolean.class);
    WRAPPER_CLASS_SET.add(char.class);
    WRAPPER_CLASS_SET.add(double.class);
    WRAPPER_CLASS_SET.add(ArrayList.class);
    WRAPPER_CLASS_SET.add(Set.class);
    WRAPPER_CLASS_SET.add(Map.class);
    WRAPPER_CLASS_SET.add(HashMap.class);
    WRAPPER_CLASS_SET.add(Date.class);
  }

  /**
   * Serializes an object, wrapping it first when its class is in the wrapper set.
   *
   * @param o the object to serialize; must not be {@code null}
   * @return the protostuff-encoded bytes
   * @throws NullPointerException if {@code o} is {@code null}
   */
  public static <T> byte[] serializer(T o) {
    Objects.requireNonNull(o, "object to serialize must not be null");
    final LinkedBuffer scratch = LinkedBuffer.allocate(1024);
    if (WRAPPER_CLASS_SET.contains(o.getClass())) {
      return ProtostuffIOUtil.toByteArray(SerializeDeserializeWrapperObj.builder(o), WRAPPER_SCHEMA, scratch);
    }
    return ProtostuffIOUtil.toByteArray(o, getSchema(o.getClass()), scratch);
  }

  /**
   * Serializes an object with the runtime schema of its own class, never using the wrapper.
   *
   * @param o the object to serialize; must not be {@code null}
   * @return the protostuff-encoded bytes
   */
  public static <T> byte[] serializerV1(T o) {
    Schema<T> schema = getSchema(o.getClass());
    return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate(1024));
  }

  /**
   * Returns the cached runtime schema for a class, creating it on first use.
   * <p>
   * The lookup and creation happen in one atomic map operation, so concurrent callers share a
   * single schema instance per class.
   *
   * @param clazz the class to describe
   * @return the schema for {@code clazz}
   */
  @SuppressWarnings("rawtypes")
  public static <T> Schema getSchema(Class<T> clazz) {
    return SCHEMA_MAP.computeIfAbsent(clazz, type -> RuntimeSchema.createFrom(type));
  }

  /**
   * Deserializes bytes produced by {@link #serializer(Object)}.
   * <p>
   * The target instance is obtained from the schema itself, so an instantiation problem surfaces
   * as an exception instead of a silent {@code null} result.
   *
   * @param bytes the encoded object
   * @param clazz the class that was serialized
   * @return the decoded object
   */
  public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
    if (WRAPPER_CLASS_SET.contains(clazz)) {
      SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
      ProtostuffIOUtil.mergeFrom(bytes, wrapper, WRAPPER_SCHEMA);
      return wrapper.getData();
    }
    Schema<T> schema = getSchema(clazz);
    T message = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, message, schema);
    return message;
  }

  /**
   * Untyped variant of {@link #deserializer(byte[], Class)} for callers that only hold a raw
   * {@link Class}. Uses the schema cache instead of building a new schema on every call.
   *
   * @param bytes the encoded object
   * @param clazz the class that was serialized
   * @return the decoded object
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static Object deserializerToObj(byte[] bytes, Class clazz) {
    if (WRAPPER_CLASS_SET.contains(clazz)) {
      SerializeDeserializeWrapperObj wrapper = new SerializeDeserializeWrapperObj<>();
      ProtostuffIOUtil.mergeFrom(bytes, wrapper, WRAPPER_SCHEMA);
      return wrapper.getData();
    }
    Schema schema = getSchema(clazz);
    Object message = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, message, schema);
    return message;
  }

  /**
   * Deserializes bytes produced by {@link #serializerV1(Object)}, never using the wrapper.
   * <p>
   * The checked exceptions stay in the signature for source compatibility with existing callers.
   *
   * @param bytes the encoded object
   * @param clazz the class that was serialized
   * @return the decoded object
   */
  public static <T> T deserializerV1(byte[] bytes, Class<T> clazz) throws IllegalAccessException, InstantiationException {
    Schema<T> schema = getSchema(clazz);
    T message = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, message, schema);
    return message;
  }

  /** Single-field holder used to serialize values whose class is in the wrapper set. */
  private static class SerializeDeserializeWrapperObj<T> {

    private T data;

    /** Creates a wrapper around the given value. */
    public static <T> SerializeDeserializeWrapperObj<T> builder(T data) {
      SerializeDeserializeWrapperObj<T> wrapper = new SerializeDeserializeWrapperObj<>();
      wrapper.setData(data);
      return wrapper;
    }

    public T getData() {
      return data;
    }

    public void setData(T data) {
      this.data = data;
    }
  }
}
改写ShareDataSessionImpl
/* * Copyright 2014 Red Hat, Inc. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Apache License v2.0 which accompanies this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * * The Apache License v2.0 is available at * http://www.opensource.org/licenses/apache2.0.php * * You may elect to redistribute this code under either of these licenses. */
package com.ly.session;
import com.ly.ProtoStuffUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import io.vertx.ext.auth.VertxContextPRNG;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.impl.Utils;
import io.vertx.ext.web.sstore.AbstractSession;
import io.vertx.ext.web.sstore.SessionStore;
import io.vertx.ext.web.sstore.redis.RedisSessionStore;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/**
 * {@link SessionStore} that keeps each session as one Redis string value, keyed by session id and
 * expiring after the session timeout.
 * <p>
 * Sessions are encoded by {@link RedisShardDataSessionImpl}. Attribute values with no dedicated
 * encoding are serialized with {@link ProtoStuffUtil}, so they do not have to implement
 * {@link ClusterSerializable}.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class ReactiveRedisSessionStore implements SessionStore {

  private Redis redis;
  private VertxContextPRNG random;
  private long retryTimeout;

  public ReactiveRedisSessionStore() {
    super();
  }

  /**
   * Initializes the store from JSON options. The optional {@code retryTimeout} entry is read
   * here; the whole object is also handed to {@link RedisOptions} to create the client.
   *
   * @param vertx the Vert.x instance
   * @param options the store and Redis options; must not be {@code null}
   * @return this store
   */
  @Override
  public SessionStore init(Vertx vertx, JsonObject options) {
    Objects.requireNonNull(options, "options are required");
    long timeout = options.getLong("retryTimeout", RedisSessionStore.DEFAULT_RETRY_TIMEOUT_MS);
    Redis client = Redis.createClient(vertx, new RedisOptions(options));
    return init(vertx, timeout, client);
  }

  /**
   * Initializes the store with an existing Redis client.
   *
   * @param vertx the Vert.x instance
   * @param retryTimeout the value later reported by {@link #retryTimeout()}
   * @param redis the client to use; must not be {@code null}
   * @return this store
   */
  public SessionStore init(Vertx vertx, long retryTimeout, Redis redis) {
    random = VertxContextPRNG.current(vertx);
    this.retryTimeout = retryTimeout;
    this.redis = Objects.requireNonNull(redis, "redis is required");
    return this;
  }

  @Override
  public long retryTimeout() {
    return retryTimeout;
  }

  @Override
  public Session createSession(long timeout) {
    return createSession(timeout, DEFAULT_SESSIONID_LENGTH);
  }

  @Override
  public Session createSession(long timeout, int length) {
    return new RedisShardDataSessionImpl(random, timeout, length);
  }

  /**
   * Loads a session by id. Succeeds with {@code null} when Redis has no value for the id.
   * <p>
   * The key's expiry is not extended on read (a separate PEXPIRE round trip was left disabled);
   * it is only renewed when the session is written back through
   * {@link #put(Session, Handler)}.
   *
   * @param id the session id, used directly as the Redis key
   * @param resultHandler receives the session, {@code null}, or the failure
   */
  @Override
  public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
    redis.send(cmd(GET).arg(id), resGet -> {
      if (resGet.failed()) {
        resultHandler.handle(Future.failedFuture(resGet.cause()));
        return;
      }
      Response response = resGet.result();
      if (response == null) {
        resultHandler.handle(Future.succeededFuture());
        return;
      }
      RedisShardDataSessionImpl session = new RedisShardDataSessionImpl(random);
      try {
        session.readFromBuffer(0, response.toBuffer());
      } catch (RuntimeException e) {
        // A corrupt or incompatible payload must still complete the handler, otherwise the
        // caller would wait forever.
        resultHandler.handle(Future.failedFuture(e));
        return;
      }
      resultHandler.handle(Future.succeededFuture(session));
    });
  }

  /**
   * Deletes the Redis key of the given session id.
   *
   * @param id the session id
   * @param resultHandler receives the outcome
   */
  @Override
  public void delete(String id, Handler<AsyncResult<Void>> resultHandler) {
    redis.send(cmd(DEL).arg(id), res -> {
      if (res.failed()) {
        resultHandler.handle(Future.failedFuture(res.cause()));
      } else {
        resultHandler.handle(Future.succeededFuture());
      }
    });
  }

  /**
   * Serializes the session and stores it with {@code SET key value PX timeout}.
   * <p>
   * Serialization problems, including a session that was not created by this store, are reported
   * through the handler.
   *
   * @param session the session to store
   * @param resultHandler receives the outcome
   */
  @Override
  public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
    Buffer buffer = Buffer.buffer();
    try {
      ((RedisShardDataSessionImpl) session).writeToBuffer(buffer);
    } catch (RuntimeException e) {
      resultHandler.handle(Future.failedFuture(e));
      return;
    }
    // Submit all session data together with the expiration time in milliseconds.
    Request rq = cmd(SET)
        .arg(session.id()).arg(buffer)
        .arg("PX").arg(session.timeout());
    redis.send(rq, res -> {
      if (res.failed()) {
        resultHandler.handle(Future.failedFuture(res.cause()));
      } else {
        resultHandler.handle(Future.succeededFuture());
      }
    });
  }

  /**
   * Not supported: session keys carry no prefix, so they cannot be told apart from other keys in
   * the same Redis database. The handler is always completed with a failure so callers do not
   * wait indefinitely.
   */
  @Override
  public void clear(Handler<AsyncResult<Void>> resultHandler) {
    resultHandler.handle(Future.failedFuture(
        new UnsupportedOperationException("clear is not supported by ReactiveRedisSessionStore")));
  }

  /**
   * Not supported, for the same reason as {@link #clear(Handler)}. The handler is always
   * completed with a failure so callers do not wait indefinitely.
   */
  @Override
  public void size(Handler<AsyncResult<Integer>> resultHandler) {
    resultHandler.handle(Future.failedFuture(
        new UnsupportedOperationException("size is not supported by ReactiveRedisSessionStore")));
  }

  /** Releases the Redis client, if the store has been initialized. */
  @Override
  public void close() {
    if (redis != null) {
      redis.close();
    }
  }

  /**
   * Session with a self-describing binary encoding.
   * <p>
   * Layout: length-prefixed UTF-8 id, timeout (long), last-accessed (long), version (int), then
   * the attribute count (int) followed by one record per attribute. Each record is a
   * length-prefixed UTF-8 key, a one-byte type tag and the tag-specific payload.
   * <p>
   * NOTE(review): for the {@code ClusterSerializable} and protostuff tags, the class name is read
   * from the stored payload and loaded reflectively. Anyone able to write to the Redis keyspace
   * can therefore choose which classes get instantiated; consider an allow-list if Redis is not
   * fully trusted.
   */
  public static class RedisShardDataSessionImpl extends AbstractSession {

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    // One-byte tags written in front of every attribute value to identify its encoding.
    private static final byte TYPE_LONG = 1;
    private static final byte TYPE_INT = 2;
    private static final byte TYPE_SHORT = 3;
    private static final byte TYPE_BYTE = 4;
    private static final byte TYPE_DOUBLE = 5;
    private static final byte TYPE_FLOAT = 6;
    private static final byte TYPE_CHAR = 7;
    private static final byte TYPE_BOOLEAN = 8;
    private static final byte TYPE_STRING = 9;
    private static final byte TYPE_BUFFER = 10;
    private static final byte TYPE_BYTES = 11;
    private static final byte TYPE_CLUSTER_SERIALIZABLE = 13;
    private static final byte TYPE_PB_SERIALIZABLE = 14;

    /**
     * Important note: This constructor (even though not referenced anywhere) is required for
     * serialization purposes. Do not remove.
     */
    public RedisShardDataSessionImpl() {
      super();
    }

    public RedisShardDataSessionImpl(VertxContextPRNG random) {
      super(random);
    }

    public RedisShardDataSessionImpl(VertxContextPRNG random, long timeout, int length) {
      super(random, timeout, length);
    }

    /**
     * Appends the session header (id, timeout, last-accessed, version) followed by all
     * attributes.
     *
     * @param buff the buffer to append to
     */
    public void writeToBuffer(Buffer buff) {
      byte[] idBytes = id().getBytes(UTF8);
      buff.appendInt(idBytes.length).appendBytes(idBytes);
      buff.appendLong(timeout());
      buff.appendLong(lastAccessed());
      buff.appendInt(version());
      buff.appendBuffer(writeDataToBuffer());
    }

    /**
     * Restores the session header and attributes written by {@link #writeToBuffer(Buffer)}.
     *
     * @param pos the offset at which the session data starts
     * @param buffer the buffer to read from
     * @return the offset immediately after the consumed data
     */
    public int readFromBuffer(int pos, Buffer buffer) {
      int idLen = buffer.getInt(pos);
      pos += 4;
      byte[] idBytes = buffer.getBytes(pos, pos + idLen);
      pos += idLen;
      setId(new String(idBytes, UTF8));
      setTimeout(buffer.getLong(pos));
      pos += 8;
      setLastAccessed(buffer.getLong(pos));
      pos += 8;
      setVersion(buffer.getInt(pos));
      pos += 4;
      return readDataFromBuffer(pos, buffer);
    }

    /**
     * Serializes the attribute map. Values without a dedicated tag are written as class name plus
     * length-prefixed protostuff bytes.
     *
     * @return a new buffer holding the encoded attributes
     * @throws IllegalStateException if an attribute value is {@code null}
     */
    private Buffer writeDataToBuffer() {
      Buffer buffer = Buffer.buffer();
      if (isEmpty()) {
        buffer.appendInt(0);
        return buffer;
      }
      final Map<String, Object> data = data();
      buffer.appendInt(data.size());
      for (Map.Entry<String, Object> entry : data.entrySet()) {
        String key = entry.getKey();
        byte[] keyBytes = key.getBytes(UTF8);
        buffer.appendInt(keyBytes.length).appendBytes(keyBytes);
        Object val = entry.getValue();
        if (val instanceof Long) {
          buffer.appendByte(TYPE_LONG).appendLong((long) val);
        } else if (val instanceof Integer) {
          buffer.appendByte(TYPE_INT).appendInt((int) val);
        } else if (val instanceof Short) {
          buffer.appendByte(TYPE_SHORT).appendShort((short) val);
        } else if (val instanceof Byte) {
          buffer.appendByte(TYPE_BYTE).appendByte((byte) val);
        } else if (val instanceof Double) {
          buffer.appendByte(TYPE_DOUBLE).appendDouble((double) val);
        } else if (val instanceof Float) {
          buffer.appendByte(TYPE_FLOAT).appendFloat((float) val);
        } else if (val instanceof Character) {
          // A char is stored as its 16-bit code unit.
          buffer.appendByte(TYPE_CHAR).appendShort((short) ((Character) val).charValue());
        } else if (val instanceof Boolean) {
          buffer.appendByte(TYPE_BOOLEAN).appendByte((byte) ((boolean) val ? 1 : 0));
        } else if (val instanceof String) {
          byte[] bytes = ((String) val).getBytes(UTF8);
          buffer.appendByte(TYPE_STRING).appendInt(bytes.length).appendBytes(bytes);
        } else if (val instanceof Buffer) {
          Buffer buff = (Buffer) val;
          buffer.appendByte(TYPE_BUFFER).appendInt(buff.length()).appendBuffer(buff);
        } else if (val instanceof byte[]) {
          byte[] bytes = (byte[]) val;
          buffer.appendByte(TYPE_BYTES).appendInt(bytes.length).appendBytes(bytes);
        } else if (val instanceof ClusterSerializable) {
          buffer.appendByte(TYPE_CLUSTER_SERIALIZABLE);
          byte[] classNameBytes = val.getClass().getName().getBytes(UTF8);
          buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
          ((ClusterSerializable) val).writeToBuffer(buffer);
        } else {
          if (val == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalStateException("Null value for session attribute: " + key);
          }
          // Everything else is serialized with protostuff by default.
          buffer.appendByte(TYPE_PB_SERIALIZABLE);
          byte[] classNameBytes = val.getClass().getName().getBytes(UTF8);
          buffer.appendInt(classNameBytes.length).appendBytes(classNameBytes);
          byte[] serializedBytes = ProtoStuffUtil.serializer(val);
          buffer.appendInt(serializedBytes.length).appendBytes(serializedBytes);
        }
      }
      return buffer;
    }

    /**
     * Decodes the attribute section produced by {@link #writeDataToBuffer()} and installs it via
     * {@code setData}. When the entry count is zero the current data is left untouched.
     *
     * @param pos the offset of the entry count
     * @param buffer the buffer to read from
     * @return the offset immediately after the last attribute
     * @throws IllegalStateException if an unknown type tag is encountered
     * @throws VertxException if a value cannot be instantiated or deserialized
     */
    private int readDataFromBuffer(int pos, Buffer buffer) {
      try {
        int entries = buffer.getInt(pos);
        pos += 4;
        if (entries > 0) {
          final Map<String, Object> data = new ConcurrentHashMap<>(entries);
          for (int i = 0; i < entries; i++) {
            int keylen = buffer.getInt(pos);
            pos += 4;
            byte[] keyBytes = buffer.getBytes(pos, pos + keylen);
            pos += keylen;
            String key = new String(keyBytes, UTF8);
            byte type = buffer.getByte(pos++);
            Object val;
            switch (type) {
              case TYPE_LONG:
                val = buffer.getLong(pos);
                pos += 8;
                break;
              case TYPE_INT:
                val = buffer.getInt(pos);
                pos += 4;
                break;
              case TYPE_SHORT:
                val = buffer.getShort(pos);
                pos += 2;
                break;
              case TYPE_BYTE:
                val = buffer.getByte(pos);
                pos++;
                break;
              case TYPE_FLOAT:
                val = buffer.getFloat(pos);
                pos += 4;
                break;
              case TYPE_DOUBLE:
                val = buffer.getDouble(pos);
                pos += 8;
                break;
              case TYPE_CHAR: {
                short s = buffer.getShort(pos);
                pos += 2;
                val = (char) s;
                break;
              }
              case TYPE_BOOLEAN: {
                byte b = buffer.getByte(pos);
                pos++;
                val = b == 1;
                break;
              }
              case TYPE_STRING: {
                int len = buffer.getInt(pos);
                pos += 4;
                val = new String(buffer.getBytes(pos, pos + len), UTF8);
                pos += len;
                break;
              }
              case TYPE_BUFFER: {
                int len = buffer.getInt(pos);
                pos += 4;
                val = Buffer.buffer(buffer.getBytes(pos, pos + len));
                pos += len;
                break;
              }
              case TYPE_BYTES: {
                int len = buffer.getInt(pos);
                pos += 4;
                val = buffer.getBytes(pos, pos + len);
                pos += len;
                break;
              }
              case TYPE_CLUSTER_SERIALIZABLE: {
                int classNameLen = buffer.getInt(pos);
                pos += 4;
                byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
                pos += classNameLen;
                String className = new String(classNameBytes, UTF8);
                Class<?> clazz = Utils.getClassLoader().loadClass(className);
                if (!ClusterSerializable.class.isAssignableFrom(clazz)) {
                  // Reuse the UTF-8 decoded name; decoding again with the platform charset could
                  // garble the message.
                  throw new ClassCastException(className + " is not assignable from ClusterSerializable");
                }
                ClusterSerializable obj = (ClusterSerializable) clazz.getDeclaredConstructor().newInstance();
                pos = obj.readFromBuffer(pos, buffer);
                val = obj;
                break;
              }
              case TYPE_PB_SERIALIZABLE: {
                int classNameLen = buffer.getInt(pos);
                pos += 4;
                byte[] classNameBytes = buffer.getBytes(pos, pos + classNameLen);
                pos += classNameLen;
                String className = new String(classNameBytes, UTF8);
                Class<?> clazzByPb = Utils.getClassLoader().loadClass(className);
                int serializeObjLength = buffer.getInt(pos);
                pos += 4;
                byte[] serializeObjBytes = buffer.getBytes(pos, pos + serializeObjLength);
                pos += serializeObjLength;
                val = ProtoStuffUtil.deserializerToObj(serializeObjBytes, clazzByPb);
                if (val == null) {
                  // The map below rejects null values; report which attribute failed instead.
                  throw new VertxException(
                      "Unable to deserialize session attribute '" + key + "' of type " + className);
                }
                break;
              }
              default:
                throw new IllegalStateException("Invalid serialized type: " + type);
            }
            data.put(key, val);
          }
          setData(data);
        }
        return pos;
      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
          | InvocationTargetException | InstantiationException e) {
        throw new VertxException(e);
      }
    }
  }
}
序列化时默认采用pb做序列化,反序列化时,如果类型是pb就采用pb做反序列化。
完整代码请看github
https://github.com/haozhi-ly/spring-boot-tutorial/tree/master/spring-boot-vertx
边栏推荐
猜你喜欢
Talk about my fate with some programming languages
EN 438-7建筑覆盖物装饰用层压板材产品—CE认证
基于AVFoundation实现视频录制的两种方式
教你自己训练的pytorch模型转caffe(一)
Duchefa丨P1001植物琼脂中英文说明书
Duchefa丨S0188盐酸大观霉素五水合物中英文说明书
phpstudy小皮的mysql点击启动后迅速闪退,已解决
基于flask写一个接口
Specification of protein quantitative kit for abbkine BCA method
Abnova丨DNA 标记高质量控制测试方案
随机推荐
Mode - "Richter replacement principle"
判断横竖屏的最佳实现
Material Design组件 - 使用BottomSheet展现扩展内容(二)
ODPS 下一个map / reduce 准备
When steam education enters personalized information technology courses
php中explode函数存在的陷阱
MySQL InnoDB架构原理
Who the final say whether the product is good or not? Sonar puts forward performance indicators for analysis to help you easily judge product performance and performance
The reason why the ncnn converted model on raspberry pie 4B always crashes when called
Prosci LAG-3 recombinant protein specification
Analysis of steam education mode under the integration of five Education
How to make ERP inventory accounts of chemical enterprises more accurate
Comparison table of foreign lead American abbreviations
sql系列(基础)-第二章 限制和排序数据
PHP deserialization +md5 collision
AITM2-0002 12s或60s垂直燃烧试验
【案例】定位的运用-淘宝轮播图
台风来袭!建筑工地该如何防范台风!
Matplotlib drawing retouching (how to form high-quality drawings, such as how to set fonts, etc.)
研學旅遊實踐教育的開展助力文旅產業發展