当前位置:网站首页>Es writing fragment process
Es writing fragment process
2022-07-03 07:30:00 【chuanyangwang】
org.elasticsearch.action.support.replication.TransportReplicationAction
TransportReplicationAction Is an abstract class
TransportReplicationAction and HandledTransportAction Subclasses that inherit it will go transportService register .
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);TransportReplicationAction There are three kinds of action
- actionName ( Provided by subclasses )
- transportPrimaryAction ( For internal use )
- transportReplicaAction ( For internal use )
When the user requests actionName The treatment is as follows
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
// This method will eventually be called to route
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
// It will be executed locally transportPrimaryAction Method
performLocalAction(state, primary, node, indexMetaData);
} else {
// The request will be sent to the running master partition node On actionName
performRemoteAction(state, primary, node);
}Will eventually call
org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
} private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
}
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.computedGlobalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
// on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}technological process ( The coordination node does not store the primary partition ) As follows :

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
final long primaryTerm;
if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// primary is on older version, just take the current primary term
primaryTerm = replica.getOperationPrimaryTerm();
} else {
primaryTerm = response.getFailure().getTerm();
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
response.getFailure().getMessage());
} else {
// If the entry is successfully inserted in the main partition
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}边栏推荐
- 图像识别与检测--笔记
- Sorting, dichotomy
- Comparison of advantages and disadvantages between most complete SQL and NoSQL
- Vertx restful style web router
- File operation serialization recursive copy
- 《指环王:力量之戒》新剧照 力量之戒铸造者亮相
- Chapter VI - Containers
- Paper learning -- Study on the similarity of water level time series of Xingzi station in Poyang Lake
- sharepoint 2007 versions
- [solved] unknown error 1146
猜你喜欢

1. E-commerce tool cefsharp autojs MySQL Alibaba cloud react C RPA automated script, open source log

SecureCRT password to cancel session recording

JS monitors empty objects and empty references

How long is the fastest time you can develop data API? One minute is enough for me

IO stream system and FileReader, filewriter

【CoppeliaSim4.3】C#调用 remoteApi控制场景中UR5
![[solved] sqlexception: invalid value for getint() - 'Tian Peng‘](/img/bf/f6310304d58d964b3d09a9d011ddb5.png)
[solved] sqlexception: invalid value for getint() - 'Tian Peng‘

带你全流程,全方位的了解属于测试的软件事故

Introduction of buffer flow

High concurrency memory pool
随机推荐
Download address collection of various versions of devaexpress
The concept of C language pointer
Pat grade a real problem 1166
SecureCRT取消Session记录的密码
Realize the reuse of components with different routing parameters and monitor the changes of routing parameters
Common architectures of IO streams
Vertx restful style web router
Lombok cooperates with @slf4j and logback to realize logging
Dora (discover offer request recognition) process of obtaining IP address
An overview of IfM Engage
Map interface and method
Hisat2 - stringtie - deseq2 pipeline for bulk RNA seq
C代码生产YUV420 planar格式文件
Vertx multi vertical shared data
Leetcode 213: 打家劫舍 II
Comparison of advantages and disadvantages between most complete SQL and NoSQL
[most detailed] latest and complete redis interview book (50)
Warehouse database fields_ Summary of SQL problems in kingbase8 migration of Jincang database
Image recognition and detection -- Notes
Jeecg request URL signature