
Spark source code - task submission process - 6.1 - SparkContext initialization: creating the Spark driver-side execution environment SparkEnv

2022-08-05 06:09:00 zdaiqing

1. Process analysis

1.1. Entry point

The creation of the Spark driver-side execution environment is completed during the initialization of SparkContext.
The specific entry point is shown in the following code snippet: inside the try-catch block of the SparkContext class.
Call chain: try-catch block -> createSparkEnv method -> SparkEnv.createDriverEnv method.

class SparkContext(config: SparkConf) extends Logging {

  // unrelated code omitted...

  // method used to create the SparkEnv
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // specify the number of CPU cores the driver may use
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
  }

  // unrelated code omitted...
  try {
    // unrelated code omitted...
    // Create the Spark execution environment (cache, map output tracker, etc)
    // entry point for creating the SparkEnv
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
    // unrelated code omitted...
  } catch {
    // unrelated code omitted...
  }
}

1.1.1. Determining the driver-side CPU core count

In local mode:
	if no thread count is specified, the default is 1;
	if a thread count is specified, the value follows that thread count;
		if the thread count is "*", it is determined by the number of CPUs available to the JVM.
In yarn cluster mode, the value comes from the spark.driver.cores parameter, with a default of 0.
In all other cases, the default is 0.
object SparkContext extends Logging {

  private[spark] def numDriverCores(master: String, conf: SparkConf): Int = {
    // String -> Int
    def convertToInt(threads: String): Int = {
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    }
    master match {
      // local mode, defaults to 1 when no thread count is specified
      case "local" => 1
      // local mode with an explicit thread count: determined by that count
      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      // yarn cluster mode: determined by spark.driver.cores, default 0
      case "yarn" =>
        if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
          conf.getInt("spark.driver.cores", 0)
        } else {
          0
        }
      // the driver is not used, or its core count will be filled in later: default 0
      case _ => 0 // Either driver is not being used, or its core count will be interpolated later
    }
  }
}
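
To make the mapping concrete, below is a small self-contained sketch that mirrors the matching logic above (the regex patterns are copied from SparkMasterRegex; the yarn branch is omitted because it needs a SparkConf). Expected results are noted in the comments, assuming an 8-core machine.

object NumDriverCoresSketch {
  // equivalents of SparkMasterRegex.LOCAL_N_REGEX and LOCAL_N_FAILURES_REGEX
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def numDriverCores(master: String): Int = {
    def convertToInt(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local" => 1
      case LocalN(threads) => convertToInt(threads)
      case LocalNFailures(threads, _) => convertToInt(threads)
      case _ => 0 // yarn and other masters: omitted in this sketch
    }
  }

  def main(args: Array[String]): Unit = {
    println(numDriverCores("local"))             // 1
    println(numDriverCores("local[4]"))          // 4
    println(numDriverCores("local[*]"))          // 8 on an 8-core machine
    println(numDriverCores("local[6, 2]"))       // 6 (thread count, max failures)
    println(numDriverCores("spark://host:7077")) // 0
  }
}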

1.2. createDriverEnv: building the driver-side execution environment

object SparkEnv extends Logging {

  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    // address of the network listening socket the driver endpoint binds to
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    // the driver's host
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    // the driver's port
    val port = conf.get("spark.driver.port").toInt
    // I/O encryption key
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    // build the driver-side execution environment
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER, // specify the driver-side executor id
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }
}

1.2.1. The driver-side executor id

private[spark] val DRIVER_IDENTIFIER = "driver"

1.2.2. Properties of the execution environment SparkEnv

class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer, // serializer specified by the spark.serializer parameter
    val closureSerializer: Serializer, // Java serializer (used for closures)
    val serializerManager: SerializerManager, // serializer manager
    val mapOutputTracker: MapOutputTracker, // map task output tracker
    val shuffleManager: ShuffleManager, // shuffle manager
    val broadcastManager: BroadcastManager, // broadcast manager
    val blockManager: BlockManager, // block manager
    val securityManager: SecurityManager, // security manager
    val metricsSystem: MetricsSystem, // metrics system
    val memoryManager: MemoryManager, // memory manager
    val outputCommitCoordinator: OutputCommitCoordinator, // output commit coordinator
    val conf: SparkConf) extends Logging {
}
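
These components are reachable from anywhere in the same JVM through the companion object's getter. A minimal sketch, assuming it runs inside an already-started driver or executor (SparkEnv.set(_env) in the entry snippet above is what makes the lookup work):

import org.apache.spark.SparkEnv

val env = SparkEnv.get
val serializer = env.serializer     // the instance configured via spark.serializer
val blockManager = env.blockManager // this JVM's BlockManager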

1.2.3. create: the underlying logic of building the execution environment

1.2.3.1. Driver-side check

	// decided from the executorId that was passed in
	val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

    // on the driver, the listener bus must not be null
    if (isDriver) {
      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
    }

1.2.3.2. securityManager: security manager initialization

	val securityManager = new SecurityManager(conf, ioEncryptionKey)
    if (isDriver) {
      securityManager.initializeAuth()
    }
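
initializeAuth only takes effect when authentication is enabled. A minimal configuration sketch using the standard settings (on YARN the secret is generated automatically, so only the first line is needed there):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.authenticate", "true")          // enable RPC authentication
  .set("spark.authenticate.secret", "secret") // required outside of YARN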

1.2.3.3. rpcEnv initialization

	val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

1.2.3.4. Initialization of the serializer specified by spark.serializer

	val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")
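
JavaSerializer is only the fallback; the parameter accepts any Serializer subclass on the classpath. For example, switching to Kryo:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")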

1.2.3.5. serializerManager: serializer manager initialization

	val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

1.2.3.6. JavaSerializer (closure serializer) initialization

	val closureSerializer = new JavaSerializer(conf)
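
Note that the closure serializer is hardcoded to JavaSerializer: spark.serializer affects data serialization (shuffle, caching, broadcast), but not how task closures are shipped to executors.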

1.2.3.7. broadcastManager: broadcast manager initialization

	val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

1.2.3.8. mapOutputTracker: map task output tracker initialization

	val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // register the tracker endpoint on the driver, or look it up from an executor
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
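
registerOrLookupEndpoint, used here and again below for the BlockManagerMaster and OutputCommitCoordinator endpoints, is a local helper inside SparkEnv.create: on the driver it registers the endpoint with the RpcEnv, while on an executor it only obtains a reference to the driver-side endpoint. Its definition looks roughly like this:

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}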

1.2.3.9. shuffleManager initialization

	val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
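
Both short names map to SortShuffleManager, since the old hash-based shuffle was removed in Spark 2.0. The parameter also accepts the fully qualified class name of a custom ShuffleManager implementation:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort") // or a fully qualified ShuffleManager class name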

1.2.3.10. memoryManager: legacy vs. unified memory manager initialization

	val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }
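
A rough sketch of how the unified path sizes the region shared by execution and storage, assuming Spark 2.x defaults (300 MB reserved, spark.memory.fraction = 0.6); the exact logic lives in UnifiedMemoryManager.apply:

// e.g. --driver-memory 4g
val systemMemory = 4L * 1024 * 1024 * 1024
val reservedMemory = 300L * 1024 * 1024 // RESERVED_SYSTEM_MEMORY_BYTES
val memoryFraction = 0.6                // spark.memory.fraction default
val unifiedRegion = ((systemMemory - reservedMemory) * memoryFraction).toLong
// ~2.2 GB shared by execution and storage; the remainder is left to user code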

1.2.3.11. blockManager initialization

	val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
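
As the NB comment says, the BlockManager cannot serve requests yet. SparkContext completes the setup later, once the task scheduler has produced an application ID; the later call site is roughly:

_env.blockManager.initialize(_applicationId)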

1.2.3.12. metricsSystem: metrics system initialization

	val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
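
The sources and sinks mentioned in the comment come from the metrics configuration, which can be supplied as a metrics.properties file or inline via spark.metrics.conf.*-prefixed settings. A minimal sketch using the ConsoleSink that ships with Spark:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10") // report every 10 seconds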

1.2.3.13. outputCommitCoordinator: output commit coordinator initialization

	val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

1.2.3.14. Creating a temporary directory on the driver side

	// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }


Copyright notice
This article was written by [zdaiqing]; when republishing, please include a link to the original. Thanks!
https://yzsam.com/2022/217/202208050514371430.html