当前位置:网站首页>[Sylar] framework -chapter15 stream module
[Sylar] framework -chapter15 stream module
2022-07-28 04:32:00 【Jianghu people call it pineapple bag】
Standing on the shoulders of giants
Rewrite from scratch sylar C++ High performance distributed server framework
summary
- Flow structure , Provide byte stream read / write interface .
- All flow structures are inherited from abstract classes Stream,Stream Class specifies that a stream must have read / write Interface and readFixSize / writeFixSize Interface , Inherited from Stream The class of must implement these interfaces .
Stream
- Encapsulation class of stream interface .
- virtual base class .
- Contains pure virtual methods read / write and readFixSize / writeFixSize.
SocketStream
- Socket Encapsulation of streaming interface .
- Inherited from Stream.
- Socket stream structure , Encapsulate the socket into a stream structure , To support the Stream The interface specification .
- Support socket close operation and get local / Operation of remote address .
Some related codes
/**
* @filename stream.h
* @brief Encapsulation of stream interface
* @author L-ge
* @version 0.1
* @modify 2022-07-14
*/
#ifndef __SYLAR_STREAM_H__
#define __SYLAR_STREAM_H__
#include <memory>
#include "bytearray.h"
namespace sylar
{
class Stream
{
public:
typedef std::shared_ptr<Stream> ptr;
virtual ~Stream() {}
/**
* @brief Reading data
*
* @param buffer Memory for receiving data
* @param length Memory size of receiving data
*
* @return Returns the actual size of the received data
* @retval >0 Returns the actual size of the received data
* @retval =0 Shut down
* @retval <0 A stream error occurred
*/
virtual int read(void* buffer, size_t length) = 0;
/**
* @brief Reading data
*
* @param ba Receiving data ByteArray
*/
virtual int read(ByteArray::ptr ba, size_t length) = 0;
/**
* @brief Read fixed length data
*/
virtual int readFixSize(void* buffer, size_t length);
virtual int readFixSize(ByteArray::ptr ba, size_t length);
virtual int write(const void* buffer, size_t length) = 0;
virtual int write(ByteArray::ptr ba, size_t length) = 0;
virtual int writeFixSize(const void* buffer, size_t length);
virtual int wirteFixSize(ByteArray::ptr ba, size_t length);
/**
* @brief Closed flow
*/
virtual void close() = 0;
};
}
#endif
// stream.cc
#include "stream.h"
namespace sylar
{
int Stream::readFixSize(void* buffer, size_t length)
{
size_t offset = 0;
int64_t left = length;
while(left > 0)
{
int64_t len = read((char*)buffer + offset, left);
if(len <= 0)
{
return len;
}
offset += len;
left -= len;
}
return length;
}
int Stream::readFixSize(ByteArray::ptr ba, size_t length)
{
int64_t left = length;
while(left > 0)
{
// From ba Get iovec, Don't pay attention to the offset
int64_t len = read(ba, left);
if(len <= 0)
{
return len;
}
left -= len;
}
return length;
}
int Stream::writeFixSize(const void* buffer, size_t length)
{
size_t offset = 0;
int64_t left = length;
while(left > 0)
{
int64_t len = write((const char*)buffer + offset, left);
if(len <= 0)
{
return len;
}
offset += len;
left -= len;
}
return length;
}
int Stream::wirteFixSize(ByteArray::ptr ba, size_t length)
{
int64_t left = length;
while(left > 0)
{
int64_t len = write(ba, left);
if(len <= 0)
{
return len;
}
left -= len;
}
return length;
}
}
/**
* @filename socket_stream.h
* @brief Socket Encapsulation of streaming interface
* @author L-ge
* @version 0.1
* @modify 2022-07-14
*/
#ifndef __SYLAR_SOCKET_STREAM_H__
#define __SYLAR_SOCKET_STREAM_H__
#include "sylar/stream.h"
#include "sylar/socket.h"
#include "sylar/mutex.h"
#include "sylar/iomanager.h"
namespace sylar
{
class SocketStream : public Stream
{
public:
typedef std::shared_ptr<SocketStream> ptr;
/**
* @brief Constructors
*
* @param sock Socket class
* @param owner Is it completely controlled
*/
SocketStream(Socket::ptr sock, bool owner = true);
/**
* @brief Destructor ( If m_owner=true, be close)
*/
~SocketStream();
virtual int read(void* buffer, size_t length) override;
virtual int read(ByteArray::ptr ba, size_t length) override;
virtual int write(const void* buffer, size_t length) override;
virtual int write(ByteArray::ptr ba, size_t length) override;
virtual void close() override;
Socket::ptr getSocket() const { return m_socket; }
bool isConnected() const;
Address::ptr getRemoteAddress();
Address::ptr getLocalAddress();
std::string getRemoteAddressString();
std::string getLocaloAddressString();
protected:
/// Socket class
Socket::ptr m_socket;
/// Master or not ( It mainly refers to whether this class controls the closing )
bool m_owner;
};
}
#endif
// socket_stream.cc
#include "socket_stream.h"
namespace sylar
{
SocketStream::SocketStream(Socket::ptr sock, bool owner)
: m_socket(sock)
, m_owner(owner)
{
}
SocketStream::~SocketStream()
{
if(m_owner && m_socket)
{
m_socket->close();
}
}
int SocketStream::read(void* buffer, size_t length)
{
if(!isConnected())
{
return -1;
}
return m_socket->recv(buffer, length);
}
int SocketStream::read(ByteArray::ptr ba, size_t length)
{
if(!isConnected())
{
return -1;
}
std::vector<iovec> iovs;
ba->getWriteBuffers(iovs, length); // Get iovec Write cache
int rt = m_socket->recv(&iovs[0], iovs.size());
if(rt > 0)
{
ba->setPosition(ba->getPosition() + rt); // Update the current operation location
}
return rt;
}
int SocketStream::write(const void* buffer, size_t length)
{
if(!isConnected())
{
return -1;
}
return m_socket->send(buffer, length);
}
int SocketStream::write(ByteArray::ptr ba, size_t length)
{
if(!isConnected())
{
return -1;
}
std::vector<iovec> iovs;
ba->getReadBuffers(iovs, length); // Get iovec Read cache
int rt = m_socket->send(&iovs[0], iovs.size());
if(rt > 0)
{
ba->setPosition(ba->getPosition() + rt); // Update the current operation location
}
return rt;
}
void SocketStream::close()
{
if(m_socket)
{
m_socket->close();
}
}
bool SocketStream::isConnected() const
{
return m_socket && m_socket->isConnected();
}
Address::ptr SocketStream::getRemoteAddress()
{
if(m_socket)
{
return m_socket->getRemoteAddress();
}
return nullptr;
}
Address::ptr SocketStream::getLocalAddress()
{
if(m_socket)
{
return m_socket->getLocalAddress();
}
return nullptr;
}
std::string SocketStream::getRemoteAddressString()
{
auto addr = getRemoteAddress();
if(addr)
{
return addr->toString();
}
return "";
}
std::string SocketStream::getLocaloAddressString()
{
auto addr = getLocalAddress();
if(addr)
{
return addr->toString();
}
return "";
}
}
Advertising time : be based on sylar The implementation of the framework is small demo( I hope to give star)
边栏推荐
- Information system project manager (2022) - key content: Knowledge Management (15)
- gerrit操作-回退掉某个patch_set
- H. 265 web player easyplayer realizes webrtc video real-time recording function
- Reading of seq2path: generating sentimental tuples as paths of a tree
- Simple summary of Modbus Protocol
- Warning: file already exists but should not: c:\users\workmai\appdata\local\temp appears when Python packages exe\_ MEI13
- setup和hold timing分析不满足是解决方法
- 重要的 SQL Server 函数 - 其他函数
- 【sylar】框架篇-Chapter10-Address 模块
- 【sylar】框架篇-Chapter6-协程调度模块
猜你喜欢

