当前位置:网站首页>es DELETE index 源码分析
es DELETE index 源码分析
2022-08-02 19:01:00 【水的精神】
这里有前置要求:
需要已经编译通过过es源码!es源码能够自己本地成功运行。
https://blog.csdn.net/weixin_30166297/article/details/116751609
明确几个点
- 本篇文章,用类名 + . + function() 来表示某个类的某个方法。
- 本篇文章的源码调试,用的是IDEA开发工具,快捷键是默认的!
- 本篇文章说的点进去是指查看方法调用,是使用 ctrl + 鼠标左键点击 需要查看的方法,不再赘述。
列举重要的类(解析这些重要的类发挥的作用):
- MetaStateService 用来做MetaData状态变更以后的落盘操作,包括从磁盘上将集群信息读回集群。
- Node类(这个类不好找org.elasticsearch.node)
- MetaData类 集群的信息
几个方向
如何定义API,需要理清楚es的调用关系。要知道如何从一个 DETELE index 然后调用到处理逻辑上去
理清楚处理类,es是如何删除一个索引的。
理清楚es的代码结构,这些类都在哪里放着。
需要知道es的IOC容器并不是用spring来管理的,要明白在es中是如何注册的。
如果是新加功能,如何写es中的测试类 ESIntegTestCase类是es的用于集成测试的类,它为我们封装了一些常用的方法,比如启动集群,创建索引,等等。
- https://doc.codingdict.com/elasticsearch/504/
- https://www.twle.cn/l/yufei/elasticsearch/es-basic-testing.html
服务端
TransportDeleteIndexAction ,该类属于服务层的操作类。
TransportDeleteIndexAction 继承了 TransportMasterNodeAction,TransportMasterNodeAction是master节点来调度指定操作的类, 完成master节点执行和重试功能。
注意这里暂时省略了API层的分析,直接来到服务层看代码,核心代码在这里!
另外再说一点:这里并不是简单的API调用服务层。ES里边的处理逻辑是比较复杂。下边要讲的删除逻辑是真正执行删除的操作。但是API并没有直接调用它,删除流程是这样,通过es集群状态信息的变化,由这个变化的事件触发了调用删除逻辑。这样做的原因是:es是天然的分布式应用。状态信息的改变,首先发生在master节点上,然后master在将状态分发到数据节点上,在状态信息更新的时候,就捕捉到了这个变化的事件。然后就能够触发删除逻辑了。
执行删除操作的入口
TransportDeleteIndexAction.masterOperation()
源码如下:
TransportDeleteIndexAction.masterOperation()
@Override
protected void masterOperation(final DeleteIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
// 根据请求提取出来索引。
final Set<Index> concreteIndices = new HashSet<>(Arrays.asList(indexNameExpressionResolver.concreteIndices(state, request)));
if (concreteIndices.isEmpty()) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
// async delete freezing indices stats. 异步发送请求删除。遍历删除冻结索引的列表,构造一个 BulkRequest,每个索引提供一个DeleteRequest对象。
DaoUtil.delete(transportService, concreteIndices.toArray(new Index[concreteIndices.size()]));
// 构造删除集群状态信息中对应的索引信息。设置超时时间
DeleteIndexClusterStateUpdateRequest deleteRequest = new DeleteIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices.toArray(new Index[concreteIndices.size()]));
// 在里边提交一个修改集群状态的任务。 todo 这里应该再看一下,它是怎么修改集群状态信息的。
deleteIndexService.deleteIndices(deleteRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to delete indices [{}]", concreteIndices), t);
listener.onFailure(t);
}
});
}
该方法中有两步:
- 构建删除索引的bulk,异步提交任务。删除冻结的索引。
- 更新集群状态,将删除的索引信息从集群状态中移除,将更新后的集群状态信息,分发到其它的节点上。并根据修改后的集群状态信息,做数据物理移除操作。这一步是非常关键的,从表面上看不出来的数据是在这里移除的,从代码注释和命名上只能看出来它提交了一个修改集群状态的任务。真正的处理删除数据的逻辑,就是通过这个集群状态信息的改变,触发的。也就是说这个集群状态信息变更的操作任务,触发了数据删除的动作。
构建删除冻结索引的bulk请求
进入到TransportDeleteIndexAction.masterOperation(),然后可以看到 DaoUtil.delete(transportService, concreteIndices.toArray(new Index[concreteIndices.size()]));
这里进去以后只能看到,它构建了一个请求,并没有做执行操作。源码如下:
public static void delete(TransportService transportService, Index[] indices){
try{
BulkRequest bulkRequest = new BulkRequest();
for(Index index : indices){
bulkRequest.add(new DeleteRequest(Constant.INDEX, Constant.TYPE, DaoUtil.getId(index.getName())));
}
bulk(transportService, bulkRequest);
}catch (Exception e){
logger.error("", e);
}
}
物理删除数据操作
数据节点处理删除请求
做数据remove的操作,应该在数据节点上执行。
todo 可以去看一看 数据节点是如何接收请求的。
删除索引的位置在:IndicesService.removeIndex()
删除逻辑主要包含:
- 在es中最开始是执行closeIndex(关索引)
- closeIndex调用去closeShard(关分片)
- 将索引对应的数据进行remove
IndicesService.removeIndex()源码如下:
@Override
public void removeIndex(final Index index, final IndexRemovalReason reason, final String extraInfo) {
final String indexName = index.getName();
try {
final IndexService indexService;
final IndexEventListener listener;
synchronized (this) {
if (hasIndex(index) == false) {
return;
}
logger.debug("[{}] closing ... (reason [{}])", indexName, reason);
Map<String, IndexService> newIndices = new HashMap<>(indices);
indexService = newIndices.remove(index.getUUID());
assert indexService != null : "IndexService is null for index: " + index;
indices = unmodifiableMap(newIndices);
listener = indexService.getIndexEventListener();
}
listener.beforeIndexRemoved(indexService, reason);
logger.debug("{} closing index service (reason [{}][{}])", index, reason, extraInfo);
// 第一步的处理在这里,关索引。可以继续跟进这个方法,它又进入第二步,关分片,其中包括了将数据刷新到段中,将段中的数据落到磁盘上。并且还将分片对应的文件目录,直接删除了!这里非常重要。
indexService.close(extraInfo, reason == IndexRemovalReason.DELETED);
logger.debug("{} closed... (reason [{}][{}])", index, reason, extraInfo);
final IndexSettings indexSettings = indexService.getIndexSettings();
listener.afterIndexRemoved(indexService.index(), indexSettings, reason);
if (reason == IndexRemovalReason.DELETED) {
// now we are done - try to wipe data on disk if possible (去执行移除数据的操作,可以看下边详细分析)
deleteIndexStore(extraInfo, indexService.index(), indexSettings);
}
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to remove index {} ([{}][{}])", index, reason, extraInfo), e);
}
}
关闭索引和关闭分片的细节
es的索引是由分片组成的,分片又是一个lucene实例,索引在删除索引的时候,应该先将一个一个分片先关掉!
indexService.close()就是用来关闭分片的,同时,在这里边还做了特殊的操作,它会将一个一个的分片数据,根据目录,直接进行物理移除!调用链路非常隐蔽,这个流程是最复杂的,可以看下边的详细过程。最终调用的是:NodeEnvironment.deleteShardDirectoryUnderLock()方法。
接着点进去 indexService.close()方法
然后进入removeShard()方法
然后再进入closeShard()方法,然后从finally进入到store.close()
然后再进入decRef()
然后再从decRef() 进入refCounter.decRef()
再向下走到closeInternal(),接着就到了接口上了
接着看closeInternal()接口的实现类,走下边截图这个
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8ZPeQEk8-1659337774571)(C:\Users\zhangshuaixing\AppData\Roaming\Typora\typora-user-images\1652868197900.png)]
然后一个关键步骤来了:走到了onClose.accept(shardLock),点accept向下走,看到的是一个接口。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kz9CE5V2-1659337774576)(C:\Users\zhangshuaixing\AppData\Roaming\Typora\typora-user-images\1652870088557.png)]
接着需要从看谁实现了Consumer这个接口,从上边的onClose.accept(shardLock),能够看到在OnClose中实现的。点onClose,进到OnClose类,看到accept只是一个接口,它的实现类在IndexService里边!然后里边调用了onShardClose(lock)方法。在里边又调用了shardStoreDeleter.deleteShardStore()方法,跟着deleteShardStore()向下看到的是一个接口,而这个接口的实现类是IndicesService.deleteShardStore()方法,里边又调用了nodeEnv.deleteShardDirectoryUnderLock(),在nodeEnv.deleteShardDirectoryUnderLock()方法里边,实现了将分片数据对应的文件夹,直接移除掉!
物理移除数据的细节deleteIndexStore()详解
注意分片对应的目录数据已经在上边close分片的时候已经移除掉了。
跟进上边代码的deleteIndexStore()方法
接着调用了deleteIndexStoreIfDeletionAllowed() 方法,在这个方法里边又调用nodeEnv.deleteIndexDirectorySafe()方法去执行删除。但是这个方法里边可能会有删除不成功的情况,因为要获取锁。所以又提供了一个兜底方案,假如删除操作失败了,则将删除索引的操作调用addPendingDelete()方法,来兜底。
nodeEnv.deleteIndexDirectorySafe()方法里边,有一个获取分片锁的过程。然后调用deleteIndexDirectoryUnderLock()方法,递归来删除数据。想要添加回收站功能,就要在这里去添加一个方法moveIndexDirectoryUnderLock(),用来替换deleteIndexDirectoryUnderLock()
再来看一下deleteIndexDirectoryUnderLock()源码
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveIndexCustomLocation(indexSettings);
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
IOUtils.rm(customLocation);
}
}
实现更新集群状态信息源码的位置
进入到TransportDeleteIndexAction.masterOperation(),然后可以看到deleteIndexService.deleteIndices(deleteRequest, new ActionListener()
deleteIndexService.deleteIndices()中继续调用 clusterService.submitStateUpdateTask(),在这个方法中,new 了一个AckedClusterStateUpdateTask对象,这个对象里的execute()方法里又调用了**deleteIndices()**方法,这个方法里边是真正存放了从集群状态中,删除索引信息的操作。方法里边执行了移除索引状态信息的操作,并返回了一个分发任务,将更新后的集群状态信息,用于更新到集群的全部节点。后边将任务分发全部的节点,去执行更新集群状态信息的操作。
接着从clusterService.submitStateUpdateTask()继续向下跟进到了方法masterService.submitStateUpdateTasks(),再调用taskBatcher.submitTasks(safeTasks, config.timeout(), source)来提交任务,接下来会将上边构建的从集群状态中移除索引信息的任务分发给其它节点。
关于删除集群状态信息的详细操作,可以继续分析上边提到的 MetaDataDeleteIndexService.deleteIndices()
看一下它都做了什么:
- 遍历索引,从路由表中移除索引信息、移除clusterBlock、 删除IndexMetaData
- 将删除的索引放入墓碑
- 从快照中删除与待删除索引有关的内容
源码如下:
/** * Delete some indices from the cluster state. */
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
final MetaData meta = currentState.metaData();
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet());
// Check if index deletion conflicts with any running snapshots (检查快照)
SnapshotsService.checkIndexDeletion(currentState, metaDatas);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.builder(meta);
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
final IndexGraveyard.Builder graveyardBuilder = IndexGraveyard.builder(metaDataBuilder.indexGraveyard());
final int previousGraveyardSize = graveyardBuilder.tombstones().size();
// 关键的操作: 遍历索引
for (final Index index : indices) {
String indexName = index.getName();
logger.info("{} deleting index", index);
// 从路由表中移除索引信息,这个具体操作简单,只是从 map中根据名字,将索引移除。
routingTableBuilder.remove(indexName);
// clusterBlock是用来,标识索引允许什么类型的操作,比如read-only
clusterBlocksBuilder.removeIndexBlocks(indexName);
// 删除IndexMetaData, 它是用来描述索引信息的IndexMetaData对象,感兴趣可以去看一下IndexMetaData类
metaDataBuilder.remove(indexName);
}
// add tombstones to the cluster state for each deleted index (将删除的索引信息添加到墓碑中,墓碑中会记录删除过的索引) TOOD 注意这里比较重要,回收站功能的恢复数据是要改墓碑的。可以参考这里的获取墓碑的操作,并将墓碑中记录的删除移除掉。
final IndexGraveyard currentGraveyard = graveyardBuilder.addTombstones(indices).build(settings);
// 把墓碑数据更新到 集群元数据中
metaDataBuilder.indexGraveyard(currentGraveyard);
logger.trace("{} tombstones purged from the cluster state. Previous tombstone size: {}. Current tombstone size: {}.",
graveyardBuilder.getNumPurged(), previousGraveyardSize, currentGraveyard.getTombstones().size());
MetaData newMetaData = metaDataBuilder.build();
ClusterBlocks blocks = clusterBlocksBuilder.build();
// 更新快照列表,把和要删除的索引的快照任务都释放掉。
ImmutableOpenMap<String, ClusterState.Custom> customs = currentState.getCustoms();
// 定义一个释放释放快照类型的操作类型。
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices);
if (updatedRestoreInProgress != restoreInProgress) {
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(customs);
builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
customs = builder.build();
}
}
// 这里是定义了一个分发任务。用来更新集群状态信息。这里是点睛之笔,从这里我们一样就看出来,这个方法都改了那些地方
return allocationService.reroute(
// 集群状态,这里边挂着墓碑的状态。
ClusterState.builder(currentState)
// 路由表
.routingTable(routingTableBuilder.build())
// 元数据
.metaData(newMetaData)
// blocks信息
.blocks(blocks)
// 快照的状态信息
.customs(customs)
.build(),
"deleted indices [" + indices + "]");
}
边栏推荐
猜你喜欢
thinkphp框架5.0.23安全更新问题-漏洞修复-/thinkphp/library/think/App.php具体怎么改以及为什么要这么改
Jupyter Notebook(Anaconda)——两个环境分别修改默认打开目录(深度学习第一周番外篇)
MySQL详细安装与配置
Mppt photovoltaic maximum power point tracking control matlab simulation
分享一个 web 应用版本监测 (更新) 的工具库
阿里测试8年经验,靠着这份理解,我才得以生存下来
3 and a half years of testing experience, I don't have 20K, it seems it's time to change jobs
【C语言刷题】Leetcode203——移除链表元素
NC | 土壤微生物组的结构和功能揭示全球湿地N2O释放
看【C语言】实现简易计算器教程,让小伙伴们为你竖起大拇指
随机推荐
JVM内存和垃圾回收-03.运行时数据区概述及线程
Geoserver + mysql + openlayers problem
我用这一招让团队的开发效率提升了 100%!
golang面试题
有什么好用的IT资产管理软件
简单有效又有用的关闭antimalware service executable的方法·备份记录
Detailed explanation of AtomicInteger
golang刷leetcode动态规划(12)最小路径和
What are the useful real-time network traffic monitoring software
openlayers version update difference
松鼠短视频系统为用户加入随机头像代码-快速为用户加上随机头衔
SQL-UDT是什么功能?
中职网络安全竞赛之应用服务漏洞扫描与利用
C#里如何简单的校验时间格式
Based on OpenGL glaciers and firebird (illumination calculation model, visual, particle system)
治疗 | 如何识别和处理消极想法
香农与信息论三大定律
元旦快乐(2022)
去年,一道蚂蚁金服笔试题,还行,中等难度
AI智能剪辑,仅需2秒一键提取精彩片段