Spark提交Yarn的详细过程

这篇文章主要讲解了“Spark提交Yarn的详细过程”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark提交Yarn的详细过程”吧!

这篇文章主要讲解了“Spark提交Yarn的详细过程”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark提交Yarn的详细过程”吧!

网站的建设成都创新互联公司专注网站定制,经验丰富,不做模板,主营网站定制开发.小程序定制开发,H5页面制作!给你焕然一新的设计体验!已为隧道混凝土搅拌车等企业提供专业服务。

spark-submit.sh-> spark-class.sh,然后调用SparkSubmit.scala。

根据client或者cluster模式处理方式不一样。

client:直接在spark-class.sh运行的地方包装要给进程来执行driver。

cluster:将driver提交到集群去执行。

核心在SparkSubmit.scala的prepareSubmitEnvironment方法中,截取一段处理Yarn集群环境的看一下。

// In client mode, launch the application main class directly    // In addition, add the main application jar and any added jars (if any) to the classpath    if (deployMode == CLIENT) {      childMainClass = args.mainClass      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {        childClasspath += localPrimaryResource      }      if (localJars != null) { childClasspath ++= localJars.split(",") }    }

client模式,childMainClass就是driver的main方法。

接下来看看Yarn cluster模式:

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class    if (isYarnCluster) {      childMainClass = YARN_CLUSTER_SUBMIT_CLASS      if (args.isPython) {        childArgs += ("--primary-py-file", args.primaryResource)        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")      } else if (args.isR) {        val mainFile = new Path(args.primaryResource).getName        childArgs += ("--primary-r-file", mainFile)        childArgs += ("--class", "org.apache.spark.deploy.RRunner")      } else {        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {          childArgs += ("--jar", args.primaryResource)        }        childArgs += ("--class", args.mainClass)      }      if (args.childArgs != null) {        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }      }    }

这时候childMainClass变成了

YARN_CLUSTER_SUBMIT_CLASS =    "org.apache.spark.deploy.yarn.YarnClusterApplication"

private[spark] class YarnClusterApplication extends SparkApplication {  override def start(args: Array[String], conf: SparkConf): Unit = {    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,    // so remove them from sparkConf here for yarn mode.    conf.remove(JARS)    conf.remove(FILES)    new Client(new ClientArguments(args), conf, null).run()  }}

看源码可以看到,YarnClusterApplication最终是用到了deploy/yarn/Client.scala

client.run调用client.submitApplication方法提交到Yarn集群。

def submitApplication(): ApplicationId = {     // Set up the appropriate contexts to launch our AM      val containerContext = createContainerLaunchContext(newAppResponse)      val appContext = createApplicationSubmissionContext(newApp, containerContext)}

主要是createContainerLaunchContext方法:

 /**   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.   * This sets up the launch environment, java options, and the command for launching the AM.   */private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse){val userClass =      if (isClusterMode) {        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))      } else {        Nil      }     val amClass =      if (isClusterMode) {        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName      } else {        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName      } val amArgs =      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++      Seq("--properties-file",        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++      Seq("--dist-cache-conf",        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))    // Command for the ApplicationMaster    val commands = prefixEnv ++      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++      javaOpts ++ amArgs ++      Seq(        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")}

这样就生成要执行的命令了,就是Command。上面这句话啥意思呢:

(1)cluster模式

用ApplicationMaster启动userClass。

(2)client模式

启动Executor

这里我们要看的是cluster模式,至此就清楚了,在cluster模式下,在Yarn集群中用ApplicationMaster包装了userClass并启动。userClass就是driver的意思。


分享题目:Spark提交Yarn的详细过程
转载来源:http://myzitong.com/article/epipod.html