Flink 1.15 source code reading: the flink-clients client execution flow (a dry read)
2022-07-27 08:34:00 【Jinghe small ant】
This follows on from the previous article on debugging flink-clients and reads through the source code in more detail. It is dry material, so take it slowly.
Reference link: https://blog.51cto.com/u_13870740/5321778 (the explanation there is very good).
Table of contents
- 1. Start reading
- CliFrontend#main source code analysis
- Getting the Flink configuration directory
- Parsing flink-conf.yaml and loading it into a Configuration (essentially a Map)
- Initializing the three CLIs: GenericCLI (per-job mode), FlinkYarnSessionCli (yarn-session mode), and DefaultCLI (standalone mode)
- Initializing the CliFrontend client object
- SecurityUtils.install(new SecurityConfiguration(cli.configuration));
- Calling parseAndRun to parse the arguments and run the program
1. Start reading
public class CliFrontend {
public static void main(final String[] args) {
}
}

CliFrontend keeps three fields that matter here. customCommandLines holds the CustomCommandLine instances, whose role is explained below. configuration holds the settings read from flink-conf.yaml and behaves like a map. clusterClientServiceLoader looks up the appropriate cluster client factory for the supplied configuration. All three are initialized in the constructor.
CliFrontend#main source code analysis
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
// Log JVM, Hadoop and other environment information
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// Resolve the directory that contains flink-conf.yaml, e.g. flink/conf
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
// Parse flink-conf.yaml and load its entries into a Configuration (essentially a Map<String, Object>)
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
// Initialize the three CLIs: GenericCLI (per-job mode), FlinkYarnSessionCli (yarn-session mode) and DefaultCLI (standalone mode)
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
// Initialize the CliFrontend client object
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// Call parseAndRun to parse the arguments and execute the requested action
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
// When reading source code, follow the main path; the catch block can be skipped
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
}
The logic of main is clear and can be divided into roughly the following steps:
- Resolve the Flink configuration directory, i.e. the directory that contains flink-conf.yaml (for example flink/conf)
- Parse flink-conf.yaml and load its entries into a Configuration (essentially a Map)
- Initialize the three CLIs: GenericCLI (per-job mode), FlinkYarnSessionCli (yarn-session mode), and DefaultCLI (standalone mode)
- Initialize the CliFrontend client object
- Install the security configuration with SecurityUtils.install
- Call parseAndRun to parse the arguments and run the program
Let's look at what each step does.
Getting the Flink configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
This calls getConfigurationDirectoryFromEnv():
public static String getConfigurationDirectoryFromEnv() {
// Read the value of the FLINK_CONF_DIR environment variable
String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
// If the variable is set
if (location != null) {
// and the path exists, return it directly
if (new File(location).exists()) {
return location;
} else {
throw new RuntimeException(
"The configuration directory '"
+ location
+ "', specified in the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable, does not exist.");
}
// Otherwise try fallback 1: private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
} else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
location = CONFIG_DIRECTORY_FALLBACK_1;
// Otherwise try fallback 2: private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
} else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
location = CONFIG_DIRECTORY_FALLBACK_2;
} else {
throw new RuntimeException(
"The configuration directory was not specified. "
+ "Please specify the directory containing the configuration file through the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable.");
}
return location;
}
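The code is simple: it reads the FLINK_CONF_DIR environment variable and, if that is unset, falls back to ../conf and then conf relative to the working directory. The result is the directory that contains flink-conf.yaml, not the file itself. If no directory is found, or FLINK_CONF_DIR points to a path that does not exist, a RuntimeException is thrown.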
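The lookup order can be tried out in isolation with a small sketch. This is not Flink code: the class ConfDirResolver, its resolve method and the injected exists check are hypothetical names introduced only for illustration, and the exception type is simplified.

```java
import java.io.File;
import java.util.function.Predicate;

/** Hypothetical helper that mirrors the lookup order of getConfigurationDirectoryFromEnv(). */
final class ConfDirResolver {

    static final String FALLBACK_1 = "../conf";
    static final String FALLBACK_2 = "conf";

    /**
     * @param envValue value of FLINK_CONF_DIR, or null if the variable is unset
     * @param exists existence check, injected so the logic can be exercised without touching disk
     */
    static String resolve(String envValue, Predicate<String> exists) {
        if (envValue != null) {
            // An explicitly configured directory must exist; no fallback is tried in this case.
            if (exists.test(envValue)) {
                return envValue;
            }
            throw new IllegalStateException(
                    "FLINK_CONF_DIR points to a missing directory: " + envValue);
        }
        if (exists.test(FALLBACK_1)) {
            return FALLBACK_1;
        }
        if (exists.test(FALLBACK_2)) {
            return FALLBACK_2;
        }
        throw new IllegalStateException("No configuration directory found; set FLINK_CONF_DIR");
    }

    public static void main(String[] args) {
        Predicate<String> onDisk = path -> new File(path).exists();
        try {
            System.out.println(resolve(System.getenv("FLINK_CONF_DIR"), onDisk));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that a FLINK_CONF_DIR that is set but wrong fails immediately instead of silently falling back, which matches the branch structure of the method above.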
Parsing flink-conf.yaml and loading it into a Configuration (essentially a Map)
// 2. load the global configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
The method that loads the configuration is implemented as follows:
/**
 * Loads the configuration files from the specified directory.
 *
 * <p>YAML files are supported as configuration files.
 *
 * @param configDir the directory which contains the configuration files
 */
public static Configuration loadConfiguration(final String configDir) {
return loadConfiguration(configDir, null);
}
This overload does not show how the configuration is actually read, so step into the next level:
/**
 * Loads the configuration files from the specified directory. If the dynamic properties
 * configuration is not null, then it is added to the loaded configuration.
 *
 * @param configDir directory to load the configuration from
 * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
 * @return The configuration loaded from the given configuration directory
 */
public static Configuration loadConfiguration(
final String configDir, @Nullable final Configuration dynamicProperties) {
// Throw if configDir is null
if (configDir == null) {
throw new IllegalArgumentException(
"Given configuration directory is null, cannot load configuration");
}
// Throw if the directory does not exist
final File confDirFile = new File(configDir);
if (!(confDirFile.exists())) {
throw new IllegalConfigurationException(
"The given configuration directory name '"
+ configDir
+ "' ("
+ confDirFile.getAbsolutePath()
+ ") does not describe an existing directory.");
}
// get Flink yaml configuration file
// As the English comment says: locate the YAML file inside the directory
final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
// Throw if the file does not exist
if (!yamlConfigFile.exists()) {
throw new IllegalConfigurationException(
"The Flink config file '"
+ yamlConfigFile
+ "' ("
+ yamlConfigFile.getAbsolutePath()
+ ") does not exist.");
}
// Load the YAML configuration through this method
Configuration configuration = loadYAMLResource(yamlConfigFile);
// If dynamic properties were supplied, add them all to the configuration object
if (dynamicProperties != null) {
configuration.addAll(dynamicProperties);
}
// Done: return the configuration object
return configuration;
}
This still does not show how the file contents end up in the Configuration, so look at the implementation of loadYAMLResource(yamlConfigFile):
/**
 * Loads a YAML-file of key-value pairs.
 *
 * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a
 * single-line comment.
 *
 * <p>Example:
 *
 * <pre>
 * jobmanager.rpc.address: localhost # network address for communication with the job manager
 * jobmanager.rpc.port   : 6123      # network port to connect to for communication with the job manager
 * taskmanager.rpc.port  : 6122      # network port the task manager expects incoming IPC connections
 * </pre>
 *
 * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML
 * key-value pairs (see issue #113 on GitHub). If at any point in time, there is a need to go
 * beyond simple key-value pairs syntax compatibility will allow to introduce a YAML parser
 * library.
 *
 * @param file the YAML file to read from
 * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
 */
private static Configuration loadYAMLResource(File file) {
final Configuration config = new Configuration();
// Even the framework code uses plain java.io readers here
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
// Current line and line counter
String line;
int lineNo = 0;
// Read line by line until readLine() returns null (end of file)
while ((line = reader.readLine()) != null) {
lineNo++;
// 1. check for comments
String[] comments = line.split("#", 2);
String conf = comments[0].trim();
// 2. get key and value
if (conf.length() > 0) {
String[] kv = conf.split(": ", 2);
// skip line with no valid key-value pair
if (kv.length == 1) {
LOG.warn(
"Error while trying to split key and value in configuration file "
+ file
+ ":"
+ lineNo
+ ": \""
+ line
+ "\"");
continue;
}
// Extract the key and the value
// For example, for jobmanager.rpc.address: flink1
// key: jobmanager.rpc.address
// value: flink1
String key = kv[0].trim();
String value = kv[1].trim();
// sanity check
if (key.length() == 0 || value.length() == 0) {
LOG.warn(
"Error after splitting key and value in configuration file "
+ file
+ ":"
+ lineNo
+ ": \""
+ line
+ "\"");
continue;
}
LOG.info(
"Loading configuration property: {}, {}",
key,
isSensitive(key) ? HIDDEN_CONTENT : value);
config.setString(key, value);
}
}
} catch (IOException e) {
throw new RuntimeException("Error parsing YAML configuration.", e);
}
return config;
}
So that is how the configuration file is read: line by line, much like a file-reading routine we might write ourselves.
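The parsing rules are easy to check with a standalone sketch. The class FlatYamlSketch below is a hypothetical simplification: it keeps the same two split calls as loadYAMLResource, but it works on a list of lines, returns a plain Map, and skips invalid lines silently instead of logging a warning.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified version of the line-based parsing in loadYAMLResource. */
final class FlatYamlSketch {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : lines) {
            // 1. Everything after the first '#' is treated as a comment.
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue;
            }
            // 2. Key and value are separated by the first ": " (colon followed by a space).
            String[] kv = conf.split(": ", 2);
            if (kv.length == 1) {
                continue; // no separator found; the real loader logs a warning here
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            result.put(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse(List.of(
                "jobmanager.rpc.address: localhost # network address",
                "jobmanager.rpc.port : 6123",
                "taskmanager.numberOfTaskSlots:4",
                "# only a comment")));
    }
}
```

Two consequences of these rules are worth remembering: a line such as taskmanager.numberOfTaskSlots:4 (no space after the colon) is skipped, and a value that contains # is cut off at that character.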
Initializing the three CLIs: GenericCLI (per-job mode), FlinkYarnSessionCli (yarn-session mode), and DefaultCLI (standalone mode)
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
Step into the loadCustomCommandLines method:
public static List<CustomCommandLine> loadCustomCommandLines(
Configuration configuration, String configurationDirectory) {
// Initialize the list of command lines
List<CustomCommandLine> customCommandLines = new ArrayList<>();
// 1. Add GenericCLI to the list
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
// See org.apache.flink.yarn.cli.FlinkYarnSessionCli
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
// 2. Add FlinkYarnSessionCli to the list
customCommandLines.add(
loadCustomCommandLine(
flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
// the active CustomCommandLine in order and DefaultCLI isActive always return true.
// 3. Add DefaultCLI to the list
customCommandLines.add(new DefaultCLI());
return customCommandLines;
}
loadCustomCommandLines initializes the CustomCommandLine instances and returns them as a list. CustomCommandLine is the interface for a custom command-line front end that contributes and interprets command-line options.
There are three main CustomCommandLine implementations: GenericCLI, FlinkYarnSessionCli, and DefaultCLI.
At first glance the CustomCommandLine interface has many implementing classes, but only these three are used in practice; the others are test or fallback implementations.
For a detailed comparison of the three, see my other article "Flink1.15 source code reading: flink-clients and GenericCLI, flinkYarnSessionCLI and DefaultCLI".
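The comment in the source about adding DefaultCLI last is easier to appreciate with a toy model of "first active command line wins". Everything in this sketch is hypothetical: the Cli interface is a much smaller stand-in for CustomCommandLine, and the activation rules (checking for -t or -yid) are simplified assumptions, not the real isActive implementations.

```java
import java.util.List;
import java.util.function.Predicate;

final class ActiveCliSketch {

    /** Hypothetical, much smaller stand-in for Flink's CustomCommandLine interface. */
    interface Cli {
        String id();

        boolean isActive(List<String> args);
    }

    static final class SimpleCli implements Cli {
        private final String id;
        private final Predicate<List<String>> activation;

        SimpleCli(String id, Predicate<List<String>> activation) {
            this.id = id;
            this.activation = activation;
        }

        @Override
        public String id() {
            return id;
        }

        @Override
        public boolean isActive(List<String> args) {
            return activation.test(args);
        }
    }

    /** Same order as loadCustomCommandLines: generic, YARN session, default. */
    static List<Cli> loadInOrder() {
        return List.of(
                new SimpleCli("generic", args -> args.contains("-t")), // simplified assumption
                new SimpleCli("yarn-session", args -> args.contains("-yid")), // simplified assumption
                new SimpleCli("default", args -> true)); // always active, so it must come last
    }

    /** Returns the first command line that reports itself as active. */
    static Cli firstActive(List<Cli> ordered, List<String> args) {
        for (Cli cli : ordered) {
            if (cli.isActive(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command line is active");
    }

    public static void main(String[] args) {
        System.out.println(firstActive(loadInOrder(), List.of("-t", "yarn-per-job")).id());
        System.out.println(firstActive(loadInOrder(), List.of("job.jar")).id());
    }
}
```

If the always-active entry were placed first, the other two could never be selected, which is exactly what the "Tips" comment warns about.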
Initializing the CliFrontend client object
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
The constructors look like this:
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}
public CliFrontend(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) {
this.configuration = checkNotNull(configuration);
this.customCommandLines = checkNotNull(customCommandLines);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
this.customCommandLineOptions = new Options();
for (CustomCommandLine customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
The CliFrontend object is initialized from the configuration and customCommandLines obtained above.
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// Install security configuration
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
/**
 * Installs a process-wide security configuration.
 *
 * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 */
public static void install(SecurityConfiguration config) throws Exception {
// Install the security modules first before installing the security context
installModules(config);
installContext(config);
}
static void installModules(SecurityConfiguration config) throws Exception {
// install the security module factories
List<SecurityModule> modules = new ArrayList<>();
for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
SecurityModuleFactory moduleFactory = null;
try {
moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
} catch (NoMatchSecurityFactoryException ne) {
LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
throw new IllegalArgumentException("Unable to find module factory class", ne);
}
SecurityModule module = moduleFactory.createModule(config);
// can be null if a SecurityModule is not supported in the current environment
if (module != null) {
module.install();
modules.add(module);
}
}
installedModules = modules;
}
static void installContext(SecurityConfiguration config) throws Exception {
// install the security context factory
for (String contextFactoryClass : config.getSecurityContextFactories()) {
try {
SecurityContextFactory contextFactory =
SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
if (contextFactory.isCompatibleWith(config)) {
try {
installedContext = contextFactory.createContext(config);
// install the first context that's compatible and ignore the remaining.
break;
} catch (SecurityContextInitializeException e) {
LOG.error(
"Cannot instantiate security context with: " + contextFactoryClass,
e);
} catch (LinkageError le) {
LOG.error(
"Error occur when instantiate security context with: "
+ contextFactoryClass,
le);
}
} else {
LOG.debug("Unable to install security context factory {}", contextFactoryClass);
}
} catch (NoMatchSecurityFactoryException ne) {
LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
}
}
if (installedContext == null) {
LOG.error("Unable to install a valid security context factory!");
throw new Exception("Unable to install a valid security context factory!");
}
}
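The loop in installContext boils down to "use the first compatible factory and ignore the rest", and the installed context then wraps the actual work. The sketch below illustrates only that control flow. All type names are hypothetical, and the context takes a Supplier instead of the Callable used by the real runSecured call, to keep the example free of checked exceptions.

```java
import java.util.List;
import java.util.function.Supplier;

final class SecurityContextSketch {

    /** Hypothetical simplification of a security context that runs an action on the caller's behalf. */
    interface Context {
        <T> T runSecured(Supplier<T> action);
    }

    /** Hypothetical simplification of a security context factory. */
    interface ContextFactory {
        boolean isCompatible();

        Context create();
    }

    static final class NamedContext implements Context {
        final String name;

        NamedContext(String name) {
            this.name = name;
        }

        @Override
        public <T> T runSecured(Supplier<T> action) {
            // A real context could switch to an authenticated user before running the action.
            return action.get();
        }
    }

    static ContextFactory factory(String name, boolean compatible) {
        return new ContextFactory() {
            @Override
            public boolean isCompatible() {
                return compatible;
            }

            @Override
            public Context create() {
                return new NamedContext(name);
            }
        };
    }

    /** Installs the first compatible context and ignores the remaining factories. */
    static Context installFirstCompatible(List<ContextFactory> factories) {
        for (ContextFactory f : factories) {
            if (f.isCompatible()) {
                return f.create();
            }
        }
        throw new IllegalStateException("Unable to install a valid security context factory!");
    }

    public static void main(String[] args) {
        Context ctx = installFirstCompatible(
                List.of(factory("hadoop", false), factory("no-op", true)));
        Supplier<Integer> action = () -> 0; // stands in for cli.parseAndRun(args)
        Integer retCode = ctx.runSecured(action);
        System.out.println(((NamedContext) ctx).name + " returned " + retCode);
    }
}
```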
Calling parseAndRun to parse the arguments and run the program
The call that parses the arguments and runs the program:
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
Let's look at parseAndRun itself:
/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseAndRun(String[] args) {
// check for action
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action
// The action to perform, e.g. run
String action = args[0];
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
// This article only follows the run action
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
case ACTION_INFO:
info(params);
return 0;
case ACTION_CANCEL:
cancel(params);
return 0;
case ACTION_STOP:
stop(params);
return 0;
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
return 0;
case "-v":
case "--version":
String version = EnvironmentInformation.getVersion();
String commitID = EnvironmentInformation.getRevisionInformation().commitId;
System.out.print("Version: " + version);
System.out.println(
commitID.equals(EnvironmentInformation.UNKNOWN)
? ""
: ", Commit ID: " + commitID);
return 0;
default:
System.out.printf("\"%s\" is not a valid action.\n", action);
System.out.println();
System.out.println(
"Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
System.out.println();
System.out.println(
"Specify the version option (-v or --version) to print Flink version.");
System.out.println();
System.out.println(
"Specify the help option (-h or --help) to get help on the command.");
return 1;
}
} catch (CliArgsException ce) {
return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {
return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {
return handleMissingJobException();
} catch (Exception e) {
return handleError(e);
}
}
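Stripped of error handling, parseAndRun is a plain dispatch on the first argument, with the remaining arguments handed to the selected action. The following sketch is a hypothetical reduction to two actions that only print what they would do; it is meant to show the argument slicing and the return codes, not the real handlers.

```java
import java.util.Arrays;

final class ActionDispatchSketch {

    /** Returns 0 when a known action was dispatched and 1 otherwise. */
    static int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        // The first argument names the action; everything after it belongs to that action.
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                System.out.println("would run with " + Arrays.toString(params));
                return 0;
            case "list":
                System.out.println("would list with " + Arrays.toString(params));
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.%n", action);
                return 1;
        }
    }

    public static void main(String[] args) {
        int retCode = parseAndRun(new String[] {"run", "-c", "com.example.Job", "job.jar"});
        System.out.println("return code: " + retCode);
    }
}
```

For a command such as flink run -c com.example.Job job.jar (class and jar names are made up here), action is run and params holds the three remaining arguments.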
Everything before parseAndRun is preparation; this last step is where execution really starts. Because the command we ran is flink run, control goes into the run(params) method.
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try (
PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
// This is where execution really starts
executeProgram(effectiveConfiguration, program);
}
}
run first obtains the options that Flink defines for the run command and parses the arguments we passed to flink run against them. If -h is present, it prints the help text and returns. Next it picks the CustomCommandLine that is active, which in our case should be GenericCLI. It then collects the user's job jar and its dependencies, builds the effective configuration, and finally calls executeProgram to actually run the program.
Continuing on, step into executeProgram:
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
throws ProgramInvocationException {
ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
Next, look at ClientUtils.executeProgram:
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
// Get the user-code class loader; ChildFirstClassLoader by default, configurable in the configuration file
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
// Remember the current thread's context class loader
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Set the current thread's context class loader to the user-code class loader
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info(
"Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
// Initialize the ContextEnvironment with the context configuration
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
// Initialize the StreamContextEnvironment
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
// Invoke the user's code through reflection
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
// Finally, switch the context class loader back
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
First the class loader for user code is obtained. By default it is a ChildFirstClassLoader; the strategy can be set in flink-conf.yaml:
# Flink class loading strategy: child-first or parent-first
classloader.resolve-order: child-first
The interesting part comes next. The current thread's context class loader is saved in contextClassLoader, and the thread's context class loader is then set to the user-code class loader (a ChildFirstClassLoader or a ParentFirstClassLoader). After that, ContextEnvironment and StreamContextEnvironment are initialized with the context configuration, and invokeInteractiveModeForExecution is called to run the user's code through reflection. Note the outer finally block: once the user code has finished, the thread's context class loader is switched back to the saved contextClassLoader. Swapping the thread's class loader in this way is how user code is kept isolated from Flink's framework code.
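The save, swap and restore pattern can be reproduced with nothing but the JDK. In this sketch the helper runWithContextClassLoader is a hypothetical name, and an empty URLClassLoader stands in for Flink's user-code class loader; it demonstrates only the try/finally discipline, not child-first resolution.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ContextClassLoaderSwapSketch {

    /** Runs userCode with userLoader as the thread's context class loader, then restores the old one. */
    static <T> T runWithContextClassLoader(ClassLoader userLoader, Supplier<T> userCode) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userLoader);
            return userCode.get();
        } finally {
            // Restored even when userCode throws, so framework code never sees the user loader.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code class loader (assumption).
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSwapSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        ClassLoader seenInside = runWithContextClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user loader visible inside: " + (seenInside == userLoader));
        System.out.println("previous loader restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

The restore sits in a finally block for the same reason as in executeProgram: a failing user program must not leave the client thread with the wrong class loader.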
Next, look at invokeInteractiveModeForExecution:
/**
 * This method assumes that the context environment is prepared, or the execution will be a
 * local execution by default.
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
callMainMethod(mainClass, args);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
}
The code above calls callMainMethod(mainClass, args); as you might guess, this invokes the main method of the user's code:
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " must be public.");
}
try {
// Look up the main method
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException(
"Could not look up the main(String[]) method from the class "
+ entryClass.getName()
+ ": "
+ t.getMessage(),
t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-public main method.");
}
try {
// This is the key line: invoke the user's main method; the user program starts here
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException(
"Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException(
"Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException(
"The main method caused an error: " + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException(
"An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
}
callMainMethod first checks that the entry class is public, then looks up its main method, checks that main is both static and public, and finally invokes the user code with mainMethod.invoke(null, (Object) args).
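The same reflective call can be tried outside Flink. In the sketch below, DemoJob is a made-up stand-in for a user's job class, callMain is a hypothetical, shortened version of callMainMethod, and unchecked exceptions replace ProgramInvocationException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class ReflectiveMainSketch {

    /** Made-up stand-in for a user's job class. */
    public static final class DemoJob {
        public static String lastArgs;

        public static void main(String[] args) {
            lastArgs = String.join(",", args);
        }
    }

    /** Shortened, hypothetical variant of callMainMethod using unchecked exceptions. */
    static void callMain(Class<?> entryClass, String[] args) {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalArgumentException(entryClass.getName() + " must be public.");
        }
        Method mainMethod;
        try {
            // getMethod only finds public methods, so a non-public main fails here.
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    entryClass.getName() + " has no main(String[]) method.", e);
        }
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException(
                    entryClass.getName() + " declares a non-static main method.");
        }
        try {
            // The cast to Object passes the array as ONE argument instead of expanding it as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Could not invoke the main method", e);
        }
    }

    public static void main(String[] args) {
        callMain(DemoJob.class, new String[] {"--input", "a.txt"});
        System.out.println(DemoJob.lastArgs);
    }
}
```

The (Object) cast is the detail that is easiest to miss: without it, invoke would treat each string in args as a separate argument and the call would not match main(String[]).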
This article walked through the client-side startup flow of the flink-clients module to deepen our understanding of Flink's underlying execution path, and along the way covered how user code is isolated from the framework's own code.
The client execution flow is a frequent interview topic, and a question on user-code isolation tripped me up once two years ago, so these are the key points to remember.