博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark里边:Worker源代码分析和架构
阅读量:7052 次
发布时间:2019-06-28

本文共 2638 字,大约阅读时间需要 8 分钟。

首先由Spark图表理解Worker于Spark中的作用和地位:

Worker所起的作用有下面几个:

1. 接受Master的指令,启动或者杀掉Executor

2. 接受Master的指令,启动或者杀掉Driver

3. 报告Executor/Driver的状态到Master

4. 心跳到Master。心跳超时则Master觉得Worker已经挂了不能工作了

5. 向GUI报告Worker的状态

说白了,Worker就是整个集群真正干活的。首先看一下Worker重要的数据结构:

val executors = new HashMap[String, ExecutorRunner]  val finishedExecutors = new HashMap[String, ExecutorRunner]  val drivers = new HashMap[String, DriverRunner]  val finishedDrivers = new HashMap[String, DriverRunner]

这些Hash Map存储了名字和实体时间的相应关系,方便通过名字直接找到实体进行调用。

看一下怎样启动Executor:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>      if (masterUrl != activeMasterUrl) {        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")      } else {        try {          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,            self, workerId, host,            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),            workDir, akkaUrl, ExecutorState.RUNNING)          executors(appId + "/" + execId) = manager          manager.start()          coresUsed += cores_          memoryUsed += memory_          masterLock.synchronized {            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)          }        } catch {          case e: Exception => {            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))            if (executors.contains(appId + "/" + execId)) {              executors(appId + "/" + execId).kill()              executors -= appId + "/" + execId            }            masterLock.synchronized {              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)            }          }        }

1行到3行是验证该命令是否发自一个合法的Master。7到10行定义了一个ExecutorRunner,实际上系统并没有一个类叫做Executor。我们所说的Executor实际上是由ExecutorRunner实现的,这个名字起得也比較贴切。11行将新建的executor放到上面提到的Hash Map中。

然后12行启动这个Executor。13行和14行将如今已经使用的core和memory进行的统计。15到17行实际上是向Master报告Executor的状态。这里须要加锁。

假设在这过程中有异常抛出,那么须要check是否是executor已经加到Hash Map中,假设有则首先停止它。然后从Hash Map中删除它。而且向Master report Executor是FAILED的。Master会又一次启动新的Executor。

接下来看一下Driver的Hash Map的使用。通过KillDriver:

case KillDriver(driverId) => {      logInfo(s"Asked to kill driver $driverId")      drivers.get(driverId) match {        case Some(runner) =>          runner.kill()        case None =>          logError(s"Asked to kill unknown driver $driverId")      }    }

这个KillDirver的命令实际上由Master发出的。而Master实际上接收了Client的kill driver的命令。这个也能够看出Scala语言的简洁性。

版权声明:本文博主原创文章,博客,未经同意不得转载。

你可能感兴趣的文章
Linux下用Java获取本机IP
查看>>
Eclipse的Spring库导入
查看>>
velocity 判断 变量 是否不是空或empty
查看>>
【leetcode】123. Best Time to Buy and Sell Stock III
查看>>
角色设计的特点
查看>>
sublime text格式化json快捷键
查看>>
获得数据库自动生成的主键
查看>>
磁盘阵列
查看>>
y轴数据变换利器——yaxis-transformer
查看>>
Hibernate缓存机制
查看>>
从头开始复习css之动画
查看>>
sed常见用法,删除匹配行的上2行,下3行
查看>>
【BZOJ】1415 [Noi2005]聪聪和可可 期望DP+记忆化搜索
查看>>
android 7.1 调用相机崩溃解决办法
查看>>
访问控制符
查看>>
Android studio修改字体(font)大小(size)
查看>>
------第二节-----------------第二讲----单链表的基本操作---------
查看>>
iOS 百度地图大头针使用
查看>>
1118: 零起点学算法25——求两点之间的距离
查看>>
delegate代理设计模式
查看>>