Jupyter notebook installation code prompt function
![[mathematical modeling] Based on MATLAB seismic exploration Marmousi model [including Matlab source code, 1977]](/img/fd/6b261670c12e4d89c27364bcdf2a02.jpg)
[mathematical modeling] Based on MATLAB seismic exploration Marmousi model [including Matlab source code, 1977]

Information system project manager (2022) - key content: Knowledge Management (15)

Jupyter Notebook安装代码提示功能

Reading of papers on "towards generative aspect based sentimental analysis"

虚拟机类加载机制

Important SQL server functions - other functions

网页源代码查看竟然有这么多方法!你都知道吗?

Kotlin -- function

could only be written to 0 of the 1 minReplication nodes. There are 0 datanode(s) running and 0 node
随机推荐
【sylar】框架篇-Chapter22-辅助模块
Slice切片
Jupyter notebook installation code prompt function
Seamless support for hugging face community, colossal AI low-cost and easy acceleration of large model
Important SQL server functions - other functions
24-Openwrt dnsmasq
【sylar】框架篇-Chapter11-Socket 模块
[coding and decoding] Huffman coding and decoding based on Matlab GUI [including Matlab source code 1976]
Kotlin -- function
Bio annotation of emotion analysis aste triples extraction
H. 265 web player easyplayer realizes webrtc video real-time recording function
The unsatisfied analysis of setup and hold timing is the solution
【实战】使用 Web Animations API 实现一个精确计时的时钟
Information system project manager (2022) - key content: Knowledge Management (15)
Warning: file already exists but should not: c:\users\workmai\appdata\local\temp appears when Python packages exe\_ MEI13
重要的 SQL Server 函数 - 其他函数
Mac installs mysql5.7 through brew
Fedformer MOE module
openpose的一些个人理解
Transformer landing | next vit realizes the real-time landing of industrial tensorrt, surpassing RESNET and cswin