PostgreSQL page allocation management (Part 2)
2022-08-01 12:46:00 【happytree001】
1. Page eviction
1.1 Selecting a victim buffer with the clock-sweep algorithm
static inline uint32
ClockSweepTick(void)
{
    uint32      victim;

    /*
     * Atomically move hand ahead one buffer - if there's several processes
     * doing this, this can lead to buffers being returned slightly out of
     * apparent order.
     */
    victim =
        pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1);

    if (victim >= NBuffers)
    {
        uint32      originalVictim = victim;

        /* always wrap what we look up in BufferDescriptors */
        victim = victim % NBuffers;

        /*
         * If we're the one that just caused a wraparound, force
         * completePasses to be incremented while holding the spinlock. We
         * need the spinlock so StrategySyncStart() can return a consistent
         * value consisting of nextVictimBuffer and completePasses.
         */
        if (victim == 0)
        {
            uint32      expected;
            uint32      wrapped;
            bool        success = false;

            expected = originalVictim + 1;

            while (!success)
            {
                /*
                 * Acquire the spinlock while increasing completePasses. That
                 * allows other readers to read nextVictimBuffer and
                 * completePasses in a consistent manner which is required
                 * for StrategySyncStart().  In theory delaying the increment
                 * could lead to an overflow of nextVictimBuffers, but that's
                 * highly unlikely and wouldn't be particularly harmful.
                 */
                SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

                wrapped = expected % NBuffers;

                success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer,
                                                         &expected, wrapped);
                if (success)
                    StrategyControl->completePasses++;
                SpinLockRelease(&StrategyControl->buffer_strategy_lock);
            }
        }
    }
    return victim;
}
nextVictimBuffer records how far the sweep hand has advanced. Once the hand has walked past the end of the buffer pool it wraps back to the start: nextVictimBuffer is folded back into range and completePasses is incremented by 1.
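To make the wraparound arithmetic easier to follow, here is a minimal standalone sketch. This is not PostgreSQL code: the names clock_demo_tick, next_victim and NBUFFERS_DEMO are invented for illustration, it uses C11 atomics instead of pg_atomic, and it omits the spinlock and the compare-exchange retry loop of the real function.

/* Standalone illustration of the clock-hand wraparound arithmetic used by
 * ClockSweepTick(): the hand is advanced with an atomic fetch-add and folded
 * back into [0, NBUFFERS_DEMO) on wraparound, while a pass counter records
 * how many full sweeps have completed. */
#include <stdatomic.h>
#include <stdio.h>

#define NBUFFERS_DEMO 8

static atomic_uint next_victim;       /* plays the role of nextVictimBuffer */
static unsigned complete_passes;      /* plays the role of completePasses */

static unsigned
clock_demo_tick(void)
{
    unsigned    victim = atomic_fetch_add(&next_victim, 1);

    if (victim >= NBUFFERS_DEMO)
    {
        unsigned    expected = victim + 1;
        unsigned    wrapped = expected % NBUFFERS_DEMO;

        /* always wrap what we hand out */
        victim %= NBUFFERS_DEMO;

        /* Only the caller that caused the wraparound resets the counter;
         * PostgreSQL additionally holds buffer_strategy_lock here so that
         * nextVictimBuffer and completePasses stay mutually consistent. */
        if (victim == 0 &&
            atomic_compare_exchange_strong(&next_victim, &expected, wrapped))
            complete_passes++;
    }
    return victim;
}

int
main(void)
{
    for (int i = 0; i < 20; i++)
    {
        unsigned    buf = clock_demo_tick();

        printf("tick %2d -> buffer %u (complete passes: %u)\n",
               i, buf, complete_passes);
    }
    return 0;
}

Running it shows the hand returning 0..7, then wrapping to 0 while the pass counter goes up, which is exactly the behavior the real function guarantees under concurrency.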
When the freelist has no available buffer, the clock sweep scans the entire buffer pool:
A buffer whose reference count is 0 and whose usage count is also 0 is chosen as the victim.
A buffer whose reference count is 0 but whose usage count is not 0 has its usage count decremented by 1, and trycounter is reset so the buffer will be visited again on a later pass; this way a victim buffer can always be found eventually.
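The loop below unpacks everything from a single atomic state word through macros such as BUF_STATE_GET_REFCOUNT() and BUF_USAGECOUNT_ONE. As a rough reference, this is how that word is laid out, paraphrased from src/include/storage/buf_internals.h (the exact constants may differ between PostgreSQL versions):

/* Sketch of the packed buffer-state word (paraphrased from buf_internals.h):
 * 10 flag bits, 4 usage-count bits and 18 refcount bits share one uint32 so
 * that they can be read and updated atomically together. */
#define BUF_REFCOUNT_ONE        1
#define BUF_REFCOUNT_MASK       ((1U << 18) - 1)
#define BUF_USAGECOUNT_MASK     0x003C0000U
#define BUF_USAGECOUNT_ONE      (1U << 18)
#define BUF_USAGECOUNT_SHIFT    18
#define BUF_FLAG_MASK           0xFFC00000U

/* Extract the refcount or usage count out of the packed word */
#define BUF_STATE_GET_REFCOUNT(state)   ((state) & BUF_REFCOUNT_MASK)
#define BUF_STATE_GET_USAGECOUNT(state) \
    (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)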
BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
{
    ...
    /* Nothing on the freelist, so run the "clock sweep" algorithm */
    trycounter = NBuffers;
    for (;;)
    {
        buf = GetBufferDescriptor(ClockSweepTick());

        /*
         * If the buffer is pinned or has a nonzero usage_count, we cannot
         * use it; decrement the usage_count (unless pinned) and keep
         * scanning.
         */
        local_buf_state = LockBufHdr(buf);

        if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
        {
            if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
            {
                local_buf_state -= BUF_USAGECOUNT_ONE;
                trycounter = NBuffers;
            }
            else
            {
                /* Found a usable buffer */
                if (strategy != NULL)
                    AddBufferToRing(strategy, buf);
                *buf_state = local_buf_state;
                return buf;
            }
        }
        else if (--trycounter == 0)
        {
            /*
             * We've scanned all the buffers without making any state
             * changes, so all the buffers are pinned (or were when we looked
             * at them).  We could hope that someone will free one
             * eventually, but it's probably better to fail than to risk
             * getting stuck in an infinite loop.
             */
            UnlockBufHdr(buf, local_buf_state);
            elog(ERROR, "no unpinned buffers available");
        }
        UnlockBufHdr(buf, local_buf_state);
    }
    ...
}
2. Freeing a buffer
Buffers are released, for example, through the following call chain:
DropRelFileNodeBuffers()                 (src/backend/storage/buffer/bufmgr.c)
  -> InvalidateBuffer(BufferDesc *buf)   (src/backend/storage/buffer/bufmgr.c)
    -> StrategyFreeBuffer(buf)           (src/backend/storage/buffer/freelist.c)
StrategyFreeBuffer() puts the released buffer back onto the freelist:
void
StrategyFreeBuffer(BufferDesc *buf)
{
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

    /*
     * It is possible that we are told to put something in the freelist that
     * is already in it; don't screw up the list if so.
     */
    if (buf->freeNext == FREENEXT_NOT_IN_LIST)
    {
        buf->freeNext = StrategyControl->firstFreeBuffer;
        if (buf->freeNext < 0)
            StrategyControl->lastFreeBuffer = buf->buf_id;
        StrategyControl->firstFreeBuffer = buf->buf_id;
    }

    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
}
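The consumer side of this freelist lives in the part of StrategyGetBuffer() elided earlier. Roughly, it looks like the following simplified paraphrase (not the verbatim PostgreSQL source; details vary between versions): buffers are popped off firstFreeBuffer under the spinlock and used only if they are unpinned and have a zero usage count; if the list runs dry, the clock sweep shown in section 1 takes over.

/* Simplified paraphrase of the freelist consumer in StrategyGetBuffer(). */
while (StrategyControl->firstFreeBuffer >= 0)
{
    /* The spinlock protects the list links, not the buffer contents */
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
    if (StrategyControl->firstFreeBuffer < 0)
    {
        SpinLockRelease(&StrategyControl->buffer_strategy_lock);
        break;
    }
    buf = GetBufferDescriptor(StrategyControl->firstFreeBuffer);

    /* Unconditionally remove the buffer from the freelist */
    StrategyControl->firstFreeBuffer = buf->freeNext;
    buf->freeNext = FREENEXT_NOT_IN_LIST;
    SpinLockRelease(&StrategyControl->buffer_strategy_lock);

    local_buf_state = LockBufHdr(buf);
    if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 &&
        BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0)
    {
        if (strategy != NULL)
            AddBufferToRing(strategy, buf);
        *buf_state = local_buf_state;
        return buf;             /* found a usable buffer on the freelist */
    }
    UnlockBufHdr(buf, local_buf_state);
}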
3. Hash table expansion
void *
hash_search_with_hash_value(HTAB *hashp,
                            const void *keyPtr,
                            uint32 hashvalue,
                            HASHACTION action,
                            bool *foundPtr)
{
    HASHHDR    *hctl = hashp->hctl;
    int         freelist_idx = FREELIST_IDX(hctl, hashvalue);
    ...
    /*
     * If inserting, check if it is time to split a bucket.
     *
     * NOTE: failure to expand table is not a fatal error, it just means we
     * have to run at higher fill factor than we wanted.  However, if we're
     * using the palloc allocator then it will throw error anyway on
     * out-of-memory, so we must do this before modifying the table.
     */
    if (action == HASH_ENTER || action == HASH_ENTER_NULL)
    {
        /*
         * Can't split if running in partitioned mode, nor if frozen, nor if
         * table is the subject of any active hash_seq_search scans.
         */
        if (hctl->freeList[0].nentries > (long) hctl->max_bucket &&
            !IS_PARTITIONED(hctl) && !hashp->frozen &&
            !has_seq_scans(hashp))
            (void) expand_table(hashp);
    }
    ...
The table can be split only when it is not running in partitioned mode, is not frozen, and is not the subject of any active hash_seq_search scan.
The hash scheme PostgreSQL uses here is also known as linear hashing: after an expansion only the entries of a single bucket need to be rehashed, so adding or removing a bucket moves very few entries, at the cost of a less even data distribution (see the calc_bucket() sketch below).
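The reason only one bucket ever needs to be split lies in how a hash value is reduced to a bucket number. The following paraphrases calc_bucket() in src/backend/utils/hash/dynahash.c (a sketch; details may differ between versions): a value is first masked with high_mask, and falls back to low_mask only if it lands on a bucket that does not exist yet.

/* Paraphrase of dynahash's calc_bucket(): reduce a 32-bit hash value to a
 * bucket number.  Only hash values that currently map to old_bucket can be
 * remapped to new_bucket after a split, which is why expand_table() has to
 * rehash exactly one chain. */
static uint32
calc_bucket(HASHHDR *hctl, uint32 hash_val)
{
    uint32      bucket;

    bucket = hash_val & hctl->high_mask;
    if (bucket > hctl->max_bucket)
        bucket = bucket & hctl->low_mask;   /* bucket not split yet */

    return bucket;
}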
/*
 * Expand the table by adding one more hash bucket.
 */
static bool
expand_table(HTAB *hashp)
{
    HASHHDR    *hctl = hashp->hctl;
    HASHSEGMENT old_seg,
                new_seg;
    long        old_bucket,
                new_bucket;
    long        new_segnum,
                new_segndx;
    long        old_segnum,
                old_segndx;
    HASHBUCKET *oldlink,
               *newlink;
    HASHBUCKET  currElement,
                nextElement;

    Assert(!IS_PARTITIONED(hctl));

#ifdef HASH_STATISTICS
    hash_expansions++;
#endif

    new_bucket = hctl->max_bucket + 1;
    new_segnum = new_bucket >> hashp->sshift;
    new_segndx = MOD(new_bucket, hashp->ssize);

    if (new_segnum >= hctl->nsegs)
    {
        /* Allocate new segment if necessary -- could fail if dir full */
        if (new_segnum >= hctl->dsize)
            if (!dir_realloc(hashp))
                return false;
        if (!(hashp->dir[new_segnum] = seg_alloc(hashp)))
            return false;
        hctl->nsegs++;
    }

    /* OK, we created a new bucket */
    hctl->max_bucket++;

    /*
     * *Before* changing masks, find old bucket corresponding to same hash
     * values; values in that bucket may need to be relocated to new bucket.
     * Note that new_bucket is certainly larger than low_mask at this point,
     * so we can skip the first step of the regular hash mask calc.
     */
    old_bucket = (new_bucket & hctl->low_mask);

    /*
     * If we crossed a power of 2, readjust masks.
     */
    if ((uint32) new_bucket > hctl->high_mask)
    {
        hctl->low_mask = hctl->high_mask;
        hctl->high_mask = (uint32) new_bucket | hctl->low_mask;
    }

    /*
     * Relocate records to the new bucket.  NOTE: because of the way the hash
     * masking is done in calc_bucket, only one old bucket can need to be
     * split at this point.  With a different way of reducing the hash value,
     * that might not be true!
     */
    old_segnum = old_bucket >> hashp->sshift;
    old_segndx = MOD(old_bucket, hashp->ssize);

    old_seg = hashp->dir[old_segnum];
    new_seg = hashp->dir[new_segnum];

    oldlink = &old_seg[old_segndx];
    newlink = &new_seg[new_segndx];

    for (currElement = *oldlink;
         currElement != NULL;
         currElement = nextElement)
    {
        nextElement = currElement->link;
        if ((long) calc_bucket(hctl, currElement->hashvalue) == old_bucket)
        {
            *oldlink = currElement;
            oldlink = &currElement->link;
        }
        else
        {
            *newlink = currElement;
            newlink = &currElement->link;
        }
    }

    /* don't forget to terminate the rebuilt hash chains... */
    *oldlink = NULL;
    *newlink = NULL;

    return true;
}
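To see the mask bookkeeping in action, here is a self-contained toy model. It is not PostgreSQL code; all names and values (bucket_of, expand_one_bucket, the sample hash) are made up for illustration. It mimics the max_bucket / low_mask / high_mask logic of expand_table() and calc_bucket() and shows that each expansion splits exactly one existing bucket.

/* Toy model of linear hashing: start with 4 buckets and split one bucket
 * per expansion, adjusting the masks when a power of two is crossed. */
#include <stdio.h>
#include <stdint.h>

static uint32_t max_bucket = 3;   /* buckets 0..3 exist initially */
static uint32_t low_mask = 3;
static uint32_t high_mask = 7;

static uint32_t
bucket_of(uint32_t hash)
{
    uint32_t    bucket = hash & high_mask;

    if (bucket > max_bucket)
        bucket &= low_mask;       /* target bucket not split yet */
    return bucket;
}

static void
expand_one_bucket(void)
{
    uint32_t    new_bucket = max_bucket + 1;
    uint32_t    old_bucket = new_bucket & low_mask;  /* the one chain to split */

    max_bucket++;
    if (new_bucket > high_mask)
    {
        /* crossed a power of two: widen the masks */
        low_mask = high_mask;
        high_mask = new_bucket | low_mask;
    }
    printf("split bucket %u into buckets %u and %u (max_bucket=%u)\n",
           old_bucket, old_bucket, new_bucket, max_bucket);
}

int
main(void)
{
    uint32_t    sample = 0x2D;    /* arbitrary hash value, 0b101101 */

    for (int i = 0; i < 6; i++)
    {
        printf("hash 0x%X -> bucket %u\n", sample, bucket_of(sample));
        expand_one_bucket();
    }
    return 0;
}

Running it shows the sample value staying in bucket 1 until bucket 5 is created, at which point it relocates exactly once; entries in all other buckets are untouched by that split.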
static bool
dir_realloc(HTAB *hashp)
{
    HASHSEGMENT *p;
    HASHSEGMENT *old_p;
    long        new_dsize;
    long        old_dirsize;
    long        new_dirsize;

    if (hashp->hctl->max_dsize != NO_MAX_DSIZE)
        return false;

    /* Reallocate directory */
    new_dsize = hashp->hctl->dsize << 1;
    old_dirsize = hashp->hctl->dsize * sizeof(HASHSEGMENT);
    new_dirsize = new_dsize * sizeof(HASHSEGMENT);

    old_p = hashp->dir;
    CurrentDynaHashCxt = hashp->hcxt;
    p = (HASHSEGMENT *) hashp->alloc((Size) new_dirsize);

    if (p != NULL)
    {
        memcpy(p, old_p, old_dirsize);
        MemSet(((char *) p) + old_dirsize, 0, new_dirsize - old_dirsize);
        hashp->dir = p;
        hashp->hctl->dsize = new_dsize;

        /* XXX assume the allocator is palloc, so we know how to free */
        Assert(hashp->alloc == DynaHashAlloc);
        pfree(old_p);

        return true;
    }

    return false;
}
The shared buffer lookup table discussed here does not satisfy these conditions (it runs in partitioned mode), so it is never expanded.
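The snippet below, a paraphrase of InitBufTable() in src/backend/storage/buffer/buf_table.c (a sketch; exact flags and field names may vary between versions), shows where that partitioning is set up:

/* Paraphrase of InitBufTable(): the shared buffer lookup table is created
 * with HASH_PARTITION, so IS_PARTITIONED() is true for it and expand_table()
 * is never invoked on it; its size is fixed at startup. */
HASHCTL     info;

info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(BufferLookupEnt);
info.num_partitions = NUM_BUFFER_PARTITIONS;

SharedBufHash = ShmemInitHash("Shared Buffer Lookup Table",
                              size, size,
                              &info,
                              HASH_ELEM | HASH_BLOBS | HASH_PARTITION);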