当前位置:网站首页>Es writing fragment process
Es writing fragment process
2022-07-03 07:30:00 【chuanyangwang】
org.elasticsearch.action.support.replication.TransportReplicationAction
TransportReplicationAction Is an abstract class
TransportReplicationAction and HandledTransportAction Subclasses that inherit it will go transportService register .
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);TransportReplicationAction There are three kinds of action
- actionName ( Provided by subclasses )
- transportPrimaryAction ( For internal use )
- transportReplicaAction ( For internal use )
When the user requests actionName The treatment is as follows
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
// This method will eventually be called to route
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
// It will be executed locally transportPrimaryAction Method
performLocalAction(state, primary, node, indexMetaData);
} else {
// The request will be sent to the running master partition node On actionName
performRemoteAction(state, primary, node);
}Will eventually call
org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
} private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
}
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.computedGlobalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
// on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}technological process ( The coordination node does not store the primary partition ) As follows :

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
final long primaryTerm;
if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// primary is on older version, just take the current primary term
primaryTerm = replica.getOperationPrimaryTerm();
} else {
primaryTerm = response.getFailure().getTerm();
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
response.getFailure().getMessage());
} else {
// If the entry is successfully inserted in the main partition
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}边栏推荐
- Spa single page application
- [cmake] cmake link SQLite Library
- The babbage industrial policy forum
- 【最詳細】最新最全Redis面試大全(50道)
- Pgadmin 4 v6.11 release, PostgreSQL open source graphical management tool
- 691. Cube IV
- JS monitors empty objects and empty references
- SecureCRT取消Session记录的密码
- Hash table, generic
- [set theory] Stirling subset number (Stirling subset number concept | ball model | Stirling subset number recurrence formula | binary relationship refinement relationship of division)
猜你喜欢

Map interface and method

Recursion, Fibonacci sequence

Interview questions about producers and consumers (important)

Introduction of buffer flow

High concurrency memory pool

URL programming

Summary of abnormal mechanism of interview

JS monitors empty objects and empty references

《指環王:力量之戒》新劇照 力量之戒鑄造者亮相

在 4EVERLAND 上存储 WordPress 媒体内容,完成去中心化存储
随机推荐
Take you through the whole process and comprehensively understand the software accidents that belong to testing
twenty million two hundred and twenty thousand three hundred and nineteen
Le Seigneur des anneaux: l'anneau du pouvoir
SecureCRT取消Session记录的密码
TreeMap
Visit Google homepage to display this page, which cannot be displayed
HCIA notes
Wireshark software usage
II. D3.js draw a simple figure -- circle
不出网上线CS的各种姿势
【开发笔记】基于机智云4G转接板GC211的设备上云APP控制
HISAT2 - StringTie - DESeq2 pipeline 进行bulk RNA-seq
Hash table, generic
Store WordPress media content on 4everland to complete decentralized storage
TCP cumulative acknowledgement and window value update
Logging log configuration of vertx
VMWare网络模式-桥接,Host-Only,NAT网络
IP home online query platform
High concurrency memory pool
【已解决】win10找不到本地组策略编辑器解决方法