Hadoop之JobTrack分析

发布时间:2013-06-28 09:46:06   来源:文档文库   
字号:

 

HadoopJobTrack分析

1.client端指定Job的各种参数配置之后调用job.waitForCompletion(true) 方法提交JobJobTracker,等待Job 完成。

[java] view plaincopyprint?

1. publicvoidthrows

2.

3. //检查JobState状态

4. //检查及设置是否使用新的MapReduce API

5.

6. // Connect to the JobTracker and submit the job

7. //链接JobTracker

8. //job信息提交

9. super

10. //更改job状态

11.


以上代码主要有两步骤,连接JobTracker并提交Job信息。connect方法主要是实例化JobClient对象,包括设置JobConfinit工作:

[java] view plaincopyprint?

1. publicvoidthrows

2. "mapred.job.tracker""local"//读取配置文件信息用于判断该Job是运行于本地单机模式还是分布式模式

3.

4.

5. this

6. if"local"//如果是单机模式,new LocalJobRunner 

7. 1

8. thisnew

9. else

10. this

11.

12.

分布式模式下就会创建一个RPC代理链接:

[java] view plaincopyprint?

1. publicstatic

2. extends

3. long

4. intthrows

5.

6. if

7.

8.

9.

10.

11. new

12. new

13. long

14.

15. if

16. return

17. else

18. thrownew

19.

20.

21.

从上述代码可以看出hadoop实际上使用了Java自带的Proxy API来实现Remote Procedure Call

初始完之后,需要提交job

[java] view plaincopyprint?

1. //job信息提交

submit方法做以下几件事情:

         1.conf中目录名字替换成hdfs代理的名字

         2.检查output是否合法:比如路径是否已经存在,是否是明确的

         3.将数据分成多个split并放到hdfs上面,写入job.xml文件

         4.调用JobTrackersubmitJob方法

        该方法主要新建JobInProgress对象,然后检查访问权限和系统参数是否满足job,最后addJob

     

[java] view plaincopyprint?

1. privatesynchronized

2. throws

3.

4.

5. synchronized

6. synchronized

7.

8. for

9.

10.

11.

12.

13.

14.

15.

16. "Job "" added successfully for user '"

17. "' to queue '"

18. "'"

19.

20.

21. return

22.

totalSubmissions记录client端提交jobJobTracker的次数。而jobs则是JobTracker所有可以管理的job的映射表

Map jobs =  Collections.synchronizedMap(new TreeMap());

taskScheduler是用于调度job先后执行策略的,其类图如下所示:

hadoop job调度机制;

public enum SchedulingMode {
  FAIR, FIFO
}
1.公平调度FairScheduler
   对于每个用户而言,分布式资源是公平分配的,每个用户都有一个job池,假若某个用户目前所占有的资源很多,对于其他用户而言是不公平的,那么调度器就会杀掉占有资源多的用户的一些task,释放资源供他人使用
2.容量调度JobQueueTaskScheduler
在分布式系统上维护多个队列,每个队列都有一定的容量,每个队列中的job按照FIFO的策略进行调度。队列中可以包含队列。

两个Scheduler都要实现TaskSchedulerpublic synchronized List assignTasks(TaskTracker tracker)方法,该方法通过具体的计算生成可以分配的task

接下来看看JobTracker的工作:

记录更新JobTracker重试的次数:

[java] view plaincopyprint?

1. whiletrue

2. try

3.

4. break

5. catch

6. "Failed to initialize recovery manager. "

7. // wait for some time

8.

9. "Retrying..."

10.

11.

启动Job调度器,默认是FairScheduler:
 taskScheduler.start();主要是初始化一些管理对象,比如job pool管理池

[java] view plaincopyprint?

1. // Initialize other pieces of the scheduler

2. new

3.

4. newthis

5.

6.

7. "mapred.fairscheduler.loadmanager"

8. classclass

9.

10.

11.

12.

13. "mapred.fairscheduler.taskselector"

14. classclass

15.

16.

[java] view plaincopyprint?

1.

[java] view plaincopyprint?

1. try

2.

3. "Initializing "

4.

5. // Inform the listeners if the job state has changed

6. // Note : that the job will be in PREP state.

7.

8. if

9.

10. new

11.

12. synchronizedthis

13.

14.

15.

16.

初始化操作主要用于初始化生成tasks然后通知其他的监听者执行其他操作。initTasks主要处理以下工作:

[java] view plaincopyprint?

1. // 记录用户提交的运行的job信息

2. try

3. new

4. @Override

5. publicthrows

6.

7.

8. returnnull

9.

10.

11. catch

12. thrownew

13.

14.

15. // 设置并记录job的优先级

16. this

17.

18. //

19. //生成每个Task需要的密钥

20. //

21.

22.

然后读取JobTracker split的数据的元信息,元信息包括以下属性信息:

[java] view plaincopyprint?

1. private//洗牌后的索引位置

2. privatelong//洗牌后数据长度

3. private//数据存储位置


然后根据元信息的长度来计算numMapTasks并校验数据存储地址是否可以连接

接下来生成map tasksreducer tasks

[java] view plaincopyprint?

1. new

2. forint0

3.

4. new

5.

6. this

7.

[java] view plaincopyprint?

1. this

2. this

3. this

4. this

5. this

6. this

7. this

8. this

9.

10.

以上除了task对应的jobTrackersplit信息和job信息外,还设置了

[java] view plaincopyprint?

1.

2. "code"class"java"

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

本文来源:https://www.2haoxitong.net/k/doc/b411d248804d2b160b4ec054.html

《Hadoop之JobTrack分析.doc》
将本文的Word文档下载到电脑,方便收藏和打印
推荐度:
点击下载文档

文档为doc格式