当前位置:网站首页>utils session&rpc
utils session&rpc
2022-06-30 09:40:00 【Zip-List】
utils: session and a simple RPC
Sessions are commonly used in the asynchronous + callback programming model.
session
Define a callback function
// Callback invoked with the outcome of a session-based call.
//   ret        - the result of this call
//   uid        - the originator of the call
//   msg        - data obtained for the callback
//   cbdata     - context bytes supplied when the session was created
//   cbdata_len - number of context bytes in cbdata
using SESSION_MSG_CALLBACK_FUNC = int (*)(int ret,
                                          u64 uid,
                                          const Message* msg,
                                          const void* cbdata,
                                          const size_t cbdata_len);
Define a session
1. The session's life cycle is the user's responsibility: sessions are managed by a pool class.
2. Timeout handling: start a timer.
3. Callback function + cbdata + cbdata_len.
4. seq_id: a unique sequence number for each call.
// State kept for one outstanding RPC call: how to find and free the session,
// which timer guards it, and what to hand to the callback.
struct RPC_SESSION
{
mid_t mid; // memory-unit id this session was allocated under; used to free it
u64 seq_id; // unique sequence number for each call
mid_t timer_mid; // id of the one-shot timeout timer
SESSION_MSG_CALLBACK_FUNC cb_func; // callback func, default can't be null
u64 uid; // uid passed back to the callback
int msg_id; // id of the request message type sent for this call
s64 begin_ms; // NOTE(review): presumably the call start time in milliseconds; not set in the visible code, confirm
u8 cbdata[RPC_CBDATA_LEN_MAX]; // copy of the caller's context bytes
size_t cbdata_len; // number of context bytes passed to the callback
};
Session management functions
std::map<u64, mid_t> g_map_seq2sess; // maps a call's seq_id to the memory-unit id of its session
// Allocates a session, fills it in, arms its timeout timer and registers it
// in g_map_seq2sess under a freshly allocated seq_id.
//
//   cb_func    - callback to run for this call; must not be null
//   uid        - uid handed back to the callback
//   cbdata     - optional context bytes, copied into the session (may be null)
//   cbdata_len - number of bytes in cbdata; must not exceed RPC_CBDATA_LEN_MAX
//
// Returns the new session, or nullptr when the arguments are invalid or the
// session memory could not be obtained.
RPC_SESSION* _session_create(SESSION_MSG_CALLBACK_FUNC cb_func, u64 uid, const void *cbdata, size_t cbdata_len)
{
    // A session without a callback can never deliver its result.
    if (cb_func == nullptr)
    {
        return nullptr;
    }

    // The context is copied into a fixed-size buffer; refuse anything larger
    // instead of writing past the end of it.
    if (cbdata != nullptr && cbdata_len > RPC_CBDATA_LEN_MAX)
    {
        return nullptr;
    }

    mid_t mid = memunit_alloc(COM_RPC_SESSION);
    RPC_SESSION* session = (RPC_SESSION*)memunit_get(mid);
    if (session == nullptr)
    {
        return nullptr;
    }

    session->mid = mid;
    session->uid = uid;
    session->seq_id = guid_alloc();
    session->cb_func = cb_func;

    if (cbdata != nullptr)
    {
        memcpy(session->cbdata, cbdata, cbdata_len);
        session->cbdata_len = cbdata_len;
    }
    else
    {
        // No context supplied: report zero bytes so the callback never reads
        // uninitialised buffer contents.
        session->cbdata_len = 0;
    }

    // The timer payload is the seq_id, which is also the key used to register
    // the session below.
    session->timer_mid = add_once_timer(sec_to_tick(RPC_TIMEOUT_SEC)
        , TIMEOUT_RPC_SESSION, &session->seq_id, sizeof(session->seq_id));

    g_map_seq2sess[session->seq_id] = mid;
    return session;
}
// Tears a session down at the end of an RPC call or after its timeout:
// unregisters its seq_id, cancels its timer and frees its memory unit.
void _session_destroy(RPC_SESSION &sess)
{
    // Erasing by key is a no-op when the seq_id is not registered.
    g_map_seq2sess.erase(sess.seq_id);

    del_timer(sess.timer_mid);
    memunit_free(sess.mid);
}
// Looks up the session registered under rpc_seq.
// Returns nullptr when no session is registered for that sequence number.
RPC_SESSION* _session_get(u64 rpc_seq)
{
    const auto found = g_map_seq2sess.find(rpc_seq);
    if (found != g_map_seq2sess.end())
    {
        return (RPC_SESSION*)memunit_get(found->second);
    }
    return nullptr;
}
// Runs the session's callback with the given error code and message, passing
// along the uid and context bytes stored in the session.
//
// Returns the callback's return value, or -1 when there is no session or no
// callback to run.
int _session_callback(RPC_SESSION* sess, int errcode, Message *msg /*= NULL*/)
{
    if (sess == nullptr || sess->cb_func == nullptr)
    {
        return -1;
    }
    return sess->cb_func(errcode, sess->uid, msg, sess->cbdata, sess->cbdata_len);
}
rpc
How the remote-call message format is encapsulated
// Request envelope for a remote call.
message SSSessionMsgReq
{
optional fixed32 src_addr = 1; // app id of the caller
optional fixed64 seq_id = 2;// seq of the caller's session, passed through unchanged
optional uint32 msg_id = 3;// id of the remotely called method (the rpc_req id)
optional bytes msg_body = 4;// parameters of the remotely called method
}
// Reply envelope carrying the result of a remote call.
message SSSessionMsgAck
{
optional fixed32 target_addr = 1; // NOTE(review): presumably the address of the original caller; not set in the visible code, confirm
optional fixed64 seq_id = 2;// seq of the caller's session, passed through unchanged
optional uint32 msg_id = 3;// id of the remote call result (the rpc_ack id)
optional bytes msg_body = 4;// the result of the remote call
}
// A: makes a remote call
static int _session_call_by_type(int app_type, u64 uid, const Message* msg
, BG_SESSION_MSG_CALLBACK_FUNC cb_fn
, const void* cb_data, size_t cb_data_len, EN_BG_SESSION_CALL_TYPE call_type)
{
session = _session_create(cb_fn, uid, cb_data, cb_data_len);
dc::SSSessionMsgReq rpc_req;
rpc_req.set_seq_id(session->seq_id);
rpc_req.set_src_addr(bg_this_appid());
int msg_id = find_msg_id(msg->GetTypeName());
rpc_req.set_msg_id(msg_id);
session->msg_id = msg_id;
std::string str_body;
rpc_req.set_msg_body(str_body);
send_to_single_svr_by_type(app_type, uid, &rpc_req);
}
// B: receives the remote call request
int _recv_session_req(int src_addr, u64 uid, dc::SSSessionMsgReq *req)
{
// Call corresponding method
}
// B: after the corresponding method has executed, returns the remote call result
// Identifies where a remote call came from, so the result can be sent back.
struct BG_CALL_SOURCE
{
int src_addr; // address of the originator of the remote call; the reply is sent here
u64 seq; // seq of the originator's session; copied into the reply's seq_id
};
int bg_session_call_ack(const BG_CALL_SOURCE& src, u64 uid, const Message* msg)
{
dc::SSSessionMsgAck rpc_ack;
rpc_ack.set_seq_id(src.seq);
int msg_id = find_msg_id(msg->GetTypeName());
rpc_ack.set_msg_id(msg_id);
std::string str_body;
rpc_ack.set_msg_body(str_body);
return send_to_svr(src.src_addr, uid, &rpc_ack);
}
// A: receives the remote call result
int _recv_session_ack(int src_addr, u64 session_id, dc::SSSessionMsgAck *ack)
{
int msg_id = ack->msg_id();
BG_RPC_SESSION* session = _session_get(ack->seq_id()); // To find the corresponding session
google::protobuf::Message* msg = get_msg_pb_obj(msg_id, ack->msg_body());
_session_callback(session, 0, msg); // Execute the corresponding callback
return 0;
}
边栏推荐
- POJ 1753 flip game (DFS 𞓜 bit operation)
- MySQL knowledge summary (useful for thieves)
- 基于Svelte3.x桌面端UI组件库Svelte UI
- JVM notes (III): analysis of JVM object creation and memory allocation mechanism
- Express の post request
- Pytorch graduate warm LR installation
- oracle跨数据库复制数据表-dblink
- (zero) most complete JVM knowledge points
- Deberta (decoding enhanced Bert with distinguished attention)
- utlis 内存池 对象池
猜你喜欢

Express の Hello World

oracle跨数据库复制数据表-dblink

MySQL-- Entity Framework Code First(EF Code First)

目标检测yolov5开源项目调试

桂林 稳健医疗收购桂林乳胶100%股权 填补乳胶产品线空白

近期学习遇到的比较问题

JVM tuning tool introduction and constant pool explanation

Applet learning path 2 - event binding

Express get request

5. Messager framework and imessager interface
随机推荐
Linear-gradient()
【Ubuntu-redis安装】
How do I start? (continuously updating)
Common query and aggregation of ES
Tablet PC based ink handwriting recognition input method
银河麒麟server-V10配置镜像源
Generate directory in markdown
Distributed ID
Numpy (data type)
Why won't gold depreciate???
Function simplification principle: save if you can
直播带货源码开发中,如何降低直播中的延迟?
Flutter 中的 ValueNotifier 和 ValueListenableBuilder
Mysq database remote connection error, remote connection is not allowed
ReturnJson,让返回数据多一些自定义数据或类名
【新书推荐】Deno Web Development
JVM tuning tool introduction and constant pool explanation
Couldn't load this key (openssh ssh-2 private key (old PEM format))
Solution to the eighth training competition of 2020 Provincial Games
QR code generation and analysis