Elasticsearch shard write process
2022-07-03 07:30:00 【chuanyangwang】
org.elasticsearch.action.support.replication.TransportReplicationAction
TransportReplicationAction is an abstract class.
Subclasses of TransportReplicationAction, like subclasses of HandledTransportAction, register their request handlers with transportService:
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
    in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
    in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);
TransportReplicationAction therefore registers three actions:
- actionName (provided by the subclass)
- transportPrimaryAction (for internal use)
- transportReplicaAction (for internal use)
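The naming scheme can be sketched in isolation. This is a minimal illustration, not Elasticsearch code: the class ReplicationActionNames and the action name "example:write" are hypothetical, and the handler names are stored as plain strings only to show which handler serves which action.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Elasticsearch.
final class ReplicationActionNames {
    final String actionName;
    final String transportPrimaryAction;
    final String transportReplicaAction;

    ReplicationActionNames(String actionName) {
        this.actionName = actionName;
        // Same suffix scheme as in the constructor shown above
        this.transportPrimaryAction = actionName + "[p]";
        this.transportReplicaAction = actionName + "[r]";
    }

    // Maps each registered action name to the name of the handler that serves it
    Map<String, String> handlers() {
        Map<String, String> handlers = new LinkedHashMap<>();
        handlers.put(actionName, "handleOperationRequest");
        handlers.put(transportPrimaryAction, "handlePrimaryRequest");
        handlers.put(transportReplicaAction, "handleReplicaRequest");
        return handlers;
    }

    public static void main(String[] args) {
        // "example:write" is a made-up action name used only for this sketch
        new ReplicationActionNames("example:write").handlers()
            .forEach((name, handler) -> System.out.println(name + " -> " + handler));
    }
}
```

Only actionName is meant to be called from outside; the two suffixed names exist so that the primary and replica phases can be addressed as separate transport actions.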
A user request for actionName is handled as follows:
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
    execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
// execute() eventually calls this method, which routes the request
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    assert request.shardId() != null : "request shardId must be set";
    new ReroutePhase((ReplicationTask) task, request, listener).run();
}
ReroutePhase then checks which node holds the primary shard:
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
    // The primary shard is local: run transportPrimaryAction on this node
    performLocalAction(state, primary, node, indexMetaData);
} else {
    // Forward the request to actionName on the node that holds the primary shard
    performRemoteAction(state, primary, node);
}
The request eventually reaches:
org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
    final String activeShardCountFailure = checkActiveShardCount();
    final ShardRouting primaryRouting = primary.routingEntry();
    final ShardId primaryId = primaryRouting.shardId();
    if (activeShardCountFailure != null) {
        finishAsFailed(new UnavailableShardsException(primaryId,
            "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
        return;
    }
    totalShards.incrementAndGet();
    pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
    primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
}
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
    this.primaryResult = primaryResult;
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
        }
        // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
        // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
        // of the sampled replication group, and advanced further than what the given replication group would allow it to.
        // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.computedGlobalCheckpoint();
        // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
        // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
        // on.
        final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
        assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        markUnavailableShardsAsStale(replicaRequest, replicationGroup);
        performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
    }
    primaryResult.runPostReplicationActions(new ActionListener<Void>() {
        @Override
        public void onResponse(Void aVoid) {
            successfulShards.incrementAndGet();
            try {
                updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
            } finally {
                decPendingAndFinishIfNeeded();
            }
        }
        @Override
        public void onFailure(Exception e) {
            logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
            // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
            // go out of sync with the primary
            finishAsFailed(e);
        }
    });
}
The flow when the coordinating node does not hold the primary shard is as follows:
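A minimal sketch of that flow, built only from the handler and method names quoted in this post. The class WriteFlowSketch, the node names, and the action name "example:write" are hypothetical; the sketch records hops as strings and performs no real transport calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; this class and the node names are not part of Elasticsearch.
final class WriteFlowSketch {

    // Returns the ordered hops a write request takes, given where the primary shard lives
    static List<String> trace(String actionName, String receivingNode, String primaryNode, List<String> replicaNodes) {
        List<String> hops = new ArrayList<>();
        String current = receivingNode;
        hops.add(current + ": handleOperationRequest (" + actionName + ")");
        if (!current.equals(primaryNode)) {
            // ReroutePhase: the primary is remote, so actionName is forwarded to its node
            hops.add(current + ": performRemoteAction -> " + primaryNode);
            current = primaryNode;
            hops.add(current + ": handleOperationRequest (" + actionName + ")");
        }
        // ReroutePhase on the primary's node: the primary action runs locally
        hops.add(current + ": performLocalAction (" + actionName + "[p])");
        hops.add(current + ": handlePrimaryRequest -> ReplicationOperation.execute");
        // After the primary succeeds, the request is replicated to each replica
        for (String replica : replicaNodes) {
            hops.add(replica + ": handleReplicaRequest (" + actionName + "[r])");
        }
        return hops;
    }

    public static void main(String[] args) {
        // node-a receives the request, node-b holds the primary, node-c holds a replica
        trace("example:write", "node-a", "node-b", List.of("node-c")).forEach(System.out::println);
    }
}
```

When the receiving node already holds the primary shard, the performRemoteAction hop and the second handleOperationRequest simply disappear from the trace.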

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
    Translog.Location location = null;
    for (int i = 0; i < request.items().length; i++) {
        final BulkItemRequest item = request.items()[i];
        final BulkItemResponse response = item.getPrimaryResponse();
        final Engine.Result operationResult;
        if (item.getPrimaryResponse().isFailed()) {
            if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
                continue; // ignore replication as we didn't generate a sequence number for this request.
            }
            final long primaryTerm;
            if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
                // primary is on older version, just take the current primary term
                primaryTerm = replica.getOperationPrimaryTerm();
            } else {
                primaryTerm = response.getFailure().getTerm();
            }
            operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
                response.getFailure().getMessage());
        } else {
            // The item was written successfully on the primary shard
            if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
                continue; // ignore replication as it's a noop
            }
            assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
            operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
        }
        assert operationResult != null : "operation result must never be null when primary response has no failure";
        location = syncOperationResultOrThrow(operationResult, location);
    }
    return location;
}
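The per-item branching in performOnReplica reduces to three outcomes. The sketch below restates only that decision, assuming simplified inputs: the class ReplicaItemDecision, the Outcome enum, and the UNASSIGNED_SEQ_NO sentinel value are stand-ins defined for this example, not the Elasticsearch types.

```java
// Hypothetical illustration; simplified stand-ins, not Elasticsearch classes.
final class ReplicaItemDecision {
    // Sentinel used by this sketch for "no sequence number was generated"
    static final long UNASSIGNED_SEQ_NO = -2L;

    enum Outcome { SKIP, MARK_SEQ_NO_AS_NOOP, APPLY_ON_REPLICA }

    static Outcome decide(boolean failedOnPrimary, long seqNo, boolean primaryResultWasNoop) {
        if (failedOnPrimary) {
            // A failure without a sequence number leaves nothing to replicate;
            // a failure with one must still be recorded as a no-op on the replica
            return seqNo == UNASSIGNED_SEQ_NO ? Outcome.SKIP : Outcome.MARK_SEQ_NO_AS_NOOP;
        }
        // A successful no-op on the primary changed nothing, so the replica skips it
        return primaryResultWasNoop ? Outcome.SKIP : Outcome.APPLY_ON_REPLICA;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, UNASSIGNED_SEQ_NO, false));
        System.out.println(decide(true, 42L, false));
        System.out.println(decide(false, 43L, true));
        System.out.println(decide(false, 44L, false));
    }
}
```

Recording a failed operation as a no-op keeps the replica's sequence-number history free of gaps even though no document was written.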