Hadoop之JobTrack分析
1.client端指定Job的各种参数配置之后调用job.waitForCompletion(true) 方法提交Job给JobTracker,等待Job 完成。
[java] view plaincopyprint?
1. publicvoidthrows
2.
3. //检查JobState状态
4. //检查及设置是否使用新的MapReduce API
5.
6. // Connect to the JobTracker and submit the job
7. //链接JobTracker
8. //将job信息提交
9. super
10. //更改job状态
11.
以上代码主要有两步骤,连接JobTracker并提交Job信息。connect方法主要是实例化JobClient对象,包括设置JobConf和init工作:
[java] view plaincopyprint?
1. publicvoidthrows
2. "mapred.job.tracker""local"//读取配置文件信息用于判断该Job是运行于本地单机模式还是分布式模式
3.
4.
5. this
6. if"local"//如果是单机模式,new LocalJobRunner
7. 1
8. thisnew
9. else
10. this
11.
12.
分布式模式下就会创建一个RPC代理链接:
[java] view plaincopyprint?
1. publicstatic
2. extends
3. long
4. intthrows
5.
6. if
7.
8.
9.
10.
11. new
12. new
13. long
14.
15. if
16. return
17. else
18. thrownew
19.
20.
21.
从上述代码可以看出hadoop实际上使用了Java自带的Proxy API来实现Remote Procedure Call
初始完之后,需要提交job
[java] view plaincopyprint?
1. //将job信息提交
submit方法做以下几件事情:
1.将conf中目录名字替换成hdfs代理的名字
2.检查output是否合法:比如路径是否已经存在,是否是明确的
3.将数据分成多个split并放到hdfs上面,写入job.xml文件
4.调用JobTracker的submitJob方法
该方法主要新建JobInProgress对象,然后检查访问权限和系统参数是否满足job,最后addJob:
[java] view plaincopyprint?
1. privatesynchronized
2. throws
3.
4.
5. synchronized
6. synchronized
7.
8. for
9.
10.
11.
12.
13.
14.
15.
16. "Job "" added successfully for user '"
17. "' to queue '"
18. "'"
19.
20.
21. return
22.
totalSubmissions记录client端提交job到JobTracker的次数。而jobs则是JobTracker所有可以管理的job的映射表
Map
taskScheduler是用于调度job先后执行策略的,其类图如下所示:
hadoop job调度机制;
public enum SchedulingMode { FAIR, FIFO}1.公平调度FairScheduler 对于每个用户而言,分布式资源是公平分配的,每个用户都有一个job池,假若某个用户目前所占有的资源很多,对于其他用户而言是不公平的,那么调度器就会杀掉占有资源多的用户的一些task,释放资源供他人使用2.容量调度JobQueueTaskScheduler在分布式系统上维护多个队列,每个队列都有一定的容量,每个队列中的job按照FIFO的策略进行调度。队列中可以包含队列。
两个Scheduler都要实现TaskScheduler的public synchronized List
接下来看看JobTracker的工作:
记录更新JobTracker重试的次数:
[java] view plaincopyprint?
1. whiletrue
2. try
3.
4. break
5. catch
6. "Failed to initialize recovery manager. "
7. // wait for some time
8.
9. "Retrying..."
10.
11.
启动Job调度器,默认是FairScheduler: taskScheduler.start();主要是初始化一些管理对象,比如job pool管理池
[java] view plaincopyprint?
1. // Initialize other pieces of the scheduler
2. new
3.
4. newthis
5.
6.
7. "mapred.fairscheduler.loadmanager"
8. classclass
9.
10.
11.
12.
13. "mapred.fairscheduler.taskselector"
14. classclass
15.
16.
[java] view plaincopyprint?
1.
[java] view plaincopyprint?
1. try
2.
3. "Initializing "
4.
5. // Inform the listeners if the job state has changed
6. // Note : that the job will be in PREP state.
7.
8. if
9.
10. new
11.
12. synchronizedthis
13.
14.
15.
16.
初始化操作主要用于初始化生成tasks然后通知其他的监听者执行其他操作。initTasks主要处理以下工作:
[java] view plaincopyprint?
1. // 记录用户提交的运行的job信息
2. try
3. new
4. @Override
5. publicthrows
6.
7.
8. returnnull
9.
10.
11. catch
12. thrownew
13.
14.
15. // 设置并记录job的优先级
16. this
17.
18. //
19. //生成每个Task需要的密钥
20. //
21.
22.
然后读取JobTracker split的数据的元信息,元信息包括以下属性信息:
[java] view plaincopyprint?
1. private//洗牌后的索引位置
2. privatelong//洗牌后数据长度
3. private//数据存储位置
然后根据元信息的长度来计算numMapTasks并校验数据存储地址是否可以连接
接下来生成map tasks和reducer tasks:
[java] view plaincopyprint?
1. new
2. forint0
3.
4. new
5.
6. this
7.
[java] view plaincopyprint?
1. this
2. this
3. this
4. this
5. this
6. this
7. this
8. this
9.
10.
以上除了task对应的jobTracker,split信息和job信息外,还设置了
[java] view plaincopyprint?
1.
2. "code"class"java"
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
本文来源:https://www.2haoxitong.net/k/doc/b411d248804d2b160b4ec054.html
文档为doc格式