当前位置:网站首页>Es writing fragment process
Es writing fragment process
2022-07-03 07:30:00 【chuanyangwang】
org.elasticsearch.action.support.replication.TransportReplicationAction
TransportReplicationAction Is an abstract class
TransportReplicationAction and HandledTransportAction Subclasses that inherit it will go transportService register .
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);TransportReplicationAction There are three kinds of action
- actionName ( Provided by subclasses )
- transportPrimaryAction ( For internal use )
- transportReplicaAction ( For internal use )
When the user requests actionName The treatment is as follows
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
// This method will eventually be called to route
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
// It will be executed locally transportPrimaryAction Method
performLocalAction(state, primary, node, indexMetaData);
} else {
// The request will be sent to the running master partition node On actionName
performRemoteAction(state, primary, node);
}Will eventually call
org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
} private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
}
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.computedGlobalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
// on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}technological process ( The coordination node does not store the primary partition ) As follows :

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
final long primaryTerm;
if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// primary is on older version, just take the current primary term
primaryTerm = replica.getOperationPrimaryTerm();
} else {
primaryTerm = response.getFailure().getTerm();
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
response.getFailure().getMessage());
} else {
// If the entry is successfully inserted in the main partition
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}边栏推荐
- [HCAI] learning summary OSI model
- sharepoint 2007 versions
- List exercises after class
- IPv4 address
- Store WordPress media content on 4everland to complete decentralized storage
- Basic knowledge about SQL database
- Jeecg request URL signature
- [Development Notes] cloud app control on device based on smart cloud 4G adapter gc211
- [solved] sqlexception: invalid value for getint() - 'Tian Peng‘
- Use of other streams
猜你喜欢
随机推荐
Recursion, Fibonacci sequence
An overview of IfM Engage
Common analysis with criteria method
[solved] win10 cannot find a solution to the local group policy editor
The underlying mechanism of advertising on websites
在 4EVERLAND 上存储 WordPress 媒体内容,完成去中心化存储
Common methods of file class
IPv4 address
Win 2008 R2 crashed at the final installation stage
Basic knowledge about SQL database
为什么说数据服务化是下一代数据中台的方向?
How long is the fastest time you can develop data API? One minute is enough for me
docket
The babbage industrial policy forum
Read config configuration file of vertx
Operation and maintenance technical support personnel have hardware maintenance experience in Hong Kong
Leetcode 213: looting II
Deep learning parameter initialization (I) Xavier initialization with code
Raspberry pie update tool chain
TypeScript let與var的區別







![[solved] unknown error 1146](/img/f1/b8dd3ca8359ac9eb19e1911bd3790a.png)

