A Detailed Look at How Elasticsearch (Java) Efficiently Distributes Tasks to Multiple Nodes and Collects the Results
- 2021-10-15 10:26:01
- OfStack
1. Overview
When we send a search request or another operation to ES, we usually pick a node at random to act as the coordinator. The request may be handled by that node alone, by some other node, or by several nodes together, so the situation can get complicated.
The coordinator's main job is therefore to distribute requests and collect the results. Doing this with high performance, safety and accuracy is essential.
2. A simple idea for request distribution
The request distribution discussed here targets multiple network nodes. So how do we send requests to several nodes and finally merge the results?
The simplest approach is to request each node synchronously: once the first node responds, send the request to the second node, and so on until every request has completed, then aggregate the results. The requirement is met with almost no effort. Simple, right?
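A minimal sketch of this naive serial approach, assuming a hypothetical blocking per-node call (`callNode`) that simply returns a string; in a real system it would be a network round trip. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SerialFanOut {
    // Hypothetical stand-in for "send a request to this node and wait for its reply".
    static String callNode(String node) {
        return "result-from-" + node;
    }

    // Naive approach: request each node in turn, blocking on every call,
    // and only merge once all replies are in.
    static List<String> fanOutSerially(List<String> nodes, Function<String, String> call) {
        List<String> results = new ArrayList<>();
        for (String node : nodes) {
            results.add(call.apply(node)); // the calling thread blocks here for each node
        }
        return results; // the "merge" step: here simply the collected list
    }

    public static void main(String[] args) {
        List<String> merged = fanOutSerially(List.of("n1", "n2", "n3"), SerialFanOut::callNode);
        System.out.println(merged); // [result-from-n1, result-from-n2, result-from-n3]
    }
}
```

Total latency is the sum of every node's latency, and the calling thread is tied up for the whole duration.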
This brute-force approach has clear drawbacks. Requesting nodes one after another fails to exploit the distributed nature of the system and turns the work into serial execution, which is hardly ideal. Moreover, the current thread stays occupied until distribution and collection have finished for every node, so it cannot serve any other incoming request; your concurrency is then capped by the size of the thread pool. That is not good.
Let's optimize it step by step.
First, we can turn serial distribution into parallel distribution: use multiple threads to send requests to multiple nodes at once, with each thread returning its result when it finishes. A synchronization tool such as CountDownLatch ensures that all nodes have been processed, after which a single main thread merges the results.
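A minimal sketch of the parallel version, using only `java.util.concurrent`: one pool task per node, a `ConcurrentHashMap` for the results, and a `CountDownLatch` the caller waits on. The per-node call is a hypothetical blocking function passed in as a parameter.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class LatchFanOut {
    // Parallel fan-out: one pool task per node, while the calling thread waits on a latch.
    static Map<String, String> fanOut(List<String> nodes, Function<String, String> call,
                                      ExecutorService pool) throws InterruptedException {
        Map<String, String> results = new ConcurrentHashMap<>(); // written by several threads
        CountDownLatch latch = new CountDownLatch(nodes.size());
        for (String node : nodes) {
            pool.execute(() -> {
                try {
                    results.put(node, call.apply(node)); // blocking call to one node
                } finally {
                    latch.countDown(); // count down even if the call throws
                }
            });
        }
        latch.await(); // the caller is held until the slowest node has answered
        return results; // merged by the single calling thread
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Map<String, String> merged = fanOut(List.of("n1", "n2", "n3"), n -> "result-from-" + n, pool);
            System.out.println(merged.size()); // 3
        } finally {
            pool.shutdown();
        }
    }
}
```

The requests now overlap in time, but `latch.await()` still parks the calling thread until every node has replied.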
This looks good and avoids the cost of serial requests. However, if one node responds very slowly, the main thread waiting on the latch stays blocked until that node answers, so the whole request slows down and the thread pool size once again becomes the bottleneck for concurrency. It treats the symptoms, not the root cause.
Let's keep optimizing. We can free the main thread from waiting altogether: whenever a distribution thread finishes its task, it checks whether all tasks are done. If not, it simply returns; if so, it starts the merge.
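A minimal sketch of the "last finisher merges" idea, assuming the same hypothetical blocking per-node call: each worker stores its result and decrements an `AtomicInteger`, and only the worker that brings it to zero invokes the merge callback. Nobody waits, so `fanOut` returns immediately.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class LastFinisherFanOut {
    static void fanOut(List<String> nodes, Function<String, String> call, ExecutorService pool,
                       Consumer<Map<String, String>> onAllDone) {
        Map<String, String> results = new ConcurrentHashMap<>();
        if (nodes.isEmpty()) {
            onAllDone.accept(results); // nothing to wait for
            return;
        }
        AtomicInteger remaining = new AtomicInteger(nodes.size());
        for (String node : nodes) {
            pool.execute(() -> {
                try {
                    results.put(node, call.apply(node)); // still a blocking call per node
                } finally {
                    // Exactly one worker observes zero and runs the merge.
                    // In this sketch a failed node is simply missing from the map.
                    if (remaining.decrementAndGet() == 0) {
                        onAllDone.accept(results);
                    }
                }
            });
        }
        // No await(): the caller's thread is free as soon as the tasks are submitted.
    }
}
```

The merge runs on whichever pool thread happens to finish last, so the callback must be thread-safe and should not block.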
This looks good, and it is already fully concurrent. Can it be improved further? The request to each node is still synchronous: although simple to write, the thread can do nothing else while it waits for the server to respond, and with many such requests that adds up to a lot of waste. If the request to each individual node could be handled asynchronously, wouldn't that be perfect? That is not easy to achieve, but it is a good idea nonetheless.
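A minimal sketch of the fully asynchronous variant, assuming each per-node call is itself non-blocking and returns a `CompletableFuture` (a hypothetical async client; here it is just a function parameter). `CompletableFuture.allOf` combines them, and no thread is parked while the remote side works.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class AsyncFanOut {
    static CompletableFuture<Map<String, String>> fanOut(
            List<String> nodes, Function<String, CompletableFuture<String>> asyncCall) {
        // Fire all requests immediately; each call returns a future without blocking.
        Map<String, CompletableFuture<String>> pending = nodes.stream()
                .collect(Collectors.toMap(n -> n, asyncCall));
        // When every future has completed, merge; if any fails, the result fails too.
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture[0]))
                .thenApply(ignored -> pending.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join())));
    }

    public static void main(String[] args) {
        // Simulated async calls: supplyAsync runs on the common pool, not the caller's thread.
        Map<String, String> merged = fanOut(List.of("n1", "n2"),
                n -> CompletableFuture.supplyAsync(() -> "result-from-" + n)).join();
        System.out.println(merged.size()); // 2
    }
}
```

The `join()` in `main` is only there so the demo can print; a server would attach further stages instead of waiting.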
3. Multi-node distribution and collection for search in ES
Let's use search as the starting point to see how ES handles distribution and collection, since search is the most common and most classic operation in ES. Not every part of ES is implemented this way, but it is at least a useful reference. We have studied the overall search workflow before, so this section starts directly from its core: TransportSearchAction.executeRequest().
// org.elasticsearch.action.search.TransportSearchAction#executeRequest
private void executeRequest(Task task, SearchRequest searchRequest,
SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
} else {
// Cross-cluster search: some of the indices live on remote clusters
if (shouldMinimizeRoundtrips(searchRequest)) {
// Pass parentTaskId so that all subtasks are associated with this task
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
// Send a search_shards request directly to each remote cluster
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
// Invoked only after every remote cluster has responded; the follow-up processing lives in this listener
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
getRemoteClusterNodeLookup(searchShardsResponses);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
remoteAliasFilters);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
// Run the actual search phase (not covered in this article)
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext, searchAsyncActionProvider);
},
listener::onFailure));
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}
As you can see, ES search takes several paths: some need to distribute requests across clusters, others do not. We are interested in the cross-cluster path, so let's look at collectSearchShards(). It loops over the remote clusters, sends each one a request without waiting for the reply, and of course collects the results.
// org.elasticsearch.action.search.TransportSearchAction#collectSearchShards
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
// Counts outstanding responses; the last response triggers the final result
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<Exception> exceptions = new AtomicReference<>();
// Iterate over each remote cluster and send it a request (without waiting for the reply)
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterAlias = entry.getKey();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
// Asynchronously send the search_shards request to cluster clusterAlias
clusterClient.admin().cluster().searchShards(searchShardsRequest,
new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
@Override
void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
// Each time a cluster responds, store its result in searchShardsResponses
searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
}
@Override
Map<String, ClusterSearchShardsResponse> createFinalResponse() {
// Called once all clusters have responded; returns the collected results
return searchShardsResponses;
}
}
);
}
}
// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#searchShards
@Override
public void searchShards(final ClusterSearchShardsRequest request, final ActionListener<ClusterSearchShardsResponse> listener) {
// Sends the indices:admin/shards/search_shards action; its handler is TransportClusterSearchShardsAction
execute(ClusterSearchShardsAction.INSTANCE, request, listener);
}
That is how ES sends requests to multiple clusters. The key point is that every request is asynchronous: once a request has been sent, the current thread moves on without waiting for the reply. This is what makes it non-blocking, and the follow-up logic is handled through listeners. Sending is therefore no problem, but how are the results collected? Also through the listener: when a remote node responds, listener.onResponse() is called.
3.1. Processing multi-node responses
This is the focus of this article. As we saw, ES sends the requests asynchronously (however they are transmitted), so how it collects the results is equally critical. ES's approach is very simple: a ConcurrentHashMap collects each result, and a CountDown tracks whether all responses have arrived.
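A minimal sketch of this collection pattern outside of ES, assuming a hypothetical `Listener` interface modelled loosely on `ActionListener` (it is not the ES type). Each per-node listener stores its result in a `ConcurrentHashMap`, keeps the first failure in an `AtomicReference`, and whichever callback brings the counter to zero notifies the final listener, much like `innerOnResponse()` plus `maybeFinish()` in `CCSActionListener`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ResultCollector {
    // Hypothetical callback interface (illustrative, not org.elasticsearch.action.ActionListener).
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    private final Map<String, String> results = new ConcurrentHashMap<>();
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final AtomicInteger remaining;
    private final Listener<Map<String, String>> finalListener;

    ResultCollector(int expected, Listener<Map<String, String>> finalListener) {
        this.remaining = new AtomicInteger(expected);
        this.finalListener = finalListener;
    }

    // One listener per node; hand it to the asynchronous send for that node.
    Listener<String> listenerFor(String node) {
        return new Listener<String>() {
            public void onResponse(String response) {
                results.put(node, response); // like innerOnResponse()
                maybeFinish();
            }
            public void onFailure(Exception e) {
                failure.compareAndSet(null, e); // keep only the first failure
                maybeFinish();
            }
        };
    }

    // Unlike ES's CountDown, this simple counter does not guard against extra calls.
    private void maybeFinish() {
        if (remaining.decrementAndGet() == 0) {
            Exception e = failure.get();
            if (e == null) {
                finalListener.onResponse(results);
            } else {
                finalListener.onFailure(e);
            }
        }
    }

    public static void main(String[] args) {
        ResultCollector collector = new ResultCollector(2, new Listener<Map<String, String>>() {
            public void onResponse(Map<String, String> all) { System.out.println("all done: " + all.size()); }
            public void onFailure(Exception e) { System.out.println("failed: " + e.getMessage()); }
        });
        collector.listenerFor("clusterA").onResponse("shardsA");
        collector.listenerFor("clusterB").onResponse("shardsB"); // prints "all done: 2"
    }
}
```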
// org.elasticsearch.action.search.TransportSearchAction.CCSActionListener#CCSActionListener
CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
this.clusterAlias = clusterAlias;
this.skipUnavailable = skipUnavailable;
this.countDown = countDown;
this.skippedClusters = skippedClusters;
this.exceptions = exceptions;
this.originalListener = originalListener;
}
// Response on success
@Override
public final void onResponse(Response response) {
// innerOnResponse stores the result in searchShardsResponses
innerOnResponse(response);
// maybeFinish: if this was the last response, build the final result and invoke the callback
maybeFinish();
}
private void maybeFinish() {
// Controlled by a CountDown, which is backed by an AtomicInteger
if (countDown.countDown()) {
Exception exception = exceptions.get();
if (exception == null) {
FinalResponse response;
try {
// Build the final response; for search_shards this is searchShardsResponses
response = createFinalResponse();
} catch(Exception e) {
originalListener.onFailure(e);
return;
}
// Invoke the success callback, which continues processing with the collected results
originalListener.onResponse(response);
} else {
originalListener.onFailure(exceptions.get());
}
}
}
// CountDown is simple: only the call that brings the count to zero returns true and every other call returns false, so completion is signalled at most once
/**
* Decrements the count-down and returns <code>true</code> iff this call
* reached zero otherwise <code>false</code>
*/
public boolean countDown() {
assert originalCount > 0;
for (;;) {
final int current = countDown.get();
assert current >= 0;
if (current == 0) {
return false;
}
if (countDown.compareAndSet(current, current - 1)) {
return current == 1;
}
}
}
As you can see, result collection in ES relies on CountDown, which is implemented with an AtomicInteger. Each cluster's response is stored temporarily in the ConcurrentHashMap, and when the last response arrives the final result is built.
Multi-node asynchronous submission is achieved through the generic asynchronous Client call framework, with CCSActionListener receiving every node's response. The whole thing is quite concise and avoids the complexity we discussed earlier. As the saying goes, the greatest truths are the simplest.
3.2. Implementing asynchronous request submission
To submit a request asynchronously within the local process, another thread or a thread pool is enough. Asynchronous submission to a remote node needs more: here ES relies on Netty's channel write (writeAndFlush), and when the remote node responds, a callback restores the original context. No step in the whole process blocks synchronously, which is what makes the processing efficient. There is also exception handling along the way, which we will not go into here.
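The key trick that makes a remote call asynchronous is correlating the response with its callback. A minimal sketch, assuming a hypothetical `PendingRequests` registry: the sender stores a callback under a fresh request id, writes the message tagged with that id without waiting, and the I/O side later looks the callback up by id. This mirrors the idea behind the `responseHandlers.add(...)` / `responseHandlers.remove(requestId)` pair in `TransportService#sendRequestInternal`; thread-context restoration and timeouts are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    // requestId -> callback waiting for that response
    private final Map<Long, Consumer<String>> handlers = new ConcurrentHashMap<>();

    // Sender side: register the callback and get an id to tag the outgoing message with.
    long register(Consumer<String> handler) {
        long id = nextId.incrementAndGet();
        handlers.put(id, handler);
        return id;
    }

    // I/O side: a response carrying requestId has arrived. remove() guarantees that each
    // handler fires at most once, even if a timeout races with the response.
    boolean onResponse(long requestId, String payload) {
        Consumer<String> handler = handlers.remove(requestId);
        if (handler == null) {
            return false; // already completed or timed out
        }
        handler.accept(payload);
        return true;
    }

    int pendingCount() {
        return handlers.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        long id = pending.register(payload -> System.out.println("got " + payload));
        // ... the request tagged with `id` would be written to the channel here, without waiting ...
        pending.onResponse(id, "pong");                      // prints "got pong"
        System.out.println(pending.onResponse(id, "late")); // prints "false"
    }
}
```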
// org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction
public class TransportClusterSearchShardsAction extends
TransportMasterNodeReadAction<ClusterSearchShardsRequest, ClusterSearchShardsResponse> {
private final IndicesService indicesService;
@Inject
public TransportClusterSearchShardsAction(TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters,
ClusterSearchShardsRequest::new, indexNameExpressionResolver, ClusterSearchShardsResponse::new, ThreadPool.Names.SAME);
this.indicesService = indicesService;
}
@Override
protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
indexNameExpressionResolver.concreteIndexNames(state, request));
}
@Override
protected void masterOperation(final ClusterSearchShardsRequest request, final ClusterState state,
final ActionListener<ClusterSearchShardsResponse> listener) {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(state, request.routing(), request.indices());
Map<String, AliasFilter> indicesAndFilters = new HashMap<>();
Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
for (String index : concreteIndices) {
final AliasFilter aliasFilter = indicesService.buildAliasFilter(clusterState, index, indicesAndAliases);
final String[] aliases = indexNameExpressionResolver.indexAliases(clusterState, index, aliasMetadata -> true, true,
indicesAndAliases);
indicesAndFilters.put(index, new AliasFilter(aliasFilter.getQueryBuilder(), aliases));
}
Set<String> nodeIds = new HashSet<>();
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
int currentGroup = 0;
for (ShardIterator shardIt : groupShardsIterator) {
ShardId shardId = shardIt.shardId();
ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
int currentShard = 0;
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
shardRoutings[currentShard++] = shard;
nodeIds.add(shard.currentNodeId());
}
groupResponses[currentGroup++] = new ClusterSearchShardsGroup(shardId, shardRoutings);
}
DiscoveryNode[] nodes = new DiscoveryNode[nodeIds.size()];
int currentNode = 0;
for (String nodeId : nodeIds) {
nodes[currentNode++] = clusterState.getNodes().get(nodeId);
}
listener.onResponse(new ClusterSearchShardsResponse(groupResponses, nodes, indicesAndFilters));
}
}
// doExecute is implemented in the parent class
// org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
new AsyncSingleAction(task, request, listener).doStart(state);
}
// org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
this.task = task;
this.request = request;
this.listener = listener;
this.startTime = threadPool.relativeTimeInMillis();
}
protected void doStart(ClusterState clusterState) {
try {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
// Retry processing
retry(clusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.trace("exception occurred during cluster block checking, accepting state", e);
return true;
}
});
}
} else {
ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
"stepped down before publishing action [{}], scheduling a retry", actionName), t);
retryOnMasterChange(clusterState, t);
} else {
delegatedListener.onFailure(t);
}
});
// Local execution: hand masterOperation to the configured executor (for this action it is ThreadPool.Names.SAME, i.e. the current thread)
threadPool.executor(executor)
.execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
}
} else {
if (nodes.getMasterNode() == null) {
logger.debug("no known master node, scheduling a retry");
retryOnMasterChange(clusterState, null);
} else {
DiscoveryNode masterNode = nodes.getMasterNode();
final String actionName = getMasterActionName(masterNode);
// Forward to the master node over the Netty-based transport; the current listener is called back on completion
transportService.sendRequest(masterNode, actionName, request,
new ActionListenerResponseHandler<Response>(listener, responseReader) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException ||
(exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to " +
"master node [{}], scheduling a retry. Error: [{}]",
actionName, nodes.getMasterNode(), exp.getDetailedMessage());
retryOnMasterChange(clusterState, cause);
} else {
listener.onFailure(exp);
}
}
});
}
}
} catch (Exception e) {
listener.onFailure(e);
}
}
So ES has two ways of submitting work asynchronously. If the current node is the executing node, it hands the work directly to a thread pool; otherwise it issues a network call to the remote node. Let's see how the latter is made asynchronous.
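The two paths can be reduced to a small dispatcher. A minimal sketch, assuming a hypothetical `RemoteSender` interface that stands in for the transport layer and plain string requests; none of these names are ES APIs. Either way the caller returns immediately and the result arrives through the listener.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

public class Dispatcher {
    // Hypothetical async transport: send and return at once; the callback fires when the reply arrives.
    interface RemoteSender {
        void send(String node, String request, Consumer<String> callback);
    }

    static void execute(String targetNode, String localNode, String request,
                        Function<String, String> localOperation, Executor executor,
                        RemoteSender remote, Consumer<String> listener) {
        if (targetNode.equals(localNode)) {
            // Path 1: this node does the work; hand it to a thread pool and return.
            executor.execute(() -> listener.accept(localOperation.apply(request)));
        } else {
            // Path 2: forward over the network; the transport invokes the listener later.
            remote.send(targetNode, request, listener);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in transport that answers immediately; a real one would reply from an I/O thread.
        RemoteSender fakeRemote = (node, req, cb) -> cb.accept("remote:" + node + ":" + req);
        execute("node-1", "node-1", "req", r -> "local:" + r, pool, fakeRemote, System.out::println); // local:req
        execute("node-2", "node-1", "req", r -> "local:" + r, pool, fakeRemote, System.out::println); // remote:node-2:req
        pool.shutdown();
    }
}
```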
// org.elasticsearch.transport.TransportService#sendRequest
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
// Get a connection: the local-node connection if the target is this node, otherwise a connection (channel) to the remote node
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, options, handler);
}
// org.elasticsearch.transport.TransportService#getConnection
/**
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
public Transport.Connection getConnection(DiscoveryNode node) {
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return connectionManager.getConnection(node);
}
}
// org.elasticsearch.transport.TransportService#sendRequest
/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
try {
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
// child task on the target node of the remote cluster.
// ----> a parent task on the local cluster
// |
// ----> a proxy task on the proxy node on the remote cluster
// |
// ----> an actual child task on the target node on the remote cluster
// To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
// node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
// unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
final Transport.Connection unwrappedConn = unwrapConnection(connection);
final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}
// org.elasticsearch.transport.TransportService#sendRequestInternal
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
// TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
final TimeoutHandler timeoutHandler;
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
responseHandler.setTimeoutHandler(timeoutHandler);
} else {
timeoutHandler = null;
}
try {
if (lifecycle.stoppedOrClosed()) {
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*/
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.scheduleTimeout(options.timeout());
}
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
// If holderToNotify == null then handler has already been taken care of.
if (contextToNotify != null) {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
// thread on a best effort basis though.
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
"failed to notify response handler on rejection, action: {}",
contextToNotify.action()),
e);
}
@Override
public void onFailure(Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
contextToNotify.action()),
e);
}
@Override
protected void doRun() throws Exception {
contextToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
}
}
}
// org.elasticsearch.transport.RemoteConnectionManager.ProxyConnection#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}
// org.elasticsearch.transport.TcpTransport.NodeChannels#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (isClosing.get()) {
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
}
// org.elasticsearch.transport.OutboundHandler#sendRequest
/**
* Sends the request to the given channel. This method should be used to send {@link TransportRequest}
* objects back to the caller.
*/
void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
Version version = Version.min(this.version, channelVersion);
OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
requestId, isHandshake, compressRequest);
ActionListener<Void> listener = ActionListener.wrap(() ->
messageListener.onRequestSent(node, requestId, action, request, options));
sendMessage(channel, message, listener);
}
// org.elasticsearch.transport.OutboundHandler#sendMessage
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
SendContext sendContext = new SendContext(channel, serializer, listener, serializer);
internalSend(channel, sendContext);
}
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
BytesReference reference = sendContext.get();
// stash thread context so that channel event loop is not polluted by thread context
try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
channel.sendMessage(reference, sendContext);
} catch (RuntimeException ex) {
sendContext.onFailure(ex);
CloseableChannel.closeChannel(channel);
throw ex;
}
}
// org.elasticsearch.transport.netty4.Netty4TcpChannel#sendMessage
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
// Netty writes the data; the listener is notified asynchronously through a promise, which completes the async send
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));
if (channel.eventLoop().isShutdown()) {
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
}
In short, remote asynchronous requests are built on Netty's pipeline mechanism and its eventLoop: writeAndFlush() queues the write on the channel's event loop and returns immediately, and the listener is notified through a promise. For the details, refer to earlier articles or other resources online.
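To make the event-loop idea concrete without depending on Netty, here is a rough analogy: a single-thread executor plays the role of the event loop that owns a channel, and a `CompletableFuture` plays the role of the write promise. `EventLoopSend` and its methods are hypothetical; this only imitates the shape of `writeAndFlush(msg, promise)` and is not how Netty is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class EventLoopSend {
    // A single-thread executor stands in for the event loop that owns the channel.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // The simulated "socket"; only ever touched from the event-loop thread.
    private final StringBuilder wire = new StringBuilder();

    // Analogous to writeAndFlush(msg, promise): queue the write and return the promise immediately.
    CompletableFuture<Void> sendMessage(String msg) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        try {
            eventLoop.execute(() -> {
                wire.append(msg);       // the real network write would happen here
                promise.complete(null); // notify listeners from the event-loop thread
            });
        } catch (RejectedExecutionException e) {
            // Similar in spirit to the "event loop is shutting down" failure path
            promise.completeExceptionally(e);
        }
        return promise;
    }

    // Reads the simulated wire on the event loop, so access stays single-threaded.
    String written() throws Exception {
        return eventLoop.submit(() -> wire.toString()).get();
    }

    void close() {
        eventLoop.shutdown();
    }

    public static void main(String[] args) throws Exception {
        EventLoopSend channel = new EventLoopSend();
        CompletableFuture<Void> sent = channel.sendMessage("hello");
        sent.thenRun(() -> System.out.println("write completed")); // callback, no blocking
        sent.get(); // demo only: wait so the program can print before exiting
        System.out.println(channel.written()); // hello
        channel.close();
    }
}
```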