Creating the Spark Execution Environment: SparkEnv
Published: 2019-06-12


The Spark Driver is responsible for submitting the user's application.

1. SparkConf

SparkConf loads the configuration parameters for SparkContext, maintaining the various `spark.*` properties in a ConcurrentHashMap:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  /**
   * A ConcurrentHashMap that holds the Spark configuration
   */
  private val settings = new ConcurrentHashMap[String, String]()

  @transient private lazy val reader: ConfigReader = {
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  /**
   * Load the spark.* configuration
   * @param silent
   * @return
   */
  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties; only keys starting with "spark." are picked up
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
}
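To make the loading behavior concrete, here is a minimal usage sketch (the app name and keys below are illustrative, not from the source above). Only spark.*-prefixed system properties are loaded, and explicit set() calls write straight into that ConcurrentHashMap:

import org.apache.spark.SparkConf

val conf = new SparkConf()   // loadDefaults = true: spark.* system properties are loaded
  .setAppName("conf-demo")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

println(conf.get("spark.app.name"))        // conf-demo
println(conf.getOption("spark.not.set"))   // None: getOption does not throw for unset keys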

 

2. SparkContext

2.1 Creating the Spark execution environment: SparkEnv

SparkEnv is Spark's execution-environment object; it bundles the many objects that Executor execution depends on.

It is created mainly through SparkEnv.createSparkEnv; during SparkContext initialization, only the driver-side SparkEnv is created:

def isLocal: Boolean = Utils.isLocalMaster(_conf)

// An asynchronous listener bus for Spark events
// (the listener pattern is used to dispatch the various event types)
private[spark] val listenerBus = new LiveListenerBus(this)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  // Create the driver-side SparkEnv
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

Stepping into createDriverEnv, we see that it delegates to the create method, which builds a SparkEnv for either the Driver or an Executor.

Clicking into createExecutorEnv shows that it is invoked by CoarseGrainedExecutorBackend.

 

Let's now walk through what create() actually does.

2.1.1 Creating the SecurityManager

// Create the SecurityManager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
ioEncryptionKey.foreach { _ =>
  if (!securityManager.isSaslEncryptionEnabled()) {
    logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
      "wire.")
  }
}
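The warning above fires when I/O encryption is on but SASL RPC encryption is off. A hedged sketch of the SparkConf keys involved (values are illustrative):

val conf = new SparkConf()
  .set("spark.authenticate", "true")                      // enable RPC authentication
  .set("spark.authenticate.enableSaslEncryption", "true") // encrypt RPC traffic via SASL
  .set("spark.io.encryption.enabled", "true")             // encrypt shuffle/spill data on disk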

2.1.2 Creating the RpcEnv

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  securityManager, clientMode = !isDriver)
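The org.apache.spark.rpc interfaces are private[spark], so the sketch below only compiles inside Spark's own source tree; it shows the pattern, not a public API. An endpoint registers under a name on the driver, and executors resolve the same name to get an RpcEndpointRef:

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// A hypothetical endpoint that answers ask() calls with an echo
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

// Driver side: register under a well-known name
// val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
// Executor side: resolve the driver's endpoint by the same name
// val ref = RpcUtils.makeDriverRef("echo", conf, rpcEnv)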

2.1.3 Creating the serializer via reflection (JavaSerializer by default)

// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
  val cls = Utils.classForName(className)
  // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
  // SparkConf, then one taking no arguments
  try {
    cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
      .newInstance(conf, new java.lang.Boolean(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}

// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
  instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
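The same try-the-richest-constructor-first pattern is easy to reproduce outside Spark. A self-contained sketch; Greeter, Settings, and instantiate are hypothetical stand-ins, not Spark classes:

case class Settings(verbose: Boolean)

class Greeter(settings: Settings, isDriver: Boolean) {
  def this(settings: Settings) = this(settings, false)
  def this() = this(Settings(verbose = false))
  def hello(): String = s"hello (driver=$isDriver, verbose=${settings.verbose})"
}

def instantiate[T](className: String, settings: Settings, isDriver: Boolean): T = {
  // Class.forName needs the fully-qualified name of a top-level class
  val cls = Class.forName(className)
  try {
    // Richest constructor first: (Settings, boolean)
    cls.getConstructor(classOf[Settings], java.lang.Boolean.TYPE)
      .newInstance(settings, java.lang.Boolean.valueOf(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        cls.getConstructor(classOf[Settings]).newInstance(settings).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}

// instantiate[Greeter]("Greeter", Settings(verbose = true), isDriver = true).hello()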

2.1.4 Creating the SerializerManager

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
val closureSerializer = new JavaSerializer(conf)
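The Serializer produced in the previous step (JavaSerializer unless spark.serializer says otherwise) is used through a SerializerInstance; a minimal sketch:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val ser = new JavaSerializer(new SparkConf()).newInstance()

val bytes: java.nio.ByteBuffer = ser.serialize(Seq(1, 2, 3))
val back = ser.deserialize[Seq[Int]](bytes)   // Seq(1, 2, 3)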

2.1.5 Creating the BroadcastManager

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
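BroadcastManager itself is internal, but its user-facing counterpart is SparkContext.broadcast. A small sketch, assuming an existing SparkContext named sc:

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))   // shipped to each executor once

val counts = sc.parallelize(Seq("a", "b", "c"))
  .map(key => lookup.value.getOrElse(key, 0))
  .collect()                                         // Array(1, 2, 0)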

2.1.6 Creating the MapOutputTracker

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

// Create the MapOutputTracker; the Driver and Executors get different implementations
val mapOutputTracker = if (isDriver) {
  // The Driver side needs the BroadcastManager
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}

// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

2.1.7 Creating the ShuffleManager

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
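So users can pick the shuffle implementation either by short name or by fully-qualified class name; a sketch (both spellings resolve to SortShuffleManager in this version):

val conf = new SparkConf().set("spark.shuffle.manager", "sort")
// equivalently:
// conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")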

2.1.8 Creating the BlockManager

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

val blockManagerPort = if (isDriver) {
  conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
  conf.get(BLOCK_MANAGER_PORT)
}

val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
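The port constants read above correspond to ordinary SparkConf keys; a hedged sketch (the port numbers are illustrative, and by default a free port is chosen):

val conf = new SparkConf()
  .set("spark.driver.blockManager.port", "7090") // DRIVER_BLOCK_MANAGER_PORT
  .set("spark.blockManager.port", "7089")        // BLOCK_MANAGER_PORT (executors)
  .set("spark.memory.useLegacyMode", "false")    // default: UnifiedMemoryManager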

2.1.9 Creating the MetricsSystem

val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
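Sinks and sources are normally declared in conf/metrics.properties, and the same keys can also be passed through SparkConf with the spark.metrics.conf. prefix. A hedged sketch of a console sink:

val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")   // report every 10 seconds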

2.1.10 Creating the SparkEnv instance

val envInstance = new SparkEnv(
  executorId,
  rpcEnv,
  serializer,
  closureSerializer,
  serializerManager,
  mapOutputTracker,
  shuffleManager,
  broadcastManager,
  blockManager,
  securityManager,
  metricsSystem,
  memoryManager,
  outputCommitCoordinator,
  conf)

2.1.11 Creating the driver temp directory

// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
  val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  envInstance.driverTmpDir = Some(sparkFilesDir)
}
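The idea is easy to reproduce with plain JDK calls; a self-contained sketch (not Spark's Utils code) that creates a scoped temp dir and removes it on JVM exit, mirroring the cleanup SparkEnv.stop() performs for the driver:

import java.nio.file.Files
import scala.reflect.io.Directory

val userFiles = Files.createTempDirectory("userFiles").toFile
sys.addShutdownHook {
  new Directory(userFiles).deleteRecursively()   // recursive delete on exit
}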

  

Reposted from: https://www.cnblogs.com/chengbao/p/10604758.html
