[Sylar] Framework, Chapter 15: Stream Module
2022-07-28 04:32:00 【Jianghu people call it pineapple bag】
Standing on the shoulders of giants
Rewriting the sylar C++ high-performance distributed server framework from scratch
Summary
- The stream classes provide a byte-stream read / write interface.
- Every stream class inherits from the abstract class Stream. Stream declares the read / write interfaces as pure virtual, so every class derived from Stream must implement them. It also provides readFixSize / writeFixSize, which are virtual methods with default implementations built on read / write.
Stream
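- Wrapper class for the stream interface.
- Abstract base class.
- Declares the pure virtual methods read / write (and close). readFixSize / writeFixSize are virtual but not pure: their default implementations call read / write in a loop until the requested length has been transferred.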
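The contract described above can be sketched with a small in-memory stream. This is only an illustration under stated assumptions: the `Stream` class here is a trimmed stand-in without the ByteArray overloads, and `MemoryStream` and `roundTrip` are hypothetical names that do not exist in sylar.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>

// Trimmed stand-in for sylar::Stream (assumption: the ByteArray overloads
// are left out so that the sketch compiles on its own).
class Stream {
public:
    typedef std::shared_ptr<Stream> ptr;
    virtual ~Stream() {}
    virtual int read(void* buffer, size_t length) = 0;
    virtual int write(const void* buffer, size_t length) = 0;
    virtual void close() = 0;
};

// Hypothetical in-memory stream: write() appends, read() consumes from the front.
class MemoryStream : public Stream {
public:
    int read(void* buffer, size_t length) override {
        if (m_closed) return -1;                       // error: stream already closed
        size_t n = std::min(length, m_data.size() - m_pos);
        std::memcpy(buffer, m_data.data() + m_pos, n);
        m_pos += n;
        return static_cast<int>(n);                    // 0 once everything was consumed
    }
    int write(const void* buffer, size_t length) override {
        if (m_closed) return -1;
        m_data.append(static_cast<const char*>(buffer), length);
        return static_cast<int>(length);
    }
    void close() override { m_closed = true; }

private:
    std::string m_data;
    size_t m_pos = 0;
    bool m_closed = false;
};

// Writes msg through the abstract interface and reads it back.
std::string roundTrip(const std::string& msg) {
    Stream::ptr s = std::make_shared<MemoryStream>();
    int written = s->write(msg.data(), msg.size());
    assert(written == static_cast<int>(msg.size()));
    std::string out(msg.size(), '\0');
    int n = s->read(&out[0], out.size());
    out.resize(n > 0 ? static_cast<size_t>(n) : 0);
    return out;
}
```

Callers only ever see `Stream::ptr`, so the same calling code would work with any other subclass that implements read / write / close.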
SocketStream
- Wrapper that exposes a Socket through the stream interface.
- Inherits from Stream.
- Wraps a socket as a stream so that it conforms to the Stream interface.
- Supports closing the socket and retrieving the local / remote address.
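The closing behaviour depends on the `owner` flag that the SocketStream constructor in this article's listing takes. The following is a minimal sketch of that ownership rule only, assuming a hypothetical `FakeSocket` in place of sylar::Socket and a reduced `SocketStream` that keeps nothing but the ownership handling; `openAfterStreamDestroyed` is likewise an illustrative helper.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for sylar::Socket: it only tracks whether it is open.
struct FakeSocket {
    typedef std::shared_ptr<FakeSocket> ptr;
    bool open = true;
    void close() { open = false; }
    bool isConnected() const { return open; }
};

// Reduced SocketStream: read / write are omitted, ownership handling is kept.
class SocketStream {
public:
    SocketStream(FakeSocket::ptr sock, bool owner = true)
        : m_socket(sock), m_owner(owner) {}

    // Closes the socket on destruction only when this stream owns it.
    ~SocketStream() {
        if (m_owner && m_socket) {
            m_socket->close();
        }
    }

    // An explicit close() always closes the socket, owner or not.
    void close() {
        if (m_socket) {
            m_socket->close();
        }
    }

    bool isConnected() const { return m_socket && m_socket->isConnected(); }

private:
    FakeSocket::ptr m_socket;
    bool m_owner;
};

// Reports whether the socket is still open after a stream wrapping it was destroyed.
bool openAfterStreamDestroyed(bool owner) {
    FakeSocket::ptr sock = std::make_shared<FakeSocket>();
    {
        SocketStream stream(sock, owner);
        assert(stream.isConnected());
    }   // stream is destroyed here
    return sock->open;
}
```

With `owner = false` the socket outlives the stream, which is useful when several components share one connection and only one of them should close it.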
Selected code
/**
 * @filename    stream.h
 * @brief       Wrapper for the stream interface
 * @author      L-ge
 * @version     0.1
 * @modify      2022-07-14
 */
#ifndef __SYLAR_STREAM_H__
#define __SYLAR_STREAM_H__

#include <memory>
#include "bytearray.h"

namespace sylar
{

class Stream
{
public:
    typedef std::shared_ptr<Stream> ptr;

    virtual ~Stream() {}

    /**
     * @brief Read data into a memory buffer
     *
     * @param buffer Memory that receives the data
     * @param length Size of that memory, in bytes
     *
     * @return Number of bytes actually received
     *   @retval >0 Number of bytes actually received
     *   @retval =0 The stream was closed
     *   @retval <0 A stream error occurred
     */
    virtual int read(void* buffer, size_t length) = 0;

    /**
     * @brief Read data into a ByteArray
     *
     * @param ba ByteArray that receives the data
     * @param length Maximum number of bytes to read
     */
    virtual int read(ByteArray::ptr ba, size_t length) = 0;

    /**
     * @brief Read exactly length bytes
     *
     * @return length on success, otherwise the first value <= 0 returned by read()
     */
    virtual int readFixSize(void* buffer, size_t length);
    virtual int readFixSize(ByteArray::ptr ba, size_t length);

    /**
     * @brief Write data from a memory buffer or from a ByteArray
     */
    virtual int write(const void* buffer, size_t length) = 0;
    virtual int write(ByteArray::ptr ba, size_t length) = 0;

    /**
     * @brief Write exactly length bytes
     *
     * @return length on success, otherwise the first value <= 0 returned by write()
     */
    virtual int writeFixSize(const void* buffer, size_t length);
    virtual int writeFixSize(ByteArray::ptr ba, size_t length);

    /**
     * @brief Close the stream
     */
    virtual void close() = 0;
};

}

#endif

// stream.cc
#include "stream.h"
#include <cstdint>

namespace sylar
{

int Stream::readFixSize(void* buffer, size_t length)
{
    size_t offset = 0;
    int64_t left = length;
    while(left > 0)
    {
        // read() may return fewer bytes than requested, so keep going
        int64_t len = read((char*)buffer + offset, left);
        if(len <= 0)
        {
            return len;
        }
        offset += len;
        left -= len;
    }
    return length;
}

int Stream::readFixSize(ByteArray::ptr ba, size_t length)
{
    int64_t left = length;
    while(left > 0)
    {
        // No offset is tracked here: the ByteArray overload of read() is
        // expected to advance ba's position itself (SocketStream does)
        int64_t len = read(ba, left);
        if(len <= 0)
        {
            return len;
        }
        left -= len;
    }
    return length;
}

int Stream::writeFixSize(const void* buffer, size_t length)
{
    size_t offset = 0;
    int64_t left = length;
    while(left > 0)
    {
        // write() may accept fewer bytes than requested, so keep going
        int64_t len = write((const char*)buffer + offset, left);
        if(len <= 0)
        {
            return len;
        }
        offset += len;
        left -= len;
    }
    return length;
}

int Stream::writeFixSize(ByteArray::ptr ba, size_t length)
{
    int64_t left = length;
    while(left > 0)
    {
        int64_t len = write(ba, left);
        if(len <= 0)
        {
            return len;
        }
        left -= len;
    }
    return length;
}

}
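The fixed-size loop in stream.cc exists because a single read() may deliver fewer bytes than requested. The sketch below reproduces that loop on a trimmed stand-in `Stream` and drives it with a hypothetical `ChunkedStream` that hands out at most a few bytes per call; `ChunkedStream`, `calls()` and `demoShortReads` are illustrative names, not part of sylar.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Trimmed stand-in for sylar::Stream with the same readFixSize loop as stream.cc.
class Stream {
public:
    virtual ~Stream() {}
    virtual int read(void* buffer, size_t length) = 0;

    int readFixSize(void* buffer, size_t length) {
        size_t offset = 0;
        int64_t left = static_cast<int64_t>(length);
        while (left > 0) {
            int64_t len = read(static_cast<char*>(buffer) + offset,
                               static_cast<size_t>(left));
            if (len <= 0) {
                return static_cast<int>(len);   // closed (0) or error (<0)
            }
            offset += len;
            left -= len;
        }
        return static_cast<int>(length);
    }
};

// Hypothetical stream that returns at most `chunk` bytes per read() call,
// imitating the short reads a socket can produce.
class ChunkedStream : public Stream {
public:
    ChunkedStream(const std::string& data, size_t chunk)
        : m_data(data), m_chunk(chunk) {}

    int read(void* buffer, size_t length) override {
        ++m_calls;
        size_t n = std::min({length, m_chunk, m_data.size() - m_pos});
        std::memcpy(buffer, m_data.data() + m_pos, n);
        m_pos += n;
        return static_cast<int>(n);   // 0 once the data is exhausted
    }

    int calls() const { return m_calls; }

private:
    std::string m_data;
    size_t m_chunk;
    size_t m_pos = 0;
    int m_calls = 0;
};

// Reads 10 bytes from a stream that yields at most 3 bytes per call and
// returns how many read() calls the loop needed.
int demoShortReads() {
    ChunkedStream s("0123456789", 3);
    char buf[10];
    int rt = s.readFixSize(buf, sizeof(buf));
    assert(rt == 10);
    assert(std::string(buf, sizeof(buf)) == "0123456789");
    return s.calls();
}
```

One consequence of this design is visible in the loop itself: when the stream closes or fails partway through, readFixSize returns the value <= 0 from read() and does not report how many bytes had already been transferred.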
/**
 * @filename    socket_stream.h
 * @brief       Stream interface wrapper for Socket
 * @author      L-ge
 * @version     0.1
 * @modify      2022-07-14
 */
#ifndef __SYLAR_SOCKET_STREAM_H__
#define __SYLAR_SOCKET_STREAM_H__

#include <string>
#include "sylar/stream.h"
#include "sylar/socket.h"
#include "sylar/mutex.h"
#include "sylar/iomanager.h"

namespace sylar
{

class SocketStream : public Stream
{
public:
    typedef std::shared_ptr<SocketStream> ptr;

    /**
     * @brief Constructor
     *
     * @param sock  The Socket to wrap
     * @param owner Whether this stream owns the socket, i.e. closes it on destruction
     */
    SocketStream(Socket::ptr sock, bool owner = true);

    /**
     * @brief Destructor (closes the socket if m_owner is true)
     */
    ~SocketStream();

    virtual int read(void* buffer, size_t length) override;
    virtual int read(ByteArray::ptr ba, size_t length) override;
    virtual int write(const void* buffer, size_t length) override;
    virtual int write(ByteArray::ptr ba, size_t length) override;
    virtual void close() override;

    Socket::ptr getSocket() const { return m_socket; }
    bool isConnected() const;

    Address::ptr getRemoteAddress();
    Address::ptr getLocalAddress();
    std::string getRemoteAddressString();
    std::string getLocalAddressString();

protected:
    /// The wrapped Socket
    Socket::ptr m_socket;
    /// Whether this stream owns the socket (mainly: whether it is responsible for closing it)
    bool m_owner;
};

}

#endif

// socket_stream.cc
#include "socket_stream.h"
#include <sys/uio.h>
#include <vector>

namespace sylar
{

SocketStream::SocketStream(Socket::ptr sock, bool owner)
    : m_socket(sock)
    , m_owner(owner)
{
}

SocketStream::~SocketStream()
{
    if(m_owner && m_socket)
    {
        m_socket->close();
    }
}

int SocketStream::read(void* buffer, size_t length)
{
    if(!isConnected())
    {
        return -1;
    }
    return m_socket->recv(buffer, length);
}

int SocketStream::read(ByteArray::ptr ba, size_t length)
{
    if(!isConnected())
    {
        return -1;
    }
    std::vector<iovec> iovs;
    ba->getWriteBuffers(iovs, length);  // Collect the ByteArray's writable buffers as iovecs
    int rt = m_socket->recv(&iovs[0], iovs.size());
    if(rt > 0)
    {
        ba->setPosition(ba->getPosition() + rt);  // Advance the position past the received bytes
    }
    return rt;
}

int SocketStream::write(const void* buffer, size_t length)
{
    if(!isConnected())
    {
        return -1;
    }
    return m_socket->send(buffer, length);
}

int SocketStream::write(ByteArray::ptr ba, size_t length)
{
    if(!isConnected())
    {
        return -1;
    }
    std::vector<iovec> iovs;
    ba->getReadBuffers(iovs, length);  // Collect the ByteArray's readable buffers as iovecs
    int rt = m_socket->send(&iovs[0], iovs.size());
    if(rt > 0)
    {
        ba->setPosition(ba->getPosition() + rt);  // Advance the position past the bytes sent
    }
    return rt;
}

void SocketStream::close()
{
    if(m_socket)
    {
        m_socket->close();
    }
}

bool SocketStream::isConnected() const
{
    return m_socket && m_socket->isConnected();
}

Address::ptr SocketStream::getRemoteAddress()
{
    if(m_socket)
    {
        return m_socket->getRemoteAddress();
    }
    return nullptr;
}

Address::ptr SocketStream::getLocalAddress()
{
    if(m_socket)
    {
        return m_socket->getLocalAddress();
    }
    return nullptr;
}

std::string SocketStream::getRemoteAddressString()
{
    auto addr = getRemoteAddress();
    if(addr)
    {
        return addr->toString();
    }
    return "";
}

std::string SocketStream::getLocalAddressString()
{
    auto addr = getLocalAddress();
    if(addr)
    {
        return addr->toString();
    }
    return "";
}

}
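The ByteArray overload of SocketStream::read follows a scatter-read pattern: gather several buffers into an iovec array, receive into all of them with one call, then advance by the number of bytes received. The sketch below shows that pattern with the POSIX `readv` call on a pipe. It is a stand-in only: the article does not show how sylar's Socket::recv or ByteArray are implemented, and `scatterRead`, `demoScatterRead` and the block sizes are assumptions made for illustration.

```cpp
#include <sys/uio.h>   // iovec, readv
#include <unistd.h>    // pipe, write, close
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Scatter-reads from fd into blockCount blocks of blockSize bytes each and
// returns the received bytes, concatenated in order.
std::string scatterRead(int fd, size_t blockSize, size_t blockCount) {
    std::vector<std::vector<char>> blocks(blockCount, std::vector<char>(blockSize));

    // One iovec per block, similar in spirit to collecting write buffers.
    std::vector<iovec> iovs;
    for (auto& b : blocks) {
        iovec iov;
        iov.iov_base = b.data();
        iov.iov_len = b.size();
        iovs.push_back(iov);
    }

    // A single call fills the blocks in order and returns the total byte count.
    ssize_t rt = readv(fd, iovs.data(), static_cast<int>(iovs.size()));

    // Only the first rt bytes are valid; this plays the role of advancing
    // the position by rt after a successful receive.
    std::string out;
    size_t left = rt > 0 ? static_cast<size_t>(rt) : 0;
    for (auto& b : blocks) {
        if (left == 0) break;
        size_t n = left < b.size() ? left : b.size();
        out.append(b.data(), n);
        left -= n;
    }
    return out;
}

// Sends a short message through a pipe and scatter-reads it into 4-byte blocks.
std::string demoScatterRead() {
    int fds[2];
    if (pipe(fds) != 0) {
        return "";
    }
    const std::string msg = "hello, sylar";
    ssize_t written = write(fds[1], msg.data(), msg.size());
    assert(written == static_cast<ssize_t>(msg.size()));
    close(fds[1]);
    std::string out = scatterRead(fds[0], 4, 4);
    close(fds[0]);
    return out;
}
```

The benefit of the iovec approach is that data lands directly in the destination blocks, with no intermediate buffer and copy between the receive call and the block-based storage.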
A quick plug: a small demo built on the sylar framework (a star would be appreciated).