当前位置:网站首页>高并发内存池
高并发内存池
2022-07-03 07:05:00 【Ustinian%】
文章目录
高并发内存池
项目介绍
- 当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
项目要求的知识储备
- 这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
什么是内存池
池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。
之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
举个例子吧
我们大家现在这个阶段大部分应该都还是学生,暂时还没有什么经济来源,所以生活费一般都需要找父母要。那假如说我们买个早餐要找父母要钱去买,买一本书一支笔找父母要钱去买,买一件衣服也找父母要钱去买,不管买什么都要去找父母要钱买。大家有没有觉得这种方式太低效了呢?即使你父母不觉得烦,你自己都可能觉得烦了。那为了提高效率,我们的父母一般会将每个月的生活费一起打给我们,我们要是要用钱了,就直接从父母给我们的生活费里拿,而不是每次要花钱的时候都得找父母去要,当我们把父母给我们的生活费都用完了之后我们可以再去找父母要,这种方式相较于前一种大大的提高了效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池
内存池是指程序预先从操作系统申请一块足够大内存,
此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;
同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。
当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池主要解决的当然是效率
的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片
呢?
再需要补充说明的是内存碎片分为外碎片和内碎片,上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续
,以至于合计的内存足够,但是不能满足一些的内存分配申请需求
。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
内碎片问题,我们后面项目就会看到,那会再进行更准确的理解。
malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。
当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”
。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。
设计一个定长的内存池
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用, 但是什么场景下都可以用就意味着什么场景下都不会有很高的性能, 下面我们就先来自己设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
定长内存池就证明我们实现的这个内存池是一个定长的,那么如何实现这个定长呢?我们可以使用非类型模板参数来实现,表示我们在内存池里面申请的对象大小都是N
template<size_t N>
class ObjetcPool
{
};
我们同样可以使用另外一种方式来实现这个定长,我们可以给一个模板参数,使得我们在内存池里面每次申请对象大小都是T类型的大小
template<class T>
class ObjetcPool
{
};
下面问题又来了,我们的定长内存池需要哪些成员变量呢?我们来一起分析一下
因为是定长内存池,所以我们需要向堆申请一个大块内存,因此我们这里可以使用一个指针指向这个大块内存来管理它,由于我们后面还需要对这个大块内存进行切分,所以我们需要一个变量来记录这个大块内存的剩余字节数,同样的为了便于进行切分这个指向大块内存的指针最好是字符指针
,因为对于字符指针来说我们想要向后移动n个字节时,只需要对该字符指针进行加n即可。
其次,当别人申请的内存块用完了想释放的时候,我们这里并不是将内存块直接还给堆,而是将还回来的定长内存块以链表的形式管理起来,管理释放回来的定长内存卡的链表叫做自由链表
,因此我们还需要一个指针指向这个自由链表的头部。
因此定长内存池中需要包含以下三个成员变量—— memory、remainBytes、freeList
char* _memory = nullptr;//指向大块内存的指针
size_t _remainBytes = 0;//大块内存在切分过程中的剩余字节数
void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针
因为我们这个是自己设计的定长内存池,我们后面要用这个定长内存池与malloc做比较,因此我们在申请内存的时候不应该去向malloc申请,而是应该直接向堆申请内存,那么我们如何直接向堆申请内存呢?
在Windows下,可以调用VirtualAlloc函数,在Linux下,可以调用brk或者mmp函数
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
这里我们进行了简单的封装,因此后面如果我们想要向堆直接申请内存的话,我们只需要调用SystemAlloc函数即可。
释放内存块
我们前面说过,我们释放回来的内存块并不是直接还给堆,而是以链表的形式进行管理起来,那么具体是如何管理的呢?
我们是采用头插
的方式来管理的,比如说我们现在还回来一块内存,指向这块内存的指针叫做obj,我们只需要让obj的前4/8个字节存储自由链表的头指针freeList的地址,如何再让它自己去做自由链表的头指针即可。
于是大家可能就会这么想:
这样的代码在32位下是没什么问题的,但是在64位下就会存在问题,因为在64位下我们想让obj的前8个字节去存储自由链表头指针freeList的地址,但是在64位下,int*解引用能访问的也只是四个字节所以就会存在问题。
那么我们应该如何解决这个问题呢?
我们知道,在32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向的数据类型,决定了指针解引用后访问的空间大小,因此我们我们这里采用二级指针
就行了
我们先将obj强转为二级指针,再将其解引用去存储自由链表头指针的地址,我们知道二级指针存的是一级指针的地址,而二级指针解引用后能向后访问一个指针的大小,在32位平台下访问的就是4个字节,在64位平台下访问的就是8字节,如此我们便解决了上面的问题。
需要注意的是,在释放对象的时候,我们应该显示调用对象的析构函数去清理该对象的资源,因为该对象有可能管理着其他的资源,如果没有将这些资源及时清理掉,就有可能会导致内存泄漏。
释放内存块的代码如下:
void Delete(T* ptr)
{
// 显示调用析构函数清理对象资源
ptr->~T();
//头插
*(void**)ptr = _freeList;//当前节点的前4/8个字节存原来头结点的地址
_freeList = ptr;//然后自己再做头结点
//if (_freeList == nullptr)
//{
// *(void**)ptr = nullptr;//将当前节点的前4/8个字节存nullptr
// _freeList = ptr;
//}
//else
//{
// *(void**)ptr = _freeList;
// _freeList = ptr;
//}
}
向定长内存池申请对象
我们知道定长内存池释放内存块的时候,并不是将内存块直接还给堆,而是直接将这些内存块通过自由链表管理起来,那我们申请的时候应该先看看自由链表里面有没有内存块,如果有的话我们先从自由链表里面拿从而达到重复利用。如果自由链表里面没有内存块了,如果此时memory为空,我们就需要先向堆申请一个大块内存,否则的话我们就需要在大块内存中切出定长内存块分给我们即可。
需要注意的是,每使用我们的大块内存进行切分之后,我们一定要及时的更新指向大块内存的指针——_memory
以及大块内存剩余字节数_remainBytes的值
,我们这里只是光有一块空间,所以和上面释放内存的逻辑一样,当我们拿到这块空间后,我们还需要显示的调用该对象的构造函数进行初始化
申请对象的代码如下:
T* New()
{
T* obj = nullptr;
//优先使用还回来的内存块对象,再次重复利用
//只要_freeList不为空,就证明有还回来的内存块对象
if (_freeList != nullptr)
{
//提前记录下一个还回来的内存块对象
void* next = *(void**)_freeList;
obj = (T*)_freeList;
_freeList = next;
}
else
{
//如果当前剩余字节数小于一个对象的大小
//剩余的空间我们不用了,我们重新生申请一块大点的空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;
//如果当前一个对象的大小比指针还小,那么我们就让它的大小变成指针的大小
//这样有助于我们存下一个节点的地址
size_t ObjSize = sizeof(T) < sizeof(void*)?sizeof(void*):sizeof(T);
_memory += ObjSize;//切分
_remainBytes -= ObjSize;
}
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
定长内存池的整体代码如下:
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
//优先使用还回来的内存块对象,再次重复利用
//只要_freeList不为空,就证明有还回来的内存块对象
if (_freeList != nullptr)
{
//提前记录下一个还回来的内存块对象
void* next = *(void**)_freeList;
obj = (T*)_freeList;
_freeList = next;
}
else
{
//如果当前剩余字节数小于一个对象的大小
//剩余的空间我们不用了,我们重新生申请一块大点的空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;
//如果当前一个对象的大小比指针还小,那么我们就让它的大小变成指针的大小
//这样有助于我们存下一个节点的地址
size_t ObjSize = sizeof(T) < sizeof(void*)?sizeof(void*):sizeof(T);
_memory += ObjSize;//切分
_remainBytes -= ObjSize;
}
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
void Delete(T* ptr)
{
// 显示调用析构函数清理对象的资源
ptr->~T();
//头插
*(void**)ptr = _freeList;//当前节点的前4/8个字节存原来头结点的地址
_freeList = ptr;//然后自己再做头结点
//if (_freeList == nullptr)
//{
// *(void**)ptr = nullptr;//将当前节点的前4/8个字节存nullptr
// _freeList = ptr;
//}
//else
//{
// *(void**)ptr = _freeList;
// _freeList = ptr;
//}
}
private:
char* _memory = nullptr;//指向大块内存的指针
size_t _remainBytes = 0;//大块内存在切分过程中的剩余字节数
void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针
};
性能对比
下面我们将自己实现的定长内存池和malloc/free进行性能对比,测试代码如下:
void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
测试结果:
可以看到在测试过程中,定长内存池申请或者释放定长对象会比malloc/free快很多,因为malloc是一个通用的大众货,什么场景下都可以用, 但是什么场景下都可以用就意味着什么场景下都不会有很高的性能。而定长内存池就是专门针对设计定长对象而设计的,因此在这种场景下使用定长内存池的效率会比malloc/free更高。这也就是我们老话常说的——尺有所短、寸有所长。
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题
- 多线程环境下,锁竞争问题
- 内存碎片问题
concurrent memory pool主要由以下3个部分构成:
- thread cache: 线程缓存是每个线程独有的,用于小于256KB的内存的分配, 线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache: 中心缓存是所有线程所共享,thread cache是 按需从central cache中获取 的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧, 达到内存分配在多个线程中更均衡的按需调度的目的。 central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先
这里使用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache: 页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。
当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
高并发内存池——第一层Thread Cache
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时都是无锁的。
Thread Cache申请内存和释放内存的大体思路
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache。
我们在上面学过定长内存池,我们知道定长内存池是用来申请或者释放一块固定大小的内存块,对于释放的内存块我们通过自由链表来管理。而我们的高并发内存池它是用来申请或者释放不同大小的内存块的,这也就间接说明了我们需要多个自由链表来管理释放回来的内存块,其实大家通过这个也可以联想到我们的Thread Cache它实际上采用的是哈希桶
结构。
我们上面说过Thread Cache主要是用于小于256KB的内存的分配,我们这里其实也可以采用类似于定长内存池的方式,但是不推荐这么做,为什么呢?下面我来解释一下不推荐的理由
假设我们每个桶的大小都是8Bytes,那么对于256KB来说我们就需要32768
个桶,我们是可以开这么多个桶,但是这样做太浪费了,产生的自由链表太多了。
因此我们的高并发内存池的Thread Cache不采用这种方式,而是换了一种方式。我们采用内存对齐
的方式,当然了这种方法不是我想出来的,我是从大佬那里学到的。
内存对齐:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
第一种方法
//我们一般能想到的方法
//static inline size_t _RoundUp(size_t size, size_t alignNum)
//{
// size_t alignSize;
// //比如以8字节对齐,在[1,128]这个区间内
// //8不需要对齐,应该它就是8的倍数了
// //16不需要对齐,因为它就是8的倍数
// if (size%alignNum == 0)
// {
// alignSize = size;
// }
// //1,它对齐后应该是8
// //9,它对齐后应该是16字节
// else
// {
// alignSize = (size / alignNum + 1)*alignNum;
// }
// return alignSize;
//}
//第二种方法
//大佬的方法
static inline size_t _RoundUp(size_t size, size_t AlignNum)
{
//比如说在[1,128]这个区间里面以八字节对齐
//1-8对齐之后都是8
//1+7 7取反后:1000(前面多余的1省略没写)
//2+7
//3+7
//...
//从1-8 +7 最终结果会在8-15这个区间里面不会超过15
//然后再和7的反码一与
//因此最终与完的结果就是8
return ((size + AlignNum - 1)&~(AlignNum - 1));
}
//内存对齐
static size_t RoundUp(size_t size)
{
//首先我们的ThreadCache不能够申请超过256kb的空间
//assert(size <= MAX_BYTES);
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
我们采用内存对齐的方式之后,我们就不再需要开32768个桶,我们只需要开208个桶
就可以完成各个不同内存块的申请和释放。但是这里的内存对齐也有一点点小缺点,就是它会造成大约%10左右内存碎片的浪费。比如说我要申请1025字节的内存块,但是这里经过对齐之后它会给我分配1152字节的内存块,多给我分配了124字节的内存,124除以1152最后的值大约就是%10.
根据上面的内存对齐规则,thread cache中需要开的桶的个数由32768变成了208,同时我们这一层申请的都是小于256KB大小的内存块,因此我们可以将这些数据在Common.h里面定义成全局的静态变量
static const size_t MAX_BYTES = 256 * 1024;//ThreadCache最大能申请256KB
static const size_t NFREELIST = 208;//一个ThreadCache中最多有多少个桶
上面对于我们申请某一大小的内存块时,我们需要将其进行内存对齐,那我们要申请某一大小的内存时,我们如何知道它是属于哪个哈希桶的呢?
我们一般人想到的方法是这样的:
//第一种方法,我们一般能想到的办法
static inline size_t _index(size_t size, size_t alignNum)
{
size_t index;
//比如在[1,128]这个区间内,以8字节对齐
//8,8%8==0,而8/8==1,因此它应该挂在8/8-1也就是0号桶上面
if (size%alignNum == 0)
{
index = size / alignNum - 1;
}
//对于1而言,1%8!=0
//因此它应该挂在1/8也就是0号桶上面
else
{
index = size / alignNum;
}
return index;
}
下面我们一起来学习一下大佬使用位运算的方法
//第二种方法
//大佬的方法
static inline size_t _Index(size_t size, size_t align_shift)
{
//比如size是8,align_shift是3
//8+(1<<3)-1 = 15
//(15>>3) -1 = 0
return ((size + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static size_t Index(size_t size)
{
assert(size <= MAX_BYTES);
//每个区间多少个链
static int group_array[4] = {
16,56,56,56};
if (size <= 128)
{
return _Index(size, 3);
}
else if (size <= 1024)
{
return _Index(size-128, 4)+group_array[0];
}
else if (size <= 8 * 1024)
{
return _Index(size-1024, 7)+group_array[0]+group_array[1];
}
else if (size <= 64 * 1024)
{
return _Index(size-8*1024, 10)+group_array[0]+group_array[1]+group_array[2];
}
else if (size <= 256 * 1024)
{
return _Index(size - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
assert(false);
return -1;
}
}
注意: 因为我们这里是一个大的项目,有可能内存对齐,哈希桶的映射这些公共的方法在后面的两层也有可能会用到,所以我们将这些代码放到了一个公共的头文件Common.h
里面,其实内存对齐以及哈希桶的映射在SizeClass
这个类中。如果后面后两层需要再添加什么公共的方法或者类的时候,我们只需要在Common.h里面添加即可。
Thread Cache中的自由链表
我们上面说过thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。所以为了方便我们对自由链表这个结构进行了简单的封装并放在了公共的头文件Common.h里面
,当前我们的自由链表的主要操作就是Push
和Pop
内存对象,等到后面我们发现还有别的需求时我们会添加对应的成员函数。
//管理切分好的小对象的自由链表
class FreeList
{
public:
void Push(void* obj)
{
//头插的时候obj不能够为空,因此需要进行断言
assert(obj);
//头插
//*(void**)obj = _freeList;//obj的前4/8个字节存当前的头节点
Next(obj) = _freeList;
//然后obj去做新的头
_freeList = obj;
}
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = Next(obj);
return obj;
}
//判空
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;
};
因为我们的自由链表是单链表,所以我们这里的Push和Pop都采用的是和定长内存池一样的头插以及头删,因为对于单链表而言采用尾插尾删的时间复杂度是O(N),因为要遍历链表去找尾,这样效率会低很多。
头插:
头删:
注意: 我们头插和头删里面的Next函数它的作用就是取出当前指针头4/8个字节
static void*& Next(void* obj)
{
return *(void**)obj;
}
Thread Cache代码框架
class ThreadCache
{
public:
//申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
//从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeList[NFREELIST];
};
//TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
Thread CacheTLS无锁访问
我们上面说过每一个线程都有一个thread cache对象,在我们Thread Cache这一层申请和释放对象时是无锁的,我们学过Linux我们也知道,同一个进程组里面的线程他们是共享进程的地址空间的。那我们这里的thread cache应该如何设计才能实现无锁访问呢?
如果thread cache设计为全局的,那么它就会被所有线程所共享,这样就需要加锁,从而违反了上面所说的在这一层申请和释放对象时是无锁的。
因此要想让我们每一个线程无锁的访问自己的thread cache,我们就需要用到线程局部存储(TLS)
,它是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
//TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
需要注意的是: 每个线程并不是说一创建就有了自己的thread cache,而是只要当这个线程去调用相关申请内存的接口时才会创建自己的thread cache。
申请内存对象
通过计算需要申请的内存大小,从而映射到对应的桶。如果自由链表_freeLists[index]中有对象,则直接Pop一个内存对象返回。如果_freeLists[index]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。这里的FetchFromCentralCache是thread cache的一个成员函数,具体实现我们放到后面来说。
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
//对齐后的大小
size_t alignSize = SizeClass::RoundUp(size);
//映射的桶下标
size_t index = SizeClass::Index(size);
if (!_freeList[index].Empty())
{
return _freeList[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);
}
}
释放内存对象
计算size映射自由链表桶位置index,将对象Push到_freeLists[index]。
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//映射的自由链表桶下标
size_t index = SizeClass::Index(size);
_freeList[index].Push(ptr);
}
高并发内存池——第二层Central Cache
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
Central Cache申请内存和释放内存的大体思路:
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给threadcache,就++use_count
释放内存:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。
- 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
通过上面的图我们可以知道,Central Cache的每个桶都是由一个又一个span结点组成的SpanList双链表结构,span管理一个跨度的大块内存,它是一个管理以页为单位的大块内存的结构。下面我们来一起看一下span的结构吧
//Span管理一个跨度的大块内存
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID page_id = 0;// 大块内存起始页的页号
size_t n = 0;//页的数量
Span* _prev = nullptr;//双向链表结构
Span* _next = nullptr;
void* _freelist = nullptr; // 切好的小块内存的自由链表
size_t _usecount = 0;// 切好小块内存,被分配给thread cache的计数
};
大家看到span的结构后,可能会有点小疑问:这里的页号的类型是PAGE_ID
,那这个PAGE_ID是什么呢?
这里的PAGE_ID是我们经过typdef后的数据类型,那为什么又要对它typedef呢?下面我来给大家解释一下
我们学过Linux操作系统我们知道,一个进程它是有自己的进程地址空间的,其中在32位下,进程地址空间的大小是2^3 2,在64位下,进程地址空间的大小是2^64.
下面我们来算一笔账,假设我们以8K为1页,那么在32位平台下,进程地址空间就可以被分成232÷213=2^19 个页,在64位平台下,进程地址空间可以被分成264÷213=2^51个页。所以我们就可以知道在32位与在64位下,进程地址空间被分成的页的数量是不同的,所以我们就不能够简单的去用无符号整型去作为页号的数据类型。
因此我们这里需要使用条件编译来解决这个问题
//注意我们这里需要使用条件编译
//因为在Span中有页号,假如我们以8kb为1页
//那么在32位下,我们就有2^32/2^13 = 2^19页号
//在64位下,我们就有2^64/2^13 = 2^51页号
//因此我们要对PAGE_ID进行typedef才行
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
// linux
#endif
注意: 在32位下,_WIN32有定义,_WIN64没有定义;而在64位下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再去判断_WIN32是否有定义。
Central Cache中的SpanLists
SpanList是由我们一个又一个span结点所组成的双链表结构,其中它还是一个带哨兵卫的双向循环链表,下面我们一起来看一下它的结构吧。
class SpanList
{
public:
//构造函数
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//在pos位置前插入一个节点
void Insert(Span* pos,Span* newSpan)
{
assert(pos);
assert(newSpan);
//prev newSpan pos
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
//删除pos节点
void Erase(Span* pos)
{
assert(pos);
assert(pos!=_head);//删除的节点不能是哨兵卫头结点
//prev pos next
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;//哨兵卫头结点
public:
std::mutex _mtx;//桶锁
};
注意: 因为我们第二层Central Cache是被所有线程所共享的,所以在每个桶里面还需要定义一把桶锁保证线程安全。
Central Cache代码框架
同一个进程组中的每个线程都有一个属于自己的thread cache,我们是用TLS来实现每个线程无锁访问属于自己的thread cache的。而central cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。
//采用单例模式--饿汉
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
private:
CentralCache()
{
}
//C++11防拷贝
CentralCache(const CentralCache&) = delete;
SpanList _spanLists[NFREELIST];
static CentralCache _sInst;
};
Thread Cache::从中心缓存获取对象
还记得我们之前说过在第一层的时候如果Thread Cache对应的桶里面有对象,我们只需要Pop一个对象返回即可,但是如果没有对象我们就需要去找Central Cache去要。我们在第一层的时候说我们这个接口会在后面具体实现的时候具体展开来说,下面我们就来说一下这个接口。
现在有一个问题:我们向Central Cache要一批对象,那我们具体应该要多少呢?
如果一次要的太多,当前线程的Thread Cache用不完造成浪费,然后其他线程来申请的时候,Central Cache又没有小块的自由链表对象分给其他线程的Thread Cache。如果一次要的太少,那么我马上就会用完,然后又得来找Central Cache要。
所以基于这两个问题,我们联想到了一种方法——这种批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法。
当我们某个线程的Thread Cache向Central Cache申请内存时,如果申请的是较小的内存对象,我们可以多要一点,申请较大的内存对象我们就少要一点。
通过下面这个NumMoveSize
函数,我们就可以根据所需申请的对象的大小计算出具体Thread Cache向Central要的对象个数,并且可以将要的对象个数控制到2~512个之间。也就是说,如果我Thread Cache当前申请的对象比较小,那么我就向你Central Cache要512个对象;如果我Thread Cache当前申请的对象比较大,那么我就向你Central Cache要512个对象。
class SizeClass
{
// 一次thread cache从中心缓存获取多少个自由链表对象
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
//分配多少个给thread cache
size_t num = MAX_BYTES / size;
//[2,512],一次批量移动多少个对象的(慢启动)上限值
//小对象一次批量上限高
//大对象一次批量上限低
if (num < 2)
{
num = 2;
}
if (num>512)
{
num = 512;
}
return num;
}
}
但是这个时候问题又来了,对于小对象,我们一次性要给512个还是太多了,Thread Cache一般肯定用不了这么多,那我们有没有什么办法解决一下这个问题呢?
答案是有的,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量
,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在Thread Cache中的桶里面都会有一个自己的_maxSize。
//管理切分好的小对象的自由链表
class FreeList
{
public:
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
};
自此,当我们的Thread Cache向Central Cache申请对象时,我们会比较NumMoveSize
与MaxSize
的值,取它俩直接较小的值作为这一次Thread Cache向Central Cache要的对象个数。此外,如果本次采用的是maxSize的值,那么就会将thread cache中该自由链表的maxSize的值进行加一。
因为我们要的是批量的对象,所以我们需要将这些从Central Cache要来的批量对象插入到Tread Cache对应的桶里面才行,可是我们之前FreeList里面只有单个插入的接口,没有插入一段范围的接口,于是下面我们可以实现一个插入一段范围的接口PushRange
//插入一段范围的节点
//[start,end]这段区间的节点是链接着的
//那我们如何让这段区间和_freeList链接起来呢?
//我们只需要让end的前4/8字节存当前_freeList的地址
//再让start去做新的_freeList就可以将他们给链接起来了
void PushRange(void* start, void* end)
{
NextObj(end) = _freeList;
_freeList = start;
}
对于这里的PushRange的代码,大家可能有点疑问,为啥子插入一段范围的接口就只要两行代码就搞定了。下面我通过图片来给大家解释一下
解决了上面的这些问题之后,我们来看一下Thread Cache里面从中心缓存获取对象
的代码实现吧
//ThreadCache对应的桶中如果没有自由链表对象
//我们就找CentralCache中要
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不断要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeList[index].MaxSize(), SizeClass::NumMoveSize(size));
if (batchNum == _freeList[index].MaxSize())
{
_freeList[index].MaxSize() += 1;
}
//刚开始我不会给你太多,但是最低不会少于1个
assert(batchNum >= 1);
void* start = nullptr;
void* end = nullptr;
//实际给你的对象
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
//假如给一个
if (actualNum == 1)
{
assert(start==end);
return start;
}
//给多个
else
{
//因为我们最后要返回链接后的头结点
//因此我们只需要将[start->next,end]这段区间与index所对应的桶给链接起来即可
_freeList[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
}
从中心缓存获取一定数量的对象给thread cache
- 先根据要申请的对象大小去映射到Central Cache对应的桶(Thread Cache和Central Cache的桶映射规则是一样的)
- 因为Central Cache是被所有线程所共享的,因此当Thread Cache要访问Central Cache对应的哈希桶之前,我们需要先对桶进行加锁,保证线程安全
- 在保证线程安全的前提下,从Central Cache中获取对应的span,然后将该span切分成若干个小块的自由链表对象,Thread Cache要多少,我给多少,但是如果我span下面挂的自由链表对象的数量不够,那我有多少给你多少。然后批量的给一批小块的自由链表对象给Thread Cache。
- 前面加锁了,后面一定要记得解锁,否则会造成死锁。
我们这里还要将span切分成若干个小块的自由链表对象,下面我们来说一下切分的逻辑
下面我们一起来看一下这个接口的代码实现
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//先算出size映射到哪个桶
size_t index = SizeClass::Index(size);
//因为CentralCache是所有线程所共享的,因此ThreadCache从CentralCache申请内存的时候要加锁
_spanLists[index]._mtx.lock();
//假设我们申请到了一个span对象
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freelist);
// 从span中获取batchNum个对象
// 如果不够batchNum个,有多少拿多少
start = span->_freelist;
end = start;
size_t i = 0;
size_t actualNum = 1;
//这里有两者情况:1.有可能当前对象下的_freeList自由链表个数够分给你batchNum个
// 2.有可能当前对象下的_freeList自由链表对象个数不够分给你batchNum个
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
//每分一个actualNum++
++actualNum;
}
//最终不管够不够分,我们需要将end的Next再赋值给span的free_List
span->_freelist = NextObj(end);
//然后断开end与它next的链接
NextObj(end) = nullptr;
//加锁了就一定不要忘记解锁,否则有可能会造成死锁
_spanLists[index]._mtx.unlock();
//最后真正分配的小对象个数
return actualNum;
}
获取一个非空的Span
- 我们先到Central Cache当前的spanList里面去查看是否还有未分配对象span,如果有,我们直接进行返回即可
- 如果当前Central Cache里面没有未分配对象的span,那我们就需要去找第三层Page Cache去要,为了保持连贯性,我们将这个成员函数实现放到第三层Page Cache里面讲
高并发内存池——第三层Page Cache
第三层Page Cache也是哈希桶结构,但是与第一层和第二层的映射规则不同的是,第三层Page Cache的哈希桶映射规则采用的是直接定址法
,比如说1号桶里面挂的都是1页的span,2号桶里面挂的都是2页的span,依此类推。
Page Cache申请内存和释放内存的大体思路
申请内存:
- 当central cache向page cache申请内存时,
page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。
比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。 - 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
- 如果central cache释放回一个span,
则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。
这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
我们这里的Page Cache最大是挂128页的span,因此我们可以让桶号与页对应起来,我们将0号桶给空出来,然后将[1,128]号桶与1页——128页之间建立映射关系,所以我们将Page Cache的哈希桶个数设置成129
,并且用全局的静态变量来保存。
static const size_t NPAGES = 129;
大家这个时候可能会问:为什么最大挂128页的span呢?我们知道单个线程它最大申请的大小是256KB,而在我们这个高并发内存池里面我们是以8KB为一页,而128页正好就是我们的1024KB,128页刚好可以被切分成4个256KB的对象,所以是比较适中的。
Page Cache代码框架
跟Central Cache一样,Page Cache在整个进程中也只有一个,对于这种只能创建一个对象的类,我们也可以将其设置为单例模式。
//单例模式--饿汉
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
Span* NewSpan(size_t k);
//互斥锁
std::mutex _pagemtx;
private:
PageCache()
{
}
//防拷贝
PageCache(const PageCache&) = delete;
//PageCache也采用哈希桶的方式,只不过跟上两层的映射方式不一样
//它这里一共有128个桶,[1,128]
//它是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
SpanList _spanLists[NPAGES];
static PageCache _sInst;
};
下面我们先来梳理一下逻辑,如果一个线程它要申请对象时,它会先去找它的Thread Cache去要,如果Thread Cache对应的桶里面已经没有自由链表对象了,那么它就要去找Central Cache去申请。同样的如果Central Cache到对应的桶里面发现已经没有未分配对象的span了,那么它就会找Page Cache要从而获取一个非空的Span。
在前面我们第二层的时候,我们说过Central Cache获取一个非空的Span这个接口我们放到第三层来讲,下面我们就先来说一下这个接口里面的小细节
。
Central Cache::获取一个非空的Span
我们的Central Cache获取一个非空的Span时,它并不是直接就去找Page Cache要,而是会去遍历一下自己对应哈希桶看看桶里面有没有还未分配对象的span,因为我们的SpanList是带头双向链表结构,所以为了方便遍历,我们可以模拟类似于C++STL里面链表迭代器的方式,给SpanList提供Begin和End成员函数
,分别用于获取双链表中的第一个结点和最后一个结点的下一个结点(也就是哨兵卫头结点)但是我们这里不封装迭代器,而是直接用它的原生指针去模拟。
class SpanList
{
public:
//构造函数
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
//在pos位置前插入一个节点
void Insert(Span* pos,Span* newSpan)
{
assert(pos);
assert(newSpan);
//prev newSpan pos
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
//删除pos节点
void Erase(Span* pos)
{
assert(pos);
assert(pos!=_head);//删除的节点不能是哨兵卫头结点
//prev pos next
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;//哨兵卫头结点
public:
std::mutex _mtx;//桶锁
};
如果当前桶遍历完了之后,发现都没有未分配对象的Span了,这时候我们就要去找Page Cache去要一个以页为单位的Span了。可是这个时候问题又来了: 我Central Cache应该向Page Cache去要一个几页的Span呢?
不要忘了我们获取一个非空的Span这个接口的第二个参数size
是Thread Cache要的小对象大小,我们可以根据这个小对象的大小计算出Thread Cache一次向Central Cache申请对象的个数上限,再根据一次申请对象的个数上限*该对象的大小的字节数转换成页数
,我们就可以知道我应该向Page Cache去要一个几页的Span了.
下面又有问题了:我们如何将字节数转换成页数呢?
我们说过我们这里一页是8KB,因此我们可以定义一个PAGE_SHIFT
代表页大小转换偏移,1<<13==8*1024也就是8KB,所以我们这里的PAGE_SHIFT的值就是13
static const size_t PAGE_SHIFT = 13;//8k为一页
下面我们再来看一下Central Cache应该向Page Cache去要一个几页的Span的这个接口
class SizeClass
{
public:
//计算一次向PageCache获取多少页
//单个对象 8byte
//...
//单个对象256kb
static size_t NumMovePage(size_t size)
{
//计算要多少个size大小的对象
size_t num = NumMoveSize(size);
//size个大小的字节数是多少
size_t bytes = num*size;
//如果当前要的字节数一页都不到
//我们就直接给它一页
//否则你要几页我就给你几页
size_t npage = bytes >> PAGE_SHIFT;
if (npage == 0)
{
npage = 1;
}
return npage;
}
};
OK,现在我们知道了要想Page Cache申请多少页之后,我们还有问题需要处理,我们现在向Page Cache申请到了一个若干页的Span,但是我们还需要将这个切分成一个个小的自由链表对象,最后才能分给Thread Cache。
可是问题又来了:我们既然要切分的话,我们肯定就得找到这个Span的起始地址以及结束位置,那么如何找到这个Span的起始地址以及结束位置呢?
我们可以通过用当前Span的页号*一页的大小,即页号<<PAGE_SHIFT
就可以得到这个Span的起始地址了
拿到了Span的起始地址之后,再去算结束位置就会方便很多了,我们通过当前Span它的页数*一页的大小再加上我们Span的起始地址
,我们就可以算出结束位置了,有了起始位置结束位置,以及每个小对象的大小接下来我们就可以进行切分了。这里我们切分再插入的时候就不采用头插了,而是采用尾插的方式, 因为采用尾插的方式,这些对象看起来是按照链式结构链接起来的,但是实际上它们在物理上是连续的,这时当我们把这些内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。
Span切分好了之后,我们还需要将这个Span头插到对应的哈希桶里面,因为SpanList没有提供头插的接口,我们可以在这里提供一个头插的接口PushFront.
class SpanList
{
public:
void PushFront(Span* span)
{
Insert(Begin(), span);
}
private:
Span* _head;//哨兵卫头结点
public:
std::mutex _mtx;//桶锁
};
这些小细节都梳理清楚了之后,下面我们再来整体梳理一下获取一个非空的Span这个接口。
- 我们先到Central Cache当前的spanList里面去查看是否还有未分配对象Span,如果有,我们直接弹出一个返回即可
- 我们此时发现如果Central Cache对应的桶里面没有未分配对象的Span,我们就应该去找Page Cache去要一个非空的span,但是在这之前我们应该先解掉当前的桶锁,因为在Central Cache里面调用我们这个接口的函数是
FetchRangeObj
并且它是加了桶锁的,虽然Central Cache这个桶里面没有未分配的自由链表分给Thread Cache,但是我们的Thread Cache它除了申请内存它还可以释放内存。所以我们可以在访问Page Cache之前将Central Cache对应的桶锁给解了,这样其他的Thread Cache还内存给Central Cache这个桶时就不会阻塞。 - 此时我们再来向Page Cache申请一个非空的Span,申请成功之后再将这个Span切分成若干个自由链表,切分完了之后在把这个Span头插到对应的桶之前我们再把对应的桶锁给加上,最后把这个Span头插到对应的桶里面去即可。
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
//如果当前spanlist中还有未分配对象的span
//我们就返回它
if (it->_freelist != nullptr)
{
return it;
}
//否则往后走
else
{
it = it->_next;
}
}
//先把Central Cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//走到这里证明当前桶里面没有未分配对象的span
//因此我们需要找PageCache去要
PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pagemtx.unlock();
//对获取的span进行切分,不需要加锁,因为这会儿其他线程访问不到这个span
//因为我们还没有把这个span挂到SpanList上面
//我们将刚刚得到的span进行切分,然后再插入到对应CentralCache的桶中
//切分的时候:我们需要先计算出当前span的起始地址
// 计算出当前span究竟有多大
// 计算出当前span的结束地址
//然后按size大小进行切分
char* start = (char*)(span->page_id << PAGE_SHIFT);//通过页号*1页的大小(8k) 我们就可以得到它的起始地址
size_t bytes = span->n << PAGE_SHIFT;//通过页的数量*页的大小我们就可以知道当前span究竟有多大
char* end = start + bytes;//起始地址+span的大小就可以得到结束地址
// 把大块内存切成自由链表链接起来
// 1、先切一块下来去做头,方便尾插
span->_freelist = start;
start += size;
void* tail = span->_freelist;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
//注意这里还要将tail的下一个节点的地址置空
//不然会导致程序崩溃引起一连串的问题
//!!!!!!!!!!!!!
NextObj(tail) = nullptr;
//切好span以后,需要把span挂到桶里面的时候,再加锁
list._mtx.lock();
//将切分好的span头插到CentralCache对应的桶中
list.PushFront(span);
return span;
}
获取一个K页的Span
- 我们首先需要到Page Cache对应哈希桶里面看看是否还有Span,如果有的话我们直接
PopFront
弹出一个即可 - 如果当前桶为空,没有Span,那我们就需要去后面找一个大一点的Span进行切分,切分成K页的Span与N-K页的Span,把K页的Span返回即可
- 如果遍历完了Page Cache的所有桶之后发现都没有Span了,此时我们就需要直接向堆申请一个
NPAGE-1页
(也就是128页)的Span,然后根据申请到的内存地址>>PAGE_SHIFT就可以求出这个Span的页号
,最后再递归调用这个函数重复前面两步即可。
因为后面会用到判空以及头删,因此我们可以在SpanLIst函数里面添加这两个接口。
//带头双向循环链表
class SpanList
{
public:
//判空
bool Empty()
{
return _head->_next == _head;
}
//头删
Span* PopFront()
{
//注意我们这里的删除没有delete它
//只是断开了它和它的prev next的链接
Span* front = _head->_next;
Erase(front);
return front;
}
private:
Span* _head;//哨兵卫头结点
public:
std::mutex _mtx;//桶锁
};
下面我们再来看一下一个N页的Span切成K页Span和N-K页Span的逻辑
下面我们来看一下获取一个K页的Span的代码
Span* PageCache::NewSpan(size_t k)
{
//断言一下,申请的页最少是一页,最多不会超过NPAGES
assert(k > 0 && k < NPAGES);
//先去PageCache对应的桶里面看看有没有span
if (!_spanLists[k].Empty())
{
//有的话我们就直接进行头删
return _spanLists[k].PopFront();
}
//如果对应的桶没有span的话,我们就去后面找大一点的桶
//将大点的桶进行切分,切成一个k页的桶和一个n-k页的桶
for (int i = k + 1; i < NPAGES; i++)
{
//当前桶还有span,我们就从当前桶切分
if (!_spanLists[i].Empty())
{
Span* NSpan = _spanLists[i].PopFront();
Span* KSpan = new Span;
//从当前大一点的桶进行切分
//切成一个k页的span和一个n-k页的span
//k页的span返回CentralCache
//n-k页的span挂到n-k的桶中去
KSpan->page_id = NSpan->page_id;
KSpan->n = k;//k页
//页号+=k
//比如说我之前的页号是100 现在我切了两个页出去之后
//我们页应该从102开始
NSpan->page_id += k;
//页的数量应该用n-=k
NSpan->n -= k;
//然后我们将n-k页的span头插到n-k号桶中去
_spanLists[NSpan->n].PushFront(NSpan);
return KSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = (void*)SystemAlloc(NPAGES - 1);
//根据地址算出页号
//地址/一页的大小就是页号
bigSpan->page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
//页数就是NPAGES-1
bigSpan->n = NPAGES - 1;
//根据页就可以找到对应的桶
//我们将这个从堆里面申请的span头插到PageCache对应的桶中
_spanLists[bigSpan->n].PushFront(bigSpan);
//插入进去了之后,现在我们的PageCache中就有大的桶了
//因此我们还需要在调用一次自己
return NewSpan(k);
}
Thread Cache释放内存
- 当释放内存小于256k时将内存释放回Thread Cache,计算size映射自由链表桶位置index,将对象Push到_freeLists[index]。
- 但是随着不断的释放内存给Thread Cache则会导致Thread Cache对应的哈希桶里面的自由链表越来越长,当Thread Cache对应的自由链表的长度达到一定长度时,由于我们现在用不了这么多,我们应该还一部分给中心缓存Central Cache。这样可以使得其他线程来这个桶申请内存的时候,Central Cache可以把刚刚你还回来的那一部分自由链表分配给这些线程。
我们这里说的是当Thread Cache对应的自由链表的长度达到一定长度时我们应该还一部分给中心缓存Central Cache,那这里的一定长度
是多少呢?
我们这里的一定长度以Thread Cache一次批量向Central Cache申请的对象个数作为标准,也就是说当Thread Cache某个桶里面的自由链表长度超过它一次批量向central cache申请的对象个数,我们就需要还一部分给Central Cache。
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//映射的自由链表桶下标
size_t index = SizeClass::Index(size);
_freeList[index].Push(ptr);
// 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
if (_freeList[index].Size() >= _freeList[index].MaxSize())
{
ListTooLong(_freeList[index], size);
}
}
当前自由链表的长度已经超过了一次批量向Central Cache申请的对象个数
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
//将这一批自由链表对象还给Central Cache对应的Span
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
从上述代码可以看出,FreeList类里面还需要一个获取自由链表中对象的个数的成员函数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此我们需要给FreeList类增加一个对应的PopRange函数
,然后再增加一个_size成员变量
,该成员变量用于记录当前自由链表的长度,并且每当我们在自由链表里面插入或者删除对象时,我们都需要更新这个成员变量的值。
//管理切分好的小对象的自由链表
class FreeList
{
public:
void Push(void* obj)
{
//头插的时候obj不能够为空,因此需要进行断言
assert(obj);
//头插
//*(void**)obj = _freeList;//obj的前4/8个字节存当前的头节点
NextObj(obj) = _freeList;
//然后obj去做新的头
_freeList = obj;
//每插入一个链表
//当前桶里面自由链表的个数++
_size++;
}
//插入一段范围的节点
//[start,end]这段区间的节点是链接着的
//那我们如何让这段区间和_freeList链接起来呢?
//我们只需要让end的前4/8字节存当前_freeList的地址
//再让start去做新的_freeList就可以将他们给链接起来了
void PushRange(void* start, void* end,size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end,size_t n)
{
//我们还的这一段自由链表的个数不能够超过这个桶里面自由链表的个数
assert(n <= _size);
start = _freeList;
end = start;
for (int i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
//将end的下一个节点的地址赋值给_freeList
//然后让我们end与后面的节点断开连接
_freeList = NextObj(end);
NextObj(end) = nullptr;
//因为我们释放了一段自由链表
//因此这个桶里面自由链表的个数要-n
_size -= n;
}
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(obj);
//每释放一个链表
//当前桶里面自由链表的个数--
--_size;
return obj;
}
//判空
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;//自由链表的头指针
size_t _maxSize = 1;
size_t _size = 0;//自由链表的长度
};
对于FreeList类里面的PushRange
接口,我们最好也像PopRange
那样给它一个参数,用来表示这一次范围插入的对象个数。
因此我们就需要在之前调用PushRange的地方修改一下,而我们实际就在一个地方调用过PushRange函数,并且此时插入对象的个数也是很容易知道的。当时Thread Cache从Central Cache获取了actualNum个对象,将其中的一个返回给了申请对象的线程
,剩下的actualNum-1个挂到了thread cache对应的桶当中,所以这里插入对象的个数就是actualNum-1
。
//给多个
else
{
//因为我们最后要返回链接后的头结点
//因此我们只需要将[start->next,end]这段区间与index所对应的桶给链接起来即可
_freeList[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
注意: 当thread cache的某个桶里面的自由链表过长时,我们实际就是把这个自由链表当中全部的对象都还给central cache了,但这里在设计PopRange接口时还是设计的是取出指定个数的对象
,因为在某些情况下当自由链表过长时,我们可能并不一定想把链表中全部的对象都取出来还给central cache,这样设计就是为了增加代码的可修改性
。
Central Cache释放内存
我们先来看一下Central Cache释放内存的逻辑
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
Central Cache每分给Thread Cache一个对象,use_count++,Thread Cache每还回来一个对象,use_count–,当use_count减到0时则表示所有对象都回到了span,因此我们还需要在Span的结构里面增加一个成员变量use_count
;
//Span管理一个跨度的大块内存
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID page_id = 0;// 大块内存起始页的页号
size_t n = 0;//页的数量
Span* _prev = nullptr;//双向链表结构
Span* _next = nullptr;
void* _freelist = nullptr; // 切好的小块内存的自由链表
size_t _usecount = 0;// 切好小块内存,被分配给thread cache的计数
};
当Thread Cache的某个桶自由链表长度太长,则会还回来一部分给Central Cache,但我们需要明白一件事情就是——Thread Cache还回来的这些对象并不一定是属于同一个Span的
,因为Central Cache中的每个哈希桶中有多个Span,所以我们除了需要计算出还回来的对象应该还给central cache的哪一个桶之外,还需要知道这些对象到底应该还给这个桶当中的哪一个span。
那么现在问题来了:我们如何通过一个对象的地址拿到对应Span的页号呢?
我们首先需要明白Thread Cache还回来的小块内存它们一定是由页内存切分出来的
解决了上面的问题之后,现在我们又有了一个新的问题,那我们如何通过一个Span的页号找到对应的Span对象呢?
我们通过C++中的一个容器unordered_map来建立页号与Span之间的映射
,由于这个映射关系在Page Cache进行span的合并时也需要用到,因此我们直接将其存放到Page Cache里面,同时添加一个函数接口,用于让central cache获取这里的映射关系。
//单例模式--饿汉
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 获取一个K页的span
Span* NewSpan(size_t k);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//互斥锁
std::mutex _pagemtx;
private:
PageCache()
{
}
//防拷贝
PageCache(const PageCache&) = delete;
//PageCache也采用哈希桶的方式,只不过跟上两层的映射方式不一样
//它这里一共有128个桶,[1,128]
//它是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
SpanList _spanLists[NPAGES];
//建立页号与Span的映射
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
static PageCache _sInst;
};
新增了这个成员变量之后,我们可以在Central Cache向Page Cache要一个K页的Span时,在Page Cache返回这个K页的Span给Central Cache之前,应该建立这K个页号与该Span之间的映射关系。
Span* PageCache::NewSpan(size_t k)
{
//断言一下,申请的页最少是一页,最多不会超过NPAGES
assert(k > 0 && k < NPAGES);
//先去PageCache对应的桶里面看看有没有span
if (!_spanLists[k].Empty())
{
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
//有的话我们就直接进行头删
return _spanLists[k].PopFront();
}
//如果对应的桶没有span的话,我们就去后面找大一点的桶
//将大点的桶进行切分,切成一个k页的桶和一个n-k页的桶
for (int i = k + 1; i < NPAGES; i++)
{
//当前桶还有span,我们就从当前桶切分
if (!_spanLists[i].Empty())
{
Span* NSpan = _spanLists[i].PopFront();
Span* KSpan = new Span;
//从当前大一点的桶进行切分
//切成一个k页的span和一个n-k页的span
//k页的span返回CentralCache
//n-k页的span挂到n-k的桶中去
KSpan->page_id = NSpan->page_id;
KSpan->n = k;//k页
//页号+=k
//比如说我之前的页号是100 现在我切了两个页出去之后
//我们页应该从102开始
NSpan->page_id += k;
//页的数量应该用n-=k
NSpan->n -= k;
// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
// 进行的合并查找
_idSpanMap[NSpan->page_id] = NSpan;
_idSpanMap[NSpan->page_id + NSpan->n - 1] = NSpan;
//然后我们将n-k页的span头插到n-k号桶中去
_spanLists[NSpan->n].PushFront(NSpan);
//将PageCache分配给CentralCache的Span
//在idSpanMap中建立映射
for (int i = 0; i < KSpan->n; i++)
{
_idSpanMap[KSpan->page_id + i] = KSpan;
}
return KSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = (void*)SystemAlloc(NPAGES - 1);
//根据地址算出页号
//地址/一页的大小就是页号
bigSpan->page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
//页数就是NPAGES-1
bigSpan->n = NPAGES - 1;
//根据页就可以找到对应的桶
//我们将这个从堆里面申请的span头插到PageCache对应的桶中
_spanLists[bigSpan->n].PushFront(bigSpan);
//插入进去了之后,现在我们的PageCache中就有大的桶了
//因此我们还需要在调用一次自己
return NewSpan(k);
}
建立了映射之后,我们就可以调用刚刚在Page Cache里面新增的成员函数MapObjectToSpan
通过对象的地址找到对应的Span
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
//根据自由链表的地址求出页号
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
std::unique_lock<std::mutex>lock(_pagemtx);
//然后去查找当前页号对应的span在不在idSpanMap里面
//在的话就直接返回对应的Span,否则报错
auto ret = _idSpanMap.find(id);
//找到了直接返回对应的Span即可
if (ret != _idSpanMap.end())
{
return ret->second;
}
//如果找不到,证明前面肯定有问题,所以我们需要断言一下
else
{
assert(false);
return nullptr;
}
}
当这些准备工作都做好之后,我们就可以来对Central Cache释放内存了。当Thread Cache还对象给Central Cache的过程中,如果Central Cache中某个Span的_useCount减到0时,说明这个Span分配出去的对象全部都还回来了,那么此时就可以将这个Span还给Page Cache
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
//先算出当前链表是在CentralCache哪个桶的
size_t index = SizeClass::Index(size);
//注意:threadcache换回来的List并不一定来自于同一个span
//因为我们之前申请的时候如果centralcache当前span没有空闲的freelist
//我们就会找pagecache去要,但是pagecache如果没有小的span,它会进行切分
_spanLists[index]._mtx.lock();
//这段还回来的自由链表以nullptr结束
while (start)
{
//我们先记录当前链表的下一个节点
void* next = NextObj(start);
//根据自由链表的地址找到对应的span
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//然后将start头插到这个span中
NextObj(start) = span->_freelist;
span->_freelist = start;
//每还回来一个自由链表,对应span的usecount--
span->_usecount--;
//如果当前span中的usecount已经减为0
//证明之前分给ThreadCache的自由链表都已经还回来了
//现在我们可以将这个span再还给PageCache
//pagecache可以再尝试去做前后页的合并
if (span->_usecount == 0)
{
//既然我们想把这个span还给PageCache
//那么我们就需要在CentralCache中断开这个span与它前后span的链接
_spanLists[index].Erase(span);
span->_freelist = nullptr;
span->_prev = nullptr;
span->_next = nullptr;
//有可能还有其他人也要来还自由链表,因此我们可以在这里把桶锁先给解了
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pagemtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pagemtx.unlock();
_spanLists[index]._mtx.lock();
}
//再把前面记录的下一个节点的地址给start迭代往后走
start = next;
}
_spanLists[index]._mtx.unlock();
}
注意: 当我们把某个Span还给Page Cache时,我们需要先将这个Span从Central Cache对应的双链表中Erase掉(这里不是真的删除而是断开链接),然后再将该span的自由链表置空,因为page cache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到page cache对应的双链表中。
Page Cache释放内存
- 如果Central Cache释放回一个Span,则依次寻找Span的前后page id的没有在使用的空闲Span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的Span,
减少内存碎片
。
其中将一个Span合并成一个更大的Span这里又分为向前合并与向后合并
,如果当前还回来的Span的起始页号是ID,该Span的页的数量是n。在向前合并时,我们需要判断第ID-1页的Span是否空闲,如果空闲则可以将其进行合并,并且合并之后还需要继续向前尝试进行合并,直到不能进行合并为止。在向后合并的时候,我们需要判断第ID+n页对应的Span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。
我们向前或者向后找到的Span有两种可能,一种是已经分给了Central Cache也就是正在被使用的Span,另一种就是在Page Cache中未被使用的Span,但是我们进行向前或者向后合并的时候,只能用Page Cache中未被使用的Span来进行合并。
所以我们这里还需要判断某个Span是否是空闲,也就是未被使用的,因此我们可以在Span的结构里面增加一个成员变量_isUse
,用于标记这个span是否正在被使用,当一个Span结构被创建时我们默认该Span是空闲的。
//Span管理一个跨度的大块内存
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID page_id = 0;// 大块内存起始页的页号
size_t n = 0;//页的数量
Span* _prev = nullptr;//双向链表结构
Span* _next = nullptr;
size_t _objSize = 0; // 切好的小对象的大小
void* _freelist = nullptr; // 切好的小块内存的自由链表
bool _isUse = false; // 判断当前span是否在被使用
size_t _usecount = 0;// 切好小块内存,被分配给thread cache的计数
};
Span在向前或向后合并为更大的Span时主要有以下三种情况:
- 在向前和向后合并时,无法在unordered_map中通过当前Span前后相邻页的页号找到对应的Span,此时我们不合并。
- 通过Span前后相邻页的页号找到了对应的Span,但是这个Span正在被其他线程所使用,不合并。
- 在向前和向后合并时,当前Span前后相邻页的页数+当前Span的页数超过了128也不进行合并,因为Page Cache无法管理超过128页的Span
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 对span前后的页,尝试进行合并,缓解内存碎片问题
//向前合并
while (1)
{
//先算出页号
PAGE_ID PrevId = span->page_id - 1;
auto ret = _idSpanMap.find(PrevId);
//如果当前_idSpanMap中没有前面的页号,不合并
if (ret == _idSpanMap.end())
{
break;
}
Span* PrevSpan = ret->second;
//前面相邻页的span在使用,不合并
if (PrevSpan->_isUse == true)
{
break;
}
//当前页的页数+span的页数超过了128,不合并
if (PrevSpan->n + span->n > NPAGES - 1)
{
break;
}
//走到这里我们进行合并
span->page_id = PrevSpan->page_id;
span->n += PrevSpan->n;
//合并了之后我们还需要将PrevSpan在对应的桶里面给erase掉
_spanLists[PrevSpan->n].Erase(PrevSpan);
delete PrevSpan;
}
//向后合并
while (1)
{
//先算出页号
PAGE_ID NextId = span->page_id + span->n;
auto ret = _idSpanMap.find(NextId);
//如果当前页号在idSpanMap中找不到,不合并
if (ret == _idSpanMap.end())
{
break;
}
Span* NextSpan = ret->second;
//后面相邻的页正在被使用,不合并
if (NextSpan->_isUse == true)
{
break;
}
//当前页的页数+span的页数超过了128,不合并
if (NextSpan->n + span->n > NPAGES - 1)
{
break;
}
//走到这里我们开始合并
//向后合并页号不需要改变
span->n += NextSpan->n;
//合并了之后我们还需要将NextSpan在对应的桶里面给erase掉
_spanLists[NextSpan->n].Erase(NextSpan);
delete NextSpan;
}
//向前向后合并了之后,此时我们的span变成了一块更大的span了
//因此我们需要插入到对应的桶里面去
_spanLists[span->n].PushFront(span);
//现在我是一块更大的未使用的span了,因此isUse要变成false
span->_isUse = false;
//将当前span的起始和末尾页号页放到idSpanMap中
//这样可以方便别人来合并我
_idSpanMap[span->page_id] = span;
_idSpanMap[span->page_id + span->n - 1] = span;
}
注意: 在合并了前后相邻页的Span之后,我们还需要把这些Span给Erase掉,再将这个被合并的Span给delete掉。在合并结束后,除了将合并后的Span挂到Page Cache对应哈希桶的双链表当中,还需要建立该Span与其页号之间的映射关系,方便我们后面可以合并出更大的Span。
大于256KB的大块内存申请问题
申请内存
申请内存的时候主要分为以下几种情况:
- <=256KB -->三层缓存
- >256KB
- 32 *8K<size<=128 *8K -->Page Cache
- size>128*8K -->找系统堆直接申请
但是需要注意的是,我们上面写的代码只考虑了<=256KB的情况,因此下面我们还需要来写一下第二种情况也就是申请>256KB内存的情况怎么处理
首先我们需要来改一下SizeClass类里面的内存对齐函数RoundUp
,因为我们对传入字节数大于256KB的情况直接进行了断言处理,所以我们需要对RoundUp函数这里稍作修改。
//内存对齐
static size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
//大于256K的内存我们也以8K对齐
else
{
return _RoundUp(size, 1 << PAGE_SHIFT);
}
}
下面我们就可以来修改一下内存申请函数
的逻辑了,即当申请的内存大小超过了256KB,我们直接去找Page Cache要,申请的内存小于等于256KB我们继续走三层缓存逻辑
static void* ConcurrentAlloc(size_t size)
{
//大于256KB的内存我们直接找PageCache去要
if (size>MAX_BYTES)
{
//先算出对齐后的大小
size_t align = SizeClass::RoundUp(size);
//将内存转化为页
size_t Kpage = align >> PAGE_SHIFT;
//然后去找我们的PageCache去申请一块k页的内存
//因为这里NewSpan会访问桶,因此我们需要进行加锁
PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(Kpage);
span->_objSize = size;
PageCache::GetInstance()->_pagemtx.unlock();
//拿到了对应的span之后我们要算出它的地址
void* ptr = (void*)(span->page_id << PAGE_SHIFT);
//然后返回给threadcache即可
return ptr;
}
//否则继续使用我们内存池原来的逻辑
else
{
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
其次因为这里超过256KB的内存直接去找Page Cache要,所以我们Page Cache里面获取一个K页的Span这个成员函数我们也需要稍作改动,因为即使是申请超过256KB的内存也分为两种情况,如果是32页-128页的内存,我可以直接给你,但是如果超过了128页的Span,我Page Cache就需要直接找堆去要然后再把这块内存给你。
Span* PageCache::NewSpan(size_t k)
{
//断言一下,申请的页最少是一页
assert(k > 0);
//如果要申请的页超过了128
//我们直接去找堆要
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
Span* span = new Span;
//算出这个span的页号
span->page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
//页的数量
span->n = k;
//为了后面方便还给PageCache
//我们将从堆申请到的页号也在idSpanMap里面建立映射
_idSpanMap[span->page_id] = span;
return span;
}
//小于等于128页的内存就继续调用以前的逻辑
else
{
//先去PageCache对应的桶里面看看有没有span
if (!_spanLists[k].Empty())
{
//有的话我们就直接进行头删
Span* KSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (int i = 0; i < KSpan->n; i++)
{
_idSpanMap[KSpan->page_id + i] = KSpan;
}
return KSpan;
}
//如果对应的桶没有span的话,我们就去后面找大一点的桶
//将大点的桶进行切分,切成一个k页的桶和一个n-k页的桶
for (int i = k + 1; i < NPAGES; i++)
{
//当前桶还有span,我们就从当前桶切分
if (!_spanLists[i].Empty())
{
Span* NSpan = _spanLists[i].PopFront();
Span* KSpan = new Span;
//从当前大一点的桶进行切分
//切成一个k页的span和一个n-k页的span
//k页的span返回CentralCache
//n-k页的span挂到n-k的桶中去
KSpan->page_id = NSpan->page_id;
KSpan->n = k;//k页
//页号+=k
//比如说我之前的页号是100 现在我切了两个页出去之后
//我们页应该从102开始
NSpan->page_id += k;
//页的数量应该用n-=k
NSpan->n -= k;
// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
// 进行的合并查找
_idSpanMap[NSpan->page_id] = NSpan;
_idSpanMap[NSpan->page_id + NSpan->n - 1] = NSpan;
//然后我们将n-k页的span头插到n-k号桶中去
_spanLists[NSpan->n].PushFront(NSpan);
//将PageCache分配给CentralCache的Span
//在idSpanMap中建立映射
for (int i = 0; i < KSpan->n; i++)
{
_idSpanMap[KSpan->page_id + i] = KSpan;
}
return KSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = (void*)SystemAlloc(NPAGES - 1);
//根据地址算出页号
//地址/一页的大小就是页号
bigSpan->page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
//页数就是NPAGES-1
bigSpan->n = NPAGES - 1;
//根据页就可以找到对应的桶
//我们将这个从堆里面申请的span头插到PageCache对应的桶中
_spanLists[bigSpan->n].PushFront(bigSpan);
//插入进去了之后,现在我们的PageCache中就有大的桶了
//因此我们还需要在调用一次自己
return NewSpan(k);
}
}
释放内存
由于上面申请内存的逻辑有所变化,因此我们这里释放内存的逻辑也需要有所改变。
- <=256KB -->三层缓存
- >256KB
- 32 *8K<size<=128 *8K -->Page Cache
- size>128*8K -->直接还给堆
下面我们来看一下释放内存的函数代码
static void ConcurrentFree(void* ptr,size_t size)
{
//释放大于256KB的内存
//分为两种情况:
//1.释放的内存小于等于128页,因此还给PageCache即可
//2.释放的内存大于128页,因此需要还给堆
if (size > MAX_BYTES)
{
//根据对象的地址找到对应的span
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
//将这个span还给PageCache或者是还给堆
//这里有可能访问桶我们还需要进行加锁
PageCache::GetInstance()->_pagemtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pagemtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
因为我们上面Page Cache释放回收内存时只写了小于等于128页的代码,但是此时我们应该还要将大于128页内存的代码也写一下,即对于大于128页的Span,说明该Span是直接向堆申请的,我们需要将这块内存直接还给堆,然后将这个Span进行delete即可。
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//释放超过128页的span我们直接还给堆
if (span->n > NPAGES - 1)
{
//算出当前span的地址
void* ptr = (void*)(span->page_id << PAGE_SHIFT);
//将这块内存直接还给堆
SystemFree(ptr);
delete span;
}
//释放小于等于128页的span我们继续调用以前的逻辑
else
{
// 对span前后的页,尝试进行合并,缓解内存碎片问题
//向前合并
while (1)
{
//先算出页号
PAGE_ID PrevId = span->page_id - 1;
auto ret = _idSpanMap.find(PrevId);
//如果当前_idSpanMap中没有前面的页号,不合并
if (ret == _idSpanMap.end())
{
break;
}
Span* PrevSpan = ret->second;
//前面相邻页的span在使用,不合并
if (PrevSpan->_isUse == true)
{
break;
}
//当前页的页数+span的页数超过了128,不合并
if (PrevSpan->n + span->n > NPAGES - 1)
{
break;
}
//走到这里我们进行合并
span->page_id = PrevSpan->page_id;
span->n += PrevSpan->n;
//合并了之后我们还需要将PrevSpan在对应的桶里面给erase掉
_spanLists[PrevSpan->n].Erase(PrevSpan);
delete PrevSpan;
}
//向后合并
while (1)
{
//先算出页号
PAGE_ID NextId = span->page_id + span->n;
auto ret = _idSpanMap.find(NextId);
//如果当前页号在idSpanMap中找不到,不合并
if (ret == _idSpanMap.end())
{
break;
}
Span* NextSpan = ret->second;
//后面相邻的页正在被使用,不合并
if (NextSpan->_isUse == true)
{
break;
}
//当前页的页数+span的页数超过了128,不合并
if (NextSpan->n + span->n > NPAGES - 1)
{
break;
}
//走到这里我们开始合并
//向后合并页号不需要改变
span->n += NextSpan->n;
//合并了之后我们还需要将NextSpan在对应的桶里面给erase掉
_spanLists[NextSpan->n].Erase(NextSpan);
delete NextSpan;
}
//向前向后合并了之后,此时我们的span变成了一块更大的span了
//因此我们需要插入到对应的桶里面去
_spanLists[span->n].PushFront(span);
//现在我是一块更大的未使用的span了,因此isUse要变成false
span->_isUse = false;
//将当前span的起始和末尾页号页放到idSpanMap中
//这样可以方便别人来合并我
_idSpanMap[span->page_id] = span;
_idSpanMap[span->page_id + span->n - 1] = span;
}
}
注意:直接向堆申请内存时我们调用的接口是VirtualAlloc
,直接向堆释放内存我们调用的接口是VirtualFree
,Linux下的brk和mmap对应的释放接口叫做sbrk和unmmap。我们分别将申请内存接口和释放内存接口封装成了SystemAlloc以及SystemFree
直接向堆申请内存的封装之后的接口:
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
直接向堆释放内存的封装之后的接口:
//把内存直接还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
使用定长内存池配合脱离使用new
我们这里实现的高并发内存池它的原型是tcmalloc,它是要在高并发场景下去替代malloc的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的
。之前在定长内存池的时候我们测试过,在并发场景下定长内存池申请释放内存是会比mallloc/free快一些的。但是我们的代码里面有一些地方是通过new去申请内存,而我们知道new的底层是调用了malloc的,为了脱离使用new,我们下面对于那些使用new的地方去用我们的定长内存池申请或者释放内存。
我们经过分析后发现Page Cache里面用new 和delete使用的最多的,因此为了脱离malloc与free,我们可以在Page Cache的类中定义一个_spanPool,用于Span对象的申请和释放。
//单例模式--饿汉
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 获取一个K页的span
Span* NewSpan(size_t k);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//互斥锁
std::mutex _pagemtx;
private:
PageCache()
{
}
//防拷贝
PageCache(const PageCache&) = delete;
//PageCache也采用哈希桶的方式,只不过跟上两层的映射方式不一样
//它这里一共有128个桶,[1,128]
//它是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
SpanList _spanLists[NPAGES];
//建立页号与Span的映射
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
ObjectPool<Span> _spanPool;//定长内存池
static PageCache _sInst;
};
因此在Page Cache的接口中,只要用到了new或者delete的地方,我们都将其换成定长内存池中的New和Delete函数
//调用定长内存池的new
//Span* span = new Span;
Span* span = _spanPool.New();
//调用定长内存池的Delete
//delete span;
_spanPool.Delete(span);
除了上面的地方有new或者delet之外,我们申请内存的函数ConcurrentAlloc
那里也用到了new,我们也可以使用定长内存池的New函数去替换
static void* ConcurrentAlloc(size_t size)
{
//大于256KB的内存我们直接找PageCache去要
if (size>MAX_BYTES)
{
//先算出对齐后的大小
size_t align = SizeClass::RoundUp(size);
//将内存转化为页
size_t Kpage = align >> PAGE_SHIFT;
//然后去找我们的PageCache去申请一块k页的内存
//因为这里NewSpan会访问桶,因此我们需要进行加锁
PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(Kpage);
span->_objSize = size;
PageCache::GetInstance()->_pagemtx.unlock();
//拿到了对应的span之后我们要算出它的地址
void* ptr = (void*)(span->page_id << PAGE_SHIFT);
//然后返回给threadcache即可
return ptr;
}
//否则继续使用我们内存池原来的逻辑
else
{
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache>tcPool;
//pTLSThreadCache = new ThreadCache;
//调用定长内存池的New
pTLSThreadCache = tcPool.New();
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
释放对象时优化为不传对象大小
我们申请内存的时候,只要给你想要申请的内存大小就可以了,但是我们释放内存的时候不光要给该对象的指针还要给该对象的大小。但是我们想想我们平时申请内存的时候一般只要给申请的内存的大小即可,释放的时候只需要给该对象的指针就可以了。
那我们有没有办法在释放对象的时候只给一个对象的指针就可以了呢?
答案是有的,我们只需要在Span的结构里面增加一个成员变量 _objSize用来表示申请的内存块对象的大小
//Span管理一个跨度的大块内存
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID page_id = 0;// 大块内存起始页的页号
size_t n = 0;//页的数量
Span* _prev = nullptr;//双向链表结构
Span* _next = nullptr;
size_t _objSize = 0; // 切好的小对象的大小
void* _freelist = nullptr; // 切好的小块内存的自由链表
bool _isUse = false; // 判断当前span是否在被使用
size_t _usecount = 0;// 切好小块内存,被分配给thread cache的计数
};
因为我们所有的span都是从Page Cache中拿出来的,因此每当我们调用NewSpan获取到一个k页的span时
,就应该将这个Span的_objSize保存下来。
//走到这里证明当前桶里面没有未分配对象的span
//因此我们需要找PageCache去要
PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pagemtx.unlock();
如此一来,我们释放内存的函数就不需要传这个对象的大小了
static void ConcurrentFree(void* ptr)
{
//根据对象的地址找到对应的span
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
//释放大于256KB的内存
//分为两种情况:
//1.释放的内存小于等于128页,因此还给PageCache即可
//2.释放的内存大于128页,因此需要还给堆
if (size > MAX_BYTES)
{
//将这个span还给PageCache或者是还给堆
//这里有可能访问桶我们还需要进行加锁
PageCache::GetInstance()->_pagemtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pagemtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
性能瓶颈分析
我们高并发内存池的整体架子已经搭出来了,下面我们来使用我们自己写得高并发内存池在多线程环境下与malloc来对比测试一下
可以看到我们自己写的高并发内存池在多线程环境下申请和释放内存与malloc和free还是有点差距的。
但是具体我们自己写得内存池慢在哪里我们是不知道的,但是我们可以借助VS上面的一个工具来进行分析。
在debug模式下点击调试里面的
性能和诊断
点击开始然后点击检测,再点击下一步即可
然后选择要分析的项目,再点下一步即可。
最后我们只要等待它运行然后出结果就可以了
通过分析结果我们可以看到,耗时的主要是Deallocate和MapObjectToSpan这两个函数,其中这两个函数就花费了超过一半的时间
然后我们在点Deallocate看看这个函数里面是什么花费了这么多时间,我们发现Deallocate函数里面主要花费时间的是ListTooLong
我们继续点进去看发现ListTooLong函数里面花费时间最长的是ReleaseListToSpan函数
我们继续点进去发现在ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的。
最后我们我们在点进去发现MapObjectToSpan函数里面消耗时间最多的是因为锁
经过一番波折之后,我们终于破案了原来导致性能瓶颈的原因是因为锁
,那么我们如何解决这个瓶颈呢?
tcmalloc针对这一点使用了基数树进行优化,使得我们在读取这个映射关系时可以不用加锁。
针对性能瓶颈使用基数树进行代码的优化
基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。
单层基数树
单层基数树它采用的是直接定址法, 每一个页号对应span的地址就是在存储数组中在以该页号为下标的位置的值。
单层基数树的代码
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
//先计算出大小
size_t size = sizeof(void*) << BITS;
//然后将其对其一下
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
//将其转换成页数,调用SystemAlloc开空间
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
//查找对应的Span
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
//建立映射
void set(Number k, void* v) {
array_[k] = v;
}
};
代码中的非类型模板参数BITS表示存储页号最多需要比特位的个数
。在32位下我们传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即2^BITS.
比如说32位下,我们这里以8K为一页,此时页的数目就是232/213=2^19 因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32-13=19.由于32位平台下指针的大小是4字节,因此该数组的大小就是2^19 *4=2^21=2M ,内存消耗不大,是可行的。但如果是在64位平台下,此时该数组的大小是2^51 *8=2^53 =2^24G,这显然是不可行的,因此对于64位的平台,我们需要使用三层基数树。
二层基数树
- 这里还是以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个比特位。而二层基数树实际上就是把这19个比特位分为两次进行映射。
- 比如用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的Span指针。
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
//不要忘了我们还有一个定长内存池,申请空间更快
static ObjectPool<Leaf>leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
注意: 二层基数树和单层基数树它俩占用的空间其实是一样的。在二层基数树中第一层数组占用的空间是2 ^5 *4=2 ^7 Byte空间,第二层数组最多占用2 ^5 *2^14 *4=2M,与单层基数树占用的空间一样。但是二层基数树相比一层基数树的好处就是
, 一层基数树必须一开始就把2M的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组就行了。
在二层基数树中有一个Ensure函数,当需要建立某一页号与其span之间的映射关系时,我们需要先调用该Ensure函数确保用于映射该页号的空间是开辟了的,如果没有开辟则会立即开辟。在32位平台下,就算将二层基数树第二层的数组全部开辟出来也只是消耗了2M的空间,内存消耗也不算太多,因此我们可以在构造二层基数树时就把第二层的数组全部开辟出来。
三层基数树
上面一层基数树和二层基数树都只适用于32位平台,而对于64位的平台就需要采用三层基数树了。三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干比特位分为三次进行映射。
当我们要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用代码中的Ensure函数,而没有建立映射的页号就可以不用开辟其对应的数组空间,此时就能在一定程度上节省内存空间。
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
了解了基数树之后,因为是32位平台下,使用使用单层或者二层的基数树都是可以的,我们这里采用单层基数树进行优化,此时我们将PageCache类当中的unordered_map用基数树来替换
//单例模式--饿汉
class PageCache
{
public:
//...
private:
//建立页号与Span的映射
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
//我们这里采用基数树,速度会更快
TCMalloc_PageMap1<32 - PAGE_SHIFT>_idSpanMap;
};
替换成基数树之后,对于Page Cache里面需要进行建立页号与span的映射时,我们就需要调用基数树当中的set函数。需要读取某一页号对应的span时,就调用基数树当中的get函数。
//我们将从堆申请到的页号也在idSpanMap里面建立映射
//_idSpanMap[span->page_id] = span;
_idSpanMap.set(span->page_id, span);
//auto ret = _idSpanMap.find(PrevId);
auto ret = (Span*)_idSpanMap.get(PrevId);
同时使用了基数树优化之后,我们Page Cache中用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了。
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
//根据自由链表的地址求出页号
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
//std::unique_lock<std::mutex>lock(_pagemtx);
然后去查找当前页号对应的span在不在idSpanMap里面
在的话就直接返回对应的Span,否则报错
//auto ret = _idSpanMap.find(id);
找到了直接返回对应的Span即可
//if (ret != _idSpanMap.end())
//{
// return ret->second;
//}
如果找不到,证明前面肯定有问题,所以我们需要断言一下
//else
//{
// assert(false);
// return nullptr;
//}
auto ret = (Span*)_idSpanMap.get(id);
//找到了就返回
assert(ret!=nullptr);
return ret;
}
使用基数树优化好了代码之后,下面我们再在 多线程下与malloc/free进行对比测试
可以看到我们现在自己写的高并发内存池申请和释放内存时就比malloc与free快很多了。
为什么使用基数树不需要加锁?
之前我们采用的是unordered_map或者map来获取映射关系,我们知道C++的STL容器是不保证线程安全的,其中unordered_map它底层是哈希表map底层是红黑树,它们在插入或者删除的时候其底层的结构都有可能会发生变化。因此为了避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,所以我们需要进行加锁来保证线程安全。
那使用基数树为什么不需要加锁呢?
- 我们在Page Cache中只会在这两个函数中建立id与Span的映射,也就是写
- 基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。即使我们去写也不会动底层结构。
- 读写是分离的,线程1对一个位置读写的时候,线程2不可能对位置进行读写,比如说我线程1要获取一个20页的Span,那我线程2就不可能用这个20页的Span来合成一个更大的Span,因为当前Span正在被别人所使用。
扩展学习以及当前项目实现的不足
实际中我们测试了,当前实现的并发内存池比malloc/free是更加高效的,那么我们能否替换到系统调用malloc呢?实际上是可以的。
- 不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:
void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))
因此所有malloc的调用都跳转到了tc_malloc的实现 。
边栏推荐
- Advanced API (UDP connection & map set & collection set)
- 机器学习 | 简单但是能提升模型效果的特征标准化方法(RobustScaler、MinMaxScaler、StandardScaler 比较和解析)
- 多个全局异常处理类,怎么规定执行顺序
- Architecture notes
- 熊市里的大机构压力倍增,灰度、Tether、微策略等巨鲸会不会成为'巨雷'?
- Software testing learning - day 3
- Application scenarios of Catalan number
- Software testing assignment - day 1
- [untitled]
- JMeter test result output
猜你喜欢
Golang operation redis: write and read kV data
VMware virtual machine C disk expansion
Software testing learning - day one
如何迁移或复制VMware虚拟机系统
vmware虚拟机C盘扩容
On the practice of performance optimization and stability guarantee
“百度杯”CTF比赛 2017 二月场,Web:爆破-1
Software testing assignment - the next day
Jenkins
10000小時定律不會讓你成為編程大師,但至少是個好的起點
随机推荐
How to specify the execution order for multiple global exception handling classes
Unit test notes
Advanced API (multithreading)
Architecture notes
The education of a value investor
New knowledge! The virtual machine network card causes your DNS resolution to slow down
[classes and objects] explain classes and objects in simple terms
【类和对象】深入浅出类和对象
691. 立方体IV
EasyExcel
Golang operation redis: write and read kV data
机械观和系统观的科学思维方式各有什么特点和作用
[day15] introduce the features, advantages and disadvantages of promise, and how to implement it internally. Implement promise by hand
Advanced API (UDP connection & map set & collection set)
Software testing learning - day one
Setting up the development environment of dataworks custom function
What are the characteristics and functions of the scientific thinking mode of mechanical view and system view
[attribute comparison] defer and async
Inno setup production and installation package
Jenkins