当前位置:网站首页>Es writing fragment process
Es writing fragment process
2022-07-03 07:30:00 【chuanyangwang】
org.elasticsearch.action.support.replication.TransportReplicationAction
TransportReplicationAction Is an abstract class
TransportReplicationAction and HandledTransportAction Subclasses that inherit it will go transportService register .
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);
TransportReplicationAction There are three kinds of action
- actionName ( Provided by subclasses )
- transportPrimaryAction ( For internal use )
- transportReplicaAction ( For internal use )
When the user requests actionName The treatment is as follows
protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
// This method will eventually be called to route
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
// It will be executed locally transportPrimaryAction Method
performLocalAction(state, primary, node, indexMetaData);
} else {
// The request will be sent to the running master partition node On actionName
performRemoteAction(state, primary, node);
}
Will eventually call
org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
}
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
}
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.computedGlobalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
// on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}
technological process ( The coordination node does not store the primary partition ) As follows :
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
final Engine.Result operationResult;
if (item.getPrimaryResponse().isFailed()) {
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
final long primaryTerm;
if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// primary is on older version, just take the current primary term
primaryTerm = replica.getOperationPrimaryTerm();
} else {
primaryTerm = response.getFailure().getTerm();
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
response.getFailure().getMessage());
} else {
// If the entry is successfully inserted in the main partition
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}
边栏推荐
- twenty million two hundred and twenty thousand three hundred and nineteen
- Introduction of buffer flow
- Understanding of class
- [cmake] cmake link SQLite Library
- Jeecg request URL signature
- Image recognition and detection -- Notes
- 为什么说数据服务化是下一代数据中台的方向?
- 【已解决】Unknown error 1146
- [untitled]
- The babbage industrial policy forum
猜你喜欢
《指环王:力量之戒》新剧照 力量之戒铸造者亮相
C code production YUV420 planar format file
Sorting, dichotomy
3311. Longest arithmetic
Store WordPress media content on 4everland to complete decentralized storage
Pat grade a real problem 1166
Common architectures of IO streams
Comparison of advantages and disadvantages between most complete SQL and NoSQL
The embodiment of generics in inheritance and wildcards
Homology policy / cross domain and cross domain solutions /web security attacks CSRF and XSS
随机推荐
[HCAI] learning summary OSI model
Read config configuration file of vertx
HCIA notes
Leetcode 198: 打家劫舍
New stills of Lord of the rings: the ring of strength: the caster of the ring of strength appears
Vertx multi vertical shared data
Spa single page application
4279. Cartesian tree
pgAdmin 4 v6.11 发布,PostgreSQL 开源图形化管理工具
Logging log configuration of vertx
Jeecg data button permission settings
sharepoint 2007 versions
[Development Notes] cloud app control on device based on smart cloud 4G adapter gc211
【开发笔记】基于机智云4G转接板GC211的设备上云APP控制
"Moss ma not found" solution
Lombok cooperates with @slf4j and logback to realize logging
Vertx metric Prometheus monitoring indicators
Warehouse database fields_ Summary of SQL problems in kingbase8 migration of Jincang database
Chapter VI - Containers
4EVERLAND:IPFS 上的 Web3 开发者中心,部署了超过 30,000 个 Dapp!