当前位置:网站首页>utils session&rpc
utils session&rpc
2022-06-30 09:32:00 【Zip-List】
utils session和简易rpc
session常用在:异步+回调的编程模型
session
定义一个回调函数
typedef int (*SESSION_MSG_CALLBACK_FUNC) (int ret, //本次调用结果
u64 uid,//调用的发起者
const Message* msg, //回调后获得的数据
const void* cbdata, const size_t cbdata_len); //上下文
定义一个session
1 session的生命周期由用户负责:池类来管理
2 超时处理:启动一个定时器
3 回调函数+cb_data+cbdata_len
4 seq_id unique sequence number for each call
struct RPC_SESSION
{
mid_t mid; //记录该session的内存地址,分配情况
u64 seq_id; // unique sequence number for each call
mid_t timer_mid; //超时处理
SESSION_MSG_CALLBACK_FUNC cb_func; // callback func, default can't be null
u64 uid; // 透传回来的uid
int msg_id;
s64 begin_ms;
u8 cbdata[RPC_CBDATA_LEN_MAX];
size_t cbdata_len;
};
session管理的相关函数
std::map<u64, mid_t> g_map_seq2sess; //根据seq映射session的地址
RPC_SESSION* _session_create(SESSION_MSG_CALLBACK_FUNC cb_func, u64 uid, const void *cbdata, size_t cbdata_len)
{
mid_t mid = memunit_alloc(COM_RPC_SESSION);
RPC_SESSION* session = (RPC_SESSION*)memunit_get(mid);
do
{
session->mid = mid;
session->uid = uid;
session->seq_id = guid_alloc();
session->cb_func = cb_func;
session->cbdata_len = cbdata_len;
if (cbdata) memcpy(session->cbdata, cbdata, cbdata_len);
mid_t timer_mid = add_once_timer(sec_to_tick(RPC_TIMEOUT_SEC)
, TIMEOUT_RPC_SESSION, &session->seq_id, sizeof(session->seq_id));
session->timer_mid = timer_mid;
g_map_seq2sess[session->seq_id] = mid;
return session;
} while (0);
_session_destroy(*session);
return nullptr;
}
void _session_destroy(RPC_SESSION &sess) //rpc调用结束或者超时时销毁session
{
auto itr = g_map_seq2sess.find(sess.seq_id);
if (itr != g_map_seq2sess.end())
{
g_map_seq2sess.erase(itr);
}
del_timer(sess.timer_mid);
memunit_free(sess.mid);
}
RPC_SESSION* _session_get(u64 rpc_seq)
{
auto itr = g_map_seq2sess.find(rpc_seq);
if (itr == g_map_seq2sess.end())
{
return nullptr;
}
return (RPC_SESSION*)memunit_get(itr->second);
}
int _session_callback(RPC_SESSION* sess, int errcode, Message *msg /*= NULL*/)
{
int ret = sess->cb_func(errcode, sess->uid, msg, sess->cbdata, sess->cbdata_len);
}
rpc
如何封装远程调用的格式
message SSSessionMsgReq
{
optional fixed32 src_addr = 1;
optional fixed64 seq_id = 2;//透传session的seq
optional uint32 msg_id = 3;//远程调用的方法id rpc_req的id
optional bytes msg_body = 4;//远程调用的方法的参数
}
message SSSessionMsgAck
{
optional fixed32 target_addr = 1;
optional fixed64 seq_id = 2;//透传session的seq
optional uint32 msg_id = 3;//远程调用的结果的id rpc_ack的id
optional bytes msg_body = 4;//远程调用的结果
}
//A 发起远程调用
static int _session_call_by_type(int app_type, u64 uid, const Message* msg
, BG_SESSION_MSG_CALLBACK_FUNC cb_fn
, const void* cb_data, size_t cb_data_len, EN_BG_SESSION_CALL_TYPE call_type)
{
session = _session_create(cb_fn, uid, cb_data, cb_data_len);
dc::SSSessionMsgReq rpc_req;
rpc_req.set_seq_id(session->seq_id);
rpc_req.set_src_addr(bg_this_appid());
int msg_id = find_msg_id(msg->GetTypeName());
rpc_req.set_msg_id(msg_id);
session->msg_id = msg_id;
std::string str_body;
rpc_req.set_msg_body(str_body);
send_to_single_svr_by_type(app_type, uid, &rpc_req);
}
//B 收到远程调用请求
int _recv_session_req(int src_addr, u64 uid, dc::SSSessionMsgReq *req)
{
//调用对应方法
}
//B 对应方法执行完毕 返回远程调用结果
struct BG_CALL_SOURCE
{
int src_addr; //远程调用发起者的地址
u64 seq; //对应的session
};
int bg_session_call_ack(const BG_CALL_SOURCE& src, u64 uid, const Message* msg)
{
dc::SSSessionMsgAck rpc_ack;
rpc_ack.set_seq_id(src.seq);
int msg_id = find_msg_id(msg->GetTypeName());
rpc_ack.set_msg_id(msg_id);
std::string str_body;
rpc_ack.set_msg_body(str_body);
return send_to_svr(src.src_addr, uid, &rpc_ack);
}
//A 收到远程调用结果
int _recv_session_ack(int src_addr, u64 session_id, dc::SSSessionMsgAck *ack)
{
int msg_id = ack->msg_id();
BG_RPC_SESSION* session = _session_get(ack->seq_id()); //找到对应session
google::protobuf::Message* msg = get_msg_pb_obj(msg_id, ack->msg_body());
_session_callback(session, 0, msg); //执行对应的回调
return 0;
}
边栏推荐
- 12. problem set: process, thread and JNI architecture
- [shutter] solve failed assertion: line 5142 POS 12: '_ debugLocked‘: is not true.
- Terminal -- Zsh of terminal three swordsmen
- Talk about the kotlin cooperation process and the difference between job and supervisorjob
- QR code generation and analysis
- Installation, use and explanation of vulnerability scanning tool OpenVAS
- Express get request
- ES6 learning path (IV) operator extension
- 目标检测yolov5开源项目调试
- Simple redis lock
猜你喜欢
随机推荐
Demo of guavacache
MySQL knowledge summary (useful for thieves)
asdsadadsad
What are the SQL add / delete / modify queries?
Flutter theme (skin) changes
Startup of MySQL green edition in Windows system
Pit encountered by fastjason
Deep Learning with Pytorch- A 60 Minute Blitz
Tablet PC based ink handwriting recognition input method
Interviewer: do you understand the principle of recyclerview layout animation?
Ocx control can be called by IE on some computers, but can not be called by IE on some computers
Metasploit practice - SSH brute force cracking process
Numpy (time date and time increment)
float
Talk about how the kotlin process started?
Challenge transform() 2D
ES6 learning path (III) deconstruction assignment
Bottomsheetbehavior principle of realizing the home page effect of Gaode map
Application of hongruan face recognition
Deep Learning with Pytorch-Train A Classifier









