北京快三开奖

  • <tr id="U9YkSO"><strong id="U9YkSO"></strong><small id="U9YkSO"></small><button id="U9YkSO"></button><li id="U9YkSO"><noscript id="U9YkSO"><big id="U9YkSO"></big><dt id="U9YkSO"></dt></noscript></li></tr><ol id="U9YkSO"><option id="U9YkSO"><table id="U9YkSO"><blockquote id="U9YkSO"><tbody id="U9YkSO"></tbody></blockquote></table></option></ol><u id="U9YkSO"></u><kbd id="U9YkSO"><kbd id="U9YkSO"></kbd></kbd>

    <code id="U9YkSO"><strong id="U9YkSO"></strong></code>

    <fieldset id="U9YkSO"></fieldset>
          <span id="U9YkSO"></span>

              <ins id="U9YkSO"></ins>
              <acronym id="U9YkSO"><em id="U9YkSO"></em><td id="U9YkSO"><div id="U9YkSO"></div></td></acronym><address id="U9YkSO"><big id="U9YkSO"><big id="U9YkSO"></big><legend id="U9YkSO"></legend></big></address>

              <i id="U9YkSO"><div id="U9YkSO"><ins id="U9YkSO"></ins></div></i>
              <i id="U9YkSO"></i>
            1. <dl id="U9YkSO"></dl>
              1. <blockquote id="U9YkSO"><q id="U9YkSO"><noscript id="U9YkSO"></noscript><dt id="U9YkSO"></dt></q></blockquote><noframes id="U9YkSO"><i id="U9YkSO"></i>
                企业空间 推销商城 存储论坛
                北京快三开奖全闪存阵列 IBM云盘算 Acronis 安克诺斯 安腾普 腾保数据
                首页 > Hadoop > 注释

                Hadoop2.6.0运转mapreduce之推测(speculative)实行(上)

                2016-09-13 13:29泉源:中国存储网
                导读:在mapreduce中设计了Speculator接口作为推测实行的一致标准,DefaultSpeculator作为一种效劳在完成了Speculator的同时承继了AbstractService,DefaultSpeculator是mapreduce的默许完成。

                媒介

                当一个使用向YARN集群提交作业后,此作业的多个义务由于负载不平衡、资源散布不均等缘由都市招致各个义务运转完成的工夫纷歧致,乃至会呈现一个义务分明慢于统一作业的别的义务的状况。假如对这种状况不加优化,最慢的义务终极会拖慢整个作业的全体实行进度。幸亏mapreduce框架提供了义务推测实行机制,当有须要时就启动一个备份义务。终极会接纳备份义务和原义务中率先实行完的后果作为终极后果。

                由于详细剖析推测实行机制,篇幅很长,以是我会分红几篇内容连续引见。

                推测实行测试

                本文在我本人搭建的集群(集群搭建可以参阅《Linux下Hadoop2.6.0集群情况的搭建》一文)上,实行wordcount例子,来验证mapreduce框架的推测机制。我们输出以下下令:

                 
                1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=19 /wordcount/input/ /wordcount/output/result1  


                义务分别的信息如下:

                Hadoop2.6.0运转mapreduce之推测(speculative)实行(上)

                我们看到map义务被分别为10:

                Hadoop2.6.0运转mapreduce之推测(speculative)实行(上)

                这一次测试并没有发作推测实行的状况,我们可以再实行一次下面的下令,最初看到的信息如下:

                Hadoop2.6.0运转mapreduce之推测(speculative)实行(上)

                此中看到实行的map数目多了1个,成了11个,并且还呈现了Killed map tasks=1的信息,这表现这次实行终极发作了推测实行。此中一个map义务添加了一个备份义务,当备份的map义务和原有的map义务中有一个率先完成了,那么会将另一个慢的map义务杀去世。reduce义务也是相似,只不外Hadoop2.6.0的子项目hadoop-mapreduce-examples中自带的wordcount例子中,运用Job.setNumReduceTasks(int)这个API将reduce义务的数目控制为1个。在这里我们看到推测实行偶然候会发作,而偶然候却不会,这是为什么呢?

                在《Hadoop2.6.0运转mapreduce之Uber形式验证》一文中我还冗长提到,假如启用了Uber运转形式,推测实行会被取消。关于这些外部的完成原理需求我们从架构设计和源码角度停止分析,由于我们还需求晓得以是然。

                mapreduce推测实行设计架构

                在mapreduce中设计了Speculator接口作为推测实行的一致标准,DefaultSpeculator作为一种效劳在完成了Speculator的同时承继了AbstractService,DefaultSpeculator是mapreduce的默许完成。DefaultSpeculator担任处置SpeculatorEvent事情,现在次要包罗四种事情,辨别是:

                • JOB_CREATE:作业方才被创立时触发的事情,并处置一些初始化任务。
                • ATTEMPT_START:一个义务实例TaskAttemptImpl启动时触发的事情,DefaultSpeculator将会运用外部的推测预算器(默许是LegacyTaskRuntimeEstimator)开启对此义务实例的监控。
                • ATTEMPT_STATUS_UPDATE:当义务实例的形态更新时触发的事情,DefaultSpeculator将会更新推测预算器对义务的监控信息;更新正在运转中的义务(维护在runningTasks中);义务的统计信息(这些统计信息用于跟踪永劫间未报告请示心跳的义务,并积极自动的停止推测实行,而不是等候义务超时)
                • TASK_CONTAINER_NEED_UPDATE:义务Container数目发作变革时触发的事情。

                TaskRuntimeEstimator接口为推测实行提供了盘算模子的标准,默许的完成类是LegacyTaskRuntimeEstimator,别的另有ExponentiallySmoothedTaskRuntimeEstimator。这里临时不合错误其内容停止深化引见,在前面会连续睁开。

                Speculator的初始化和启动随同着MRAppMaster的初始化与启动。

                接上去我们以Speculator接口的默许完成DefaultSpeculator为例,逐渐剖析其初始化、启动、推测实行等外容的任务原理。

                Speculator的初始化

                Speculator是MRAppMaster的子组件、子效劳,以是也需求初始化。有经历的Hadoop工程师,想必晓得当mapreduce作业提交给ResourceManager后,由RM担任向NodeManger通讯启动一个Container用于实行MRAppMaster。启动MRAppMaster实践也是经过挪用其main办法,此中会挪用MRAppMaster实例的serviceInit办法,此中与Speculator有关的代码完成见代码清单1。

                代码清单1 MRAppMaster的serviceInit办法中创立Speculator的代码
                1. if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)  
                2.     || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {  
                3.   //optional service to speculate on task attempts' progress  
                4.   speculator = createSpeculator(conf, context);  
                5.   addIfService(speculator);  
                6. }  
                7. speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);  
                8. dispatcher.register(Speculator.EventType.class,  
                9.     speculatorEventDispatcher);  

                代码清单1所示代码的实行步调如下:

                1. 当启用map义务推测(这里的MRJobConfig.MAP_SPECULATIVE实践由参数mapreduce.map.speculative控制,默许是true)或许启用reduce义务推测(这里的MRJobConfig.REDUCE_SPECULATIVE实践由参数mapreduce.reduce.speculative控制,默许是true)时挪用createSpeculator办法创立推测效劳。最初将Speculator添加为MRAppMaster的子效劳。
                2. 向调理器dispatcher注册推测事情与推测事情的处置器SpeculatorEventDispatcher,以便触发了推测事情后交由SpeculatorEventDispatcher作进一步处置。

                createSpeculator办法(见代码清单2)创立的推测效劳的完成类默许是DefaultSpeculator,用户也可以经过参数yarn.app.mapreduce.am.job.speculator.class(即MRJobConfig.MR_AM_JOB_SPECULATOR)指定推测效劳的完成类。

                代码清单2 创立推测器

                1. protected Speculator createSpeculator(Configuration conf,  
                2.     final AppContext context) {  
                3.   return callWithJobClassLoader(conf, new Action<Speculator>() {  
                4.     public Speculator call(Configuration conf) {  
                5.       Class<? extends Speculator> speculatorClass;  
                6.       try {  
                7.         speculatorClass  
                8.             // "yarn.mapreduce.job.speculator.class"  
                9.             = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,  
                10.                             DefaultSpeculator.class,  
                11.                             Speculator.class);  
                12.         Constructor<? extends Speculator> speculatorConstructor  
                13.             = speculatorClass.getConstructor  
                14.                  (Configuration.class, AppContext.class);  
                15.         Speculator result = speculatorConstructor.newInstance(conf, context);  
                16.         return result;  
                17.       } catch (InstantiationException ex) {  
                18.         LOG.error("Can't make a speculator -- check "  
                19.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
                20.         throw new YarnRuntimeException(ex);  
                21.       } catch (IllegalAccessException ex) {  
                22.         LOG.error("Can't make a speculator -- check "  
                23.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
                24.         throw new YarnRuntimeException(ex);  
                25.       } catch (InvocationTargetException ex) {  
                26.         LOG.error("Can't make a speculator -- check "  
                27.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
                28.         throw new YarnRuntimeException(ex);  
                29.       } catch (NoSuchMethodException ex) {  
                30.         LOG.error("Can't make a speculator -- check "  
                31.             + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);  
                32.         throw new YarnRuntimeException(ex);  
                33.       }  
                34.     }  
                35.   });  
                36. }  

                依据代码清单2,我们晓得createSpeculator办法经过反射挪用了DefaultSpeculator的结构器来实例化义务推测器。DefaultSpeculator的结构器如下:

                1. public DefaultSpeculator(Configuration conf, AppContext context) {  
                2.   this(conf, context, context.getClock());  
                3. }  
                4. public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {  
                5.   this(conf, context, getEstimator(conf, context), clock);  
                6. }  


                上述第一个结构器挪用了第二个结构器,而第二个结构器中起首挪用了getEstimator办法(见代码清单3)用于获取推测预算器。

                代码清单3 获取推测预算器
                1. static private TaskRuntimeEstimator getEstimator  
                2.     (Configuration conf, AppContext context) {  
                3.   TaskRuntimeEstimator estimator;  
                4.     
                5.   try {  
                6.     // "yarn.mapreduce.job.task.runtime.estimator.class"  
                7.     Class<? extends TaskRuntimeEstimator> estimatorClass  
                8.         = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,  
                9.                         LegacyTaskRuntimeEstimator.class,  
                10.                         TaskRuntimeEstimator.class);  
                11.     Constructor<? extends TaskRuntimeEstimator> estimatorConstructor  
                12.         = estimatorClass.getConstructor();  
                13.     estimator = estimatorConstructor.newInstance();  
                14.     estimator.contextualize(conf, context);  
                15.   } catch (InstantiationException ex) {  
                16.     LOG.error("Can't make a speculation runtime estimator", ex);  
                17.     throw new YarnRuntimeException(ex);  
                18.   } catch (IllegalAccessException ex) {  
                19.     LOG.error("Can't make a speculation runtime estimator", ex);  
                20.     throw new YarnRuntimeException(ex);  
                21.   } catch (InvocationTargetException ex) {  
                22.     LOG.error("Can't make a speculation runtime estimator", ex);  
                23.     throw new YarnRuntimeException(ex);  
                24.   } catch (NoSuchMethodException ex) {  
                25.     LOG.error("Can't make a speculation runtime estimator", ex);  
                26.     throw new YarnRuntimeException(ex);  
                27.   }  
                28.     
                29. return estimator;  
                30. }  

                依据代码清单3可以看出推测预算器可以经过参数yarn.app.mapreduce.am.job.task.estimator.class(即MRJobConfig.MR_AM_TASK_ESTIMATOR)停止指定,假如没有指定,则默许运用LegacyTaskRuntimeEstimator。实例化LegacyTaskRuntimeEstimator后,还挪用其父类StartEndTimesBase的contextualize办法(见代码清单4)停止上下文的初始化,实践便是将以后作业添加到map义务统计列表、reduce义务统计列表,并设置作业与其慢义务阈值(mapreduce.job.speculative.slowtaskthreshold)之间的映射干系。

                代码清单4 StartEndTimesBase的初始化

                1. @Override  
                2. public void contextualize(Configuration conf, AppContext context) {  
                3.   this.context = context;  
                4.   Map<JobId, Job> allJobs = context.getAllJobs();  
                5.   for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {  
                6.     final Job job = entry.getValue();  
                7.     mapperStatistics.put(job, new DataStatistics());  
                8.     reducerStatistics.put(job, new DataStatistics());  
                9.     slowTaskRelativeTresholds.put  
                10.         (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));  
                11.   }  
                12. }  


                创立并初始化推测预算器后,会再次挪用DefaultSpeculator的最初一个结构器,完成如下:

                1. public DefaultSpeculator  
                2.     (Configuration conf, AppContext context,  
                3.      TaskRuntimeEstimator estimator, Clock clock) {  
                4.   super(DefaultSpeculator.class.getName());  
                5.   this.conf = conf;  
                6.   this.context = context;  
                7.   this.estimator = estimator;  
                8.   this.clock = clock;  
                9.   this.eventHandler = context.getEventHandler();  
                10. }  

                至此,我们引见完了DefaultSpeculator的初始化进程。

                Speculator的启动

                MRAppMaster在启动的时分,会挪用其serviceStart办法,此中触及启动Speculator的局部见代码清单5。

                代码清单5 启动MRAppMaster时触及Speculator的局部

                1. if (job.isUber()) {  
                2.   speculatorEventDispatcher.disableSpeculation();  
                3.   LOG.info("MRAppMaster uberizing job " + job.getID()  
                4.       + " in local container (\"uber-AM\") on node "  
                5.       + nmHost + ":" + nmPort + ".");  
                6. } else {  
                7.   // send init to speculator only for non-uber jobs.   
                8.   // This won't yet start as dispatcher isn't started yet.  
                9.   dispatcher.getEventHandler().handle(  
                10.       new SpeculatorEvent(job.getID(), clock.getTime()));  
                11.   LOG.info("MRAppMaster launching normal, non-uberized, multi-container "  
                12.       + "job " + job.getID() + ".");  
                13. }  

                剖析代码清单5,此中与义务推测有关的逻辑如下:

                • 假如接纳了Uber运转形式,则会挪用SpeculatorEventDispatcher的disableSpeculation办法(见代码清单6),使得义务推测生效。
                • 假如未接纳Uber运转形式,则会向调理器自动发送一个SpeculatorEvent事情。此处结构SpeculatorEvent事情的代码如下;
                1. public SpeculatorEvent(JobId jobID, long timestamp) {  
                2.   super(Speculator.EventType.JOB_CREATE, timestamp);  
                3.   this.jobID = jobID;  
                4. }  
                由此可见,在启动MRAppMaster的阶段,创立的SpeculatorEvent事情的范例是Speculator.EventType.JOB_CREATE。SpeculatorEventDispatcher的handle办法会被调理器实行,用以处置SpeculatorEvent事情,其代码完成见代码清单6。
                代码清单6 SpeculatorEventDispatcher的完成
                1. private class SpeculatorEventDispatcher implements  
                2.     EventHandler<SpeculatorEvent> {  
                3.   private final Configuration conf;  
                4.   private volatile boolean disabled;  
                5.   public SpeculatorEventDispatcher(Configuration config) {  
                6.     this.conf = config;  
                7.   }  
                8.   @Override  
                9.   public void handle(final SpeculatorEvent event) {  
                10.     if (disabled) {  
                11.       return;  
                12.     }  
                13.     TaskId tId = event.getTaskID();  
                14.     TaskType tType = null;  
                15.     /* event's TaskId will be null if the event type is JOB_CREATE or 
                16.      * ATTEMPT_STATUS_UPDATE 
                17.      */  
                18.     if (tId != null) {  
                19.       tType = tId.getTaskType();   
                20.     }  
                21.     boolean shouldMapSpec =  
                22.             conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);  
                23.     boolean shouldReduceSpec =  
                24.             conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);  
                25.     /* The point of the following is to allow the MAP and REDUCE speculative 
                26.      * config values to be independent: 
                27.      * IF spec-exec is turned on for maps AND the task is a map task 
                28.      * OR IF spec-exec is turned on for reduces AND the task is a reduce task 
                29.      * THEN call the speculator to handle the event. 
                30.      */  
                31.     if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))  
                32.       || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {  
                33.       // Speculator IS enabled, direct the event to there.  
                34.       callWithJobClassLoader(conf, new Action<Void>() {  
                35.         public Void call(Configuration conf) {  
                36.           speculator.handle(event);  
                37.           return null;  
                38.         }  
                39.       });  
                40.     }  
                41.   }  
                42.   public void disableSpeculation() {  
                43.     disabled = true;  
                44.   }  
                45. }  

                SpeculatorEventDispatcher的完成通知我们当启用map或许reduce义务推测时,将异步伐用Speculator的handle办法处置SpeculatorEvent事情。以默许的DefaultSpeculator的handle办法为例,来看看实在现,代码如下:

                1. @Override  
                2. public void handle(SpeculatorEvent event) {  
                3.   processSpeculatorEvent(event);  
                4. }  


                上述代码实践署理实行了processSpeculatorEvent办法(见代码清单7)

                代码清单7 DefaultSpeculator处置SpeculatorEvent事情的完成

                1. private synchronized void processSpeculatorEvent(SpeculatorEvent event) {  
                2.   switch (event.getType()) {  
                3.     case ATTEMPT_STATUS_UPDATE:  
                4.       statusUpdate(event.getReportedStatus(), event.getTimestamp());  
                5.       break;  
                6.     case TASK_CONTAINER_NEED_UPDATE:  
                7.     {  
                8.       AtomicInteger need = containerNeed(event.getTaskID());  
                9.       need.addAndGet(event.containersNeededChange());  
                10.       break;  
                11.     }  
                12.     case ATTEMPT_START:  
                13.     {  
                14.       LOG.info("ATTEMPT_START " + event.getTaskID());  
                15.       estimator.enrollAttempt  
                16.           (event.getReportedStatus(), event.getTimestamp());  
                17.       break;  
                18.     }  
                19.       
                20.     case JOB_CREATE:  
                21.     {  
                22.       LOG.info("JOB_CREATE " + event.getJobID());  
                23.       estimator.contextualize(getConfig(), context);  
                24.       break;  
                25.     }  
                26.   }  
                27. }  

                当DefaultSpeculator收到范例为JOB_CREATE的SpeculatorEvent事情时会婚配实行以下代码:

                1. case JOB_CREATE:  
                2. {  
                3.   LOG.info("JOB_CREATE " + event.getJobID());  
                4.   estimator.contextualize(getConfig(), context);  
                5.   break;  
                6. }  

                这里实践也挪用了StartEndTimesBase的contextualize办法(见代码清单4),不再赘述。

                由于DefaultSpeculator也是MRAppMaster的子组件之一,以是在启动MRAppMaster(挪用MRAppMaster的serviceStart)的进程中,也会挪用DefaultSpeculator的serviceStart办法(见代码清单8)启动DefaultSpeculator。

                代码清单8 启动DefaultSpeculator

                1. protected void serviceStart() throws Exception {  
                2.   Runnable speculationBackgroundCore  
                3.       = new Runnable() {  
                4.           @Override  
                5.           public void run() {  
                6.             while (!stopped && !Thread.currentThread().isInterrupted()) {  
                7.               long backgroundRunStartTime = clock.getTime();  
                8.               try {  
                9.                 int speculations = computeSpeculations();  
                10.                 long mininumRecomp  
                11.                     = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE  
                12.                                        : SOONEST_RETRY_AFTER_NO_SPECULATE;  
                13.                 long wait = Math.max(mininumRecomp,  
                14.                       clock.getTime() - backgroundRunStartTime);  
                15.                 if (speculations > 0) {  
                16.                   LOG.info("We launched " + speculations  
                17.                       + " speculations.  Sleeping " + wait + " milliseconds.");  
                18.                 }  
                19.                 Object pollResult  
                20.                     = scanControl.poll(wait, TimeUnit.MILLISECONDS);  
                21.               } catch (InterruptedException e) {  
                22.                 if (!stopped) {  
                23.                   LOG.error("Background thread returning, interrupted", e);  
                24.                 }  
                25.                 return;  
                26.               }  
                27.             }  
                28.           }  
                29.         };  
                30.   speculationBackgroundThread = new Thread  
                31.       (speculationBackgroundCore, "DefaultSpeculator background processing");  
                32.   speculationBackgroundThread.start();  
                33.   super.serviceStart();  
                34. }  

                启动DefaultSpeculator的次要目标是启动一个线程不时推测实行停止预算,步调如下:

                1. 创立了匿名的完成类speculationBackgroundCore,用于在独自的线程中对推测实行停止预算。
                2. 创立Thread并启动线程。
                speculationBackgroundCore中挪用的computeSpeculations办法用于盘算推测调理实行的map和reduce义务数目,实在现如下:
                1. private int computeSpeculations() {  
                2.   // We'll try to issue one map and one reduce speculation per job per run  
                3.   return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();  
                4. }  

                computeSpeculations办法前往的后果即是maybeScheduleAMapSpeculation办法(用于推测需求重新分派Container的map义务数目)和maybeScheduleAReduceSpeculation办法(用于推测需求重新分派Container的reduce义务数目)前往值的和。maybeScheduleAMapSpeculation的完成如下:

                1. private int maybeScheduleAMapSpeculation() {  
                2.   return maybeScheduleASpeculation(TaskType.MAP);  
                3. }  
                4. private int maybeScheduleAReduceSpeculation() {  
                5.   return maybeScheduleASpeculation(TaskType.REDUCE);  
                6. }  
                maybeScheduleAMapSpeculation和maybeScheduleAReduceSpeculation实践都挪用了maybeScheduleASpeculation办法,实在现见代码清单9。
                代码清单9 maybeScheduleASpeculation用于盘算map或许reduce义务推测调理的能够性
                1. private int maybeScheduleASpeculation(TaskType type) {  
                2.   int successes = 0;  
                3.   long now = clock.getTime();  
                4.   ConcurrentMap<JobId, AtomicInteger> containerNeeds  
                5.       = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;  
                6.   for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {  
                7.     // This race conditon is okay.  If we skip a speculation attempt we  
                8.     //  should have tried because the event that lowers the number of  
                9.     //  containers needed to zero hasn't come through, it will next time.  
                10.     // Also, if we miss the fact that the number of containers needed was  
                11.     //  zero but increased due to a failure it's not too bad to launch one  
                12.     //  container prematurely.  
                13.     if (jobEntry.getValue().get() > 0) {  
                14.       continue;  
                15.     }  
                16.     int numberSpeculationsAlready = 0;  
                17.     int numberRunningTasks = 0;  
                18.     // loop through the tasks of the kind  
                19.     Job job = context.getJob(jobEntry.getKey());  
                20.     Map<TaskId, Task> tasks = job.getTasks(type);  
                21.     int numberAllowedSpeculativeTasks  
                22.         = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,  
                23.                          PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());  
                24.     TaskId bestTaskID = null;  
                25.     long bestSpeculationValue = -1L;  
                26.     // this loop is potentially pricey.  
                27.     // TODO track the tasks that are potentially worth looking at  
                28.     for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {  
                29.       long mySpeculationValue = speculationValue(taskEntry.getKey(), now);  
                30.       if (mySpeculationValue == ALREADY_SPECULATING) {  
                31.         ++numberSpeculationsAlready;  
                32.       }  
                33.       if (mySpeculationValue != NOT_RUNNING) {  
                34.         ++numberRunningTasks;  
                35.       }  
                36.       if (mySpeculationValue > bestSpeculationValue) {  
                37.         bestTaskID = taskEntry.getKey();  
                38.         bestSpeculationValue = mySpeculationValue;  
                39.       }  
                40.     }  
                41.     numberAllowedSpeculativeTasks  
                42.         = (int) Math.max(numberAllowedSpeculativeTasks,  
                43.                          PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);  
                44.     // If we found a speculation target, fire it off  
                45.     if (bestTaskID != null  
                46.         && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {  
                47.       addSpeculativeAttempt(bestTaskID);  
                48.       ++successes;  
                49.     }  
                50.   }  
                51.   return successes;  
                52. }  
                maybeScheduleASpeculation办法起首依据以后Task的范例(map或reduce)获取相应范例义务的需求分派Container数目的缓存containerNeeds,然后遍历containerNeeds。

                遍历containerNeeds的实行步调如下:

                1. 假如以后Job仍然有未分派Container的Task,那么跳过以后循环,持续下一次循环。这阐明假如以后Job的某一范例的Task仍然存在未分派Container的,则不会停止义务推测;
                2. 从以后使用的上下文AppContext中获取Job,并获取此Job的一切的Task(map或许reduce);
                3. 盘算容许实行推测的Task数目numberAllowedSpeculativeTasks(map或许reduce)。此中MINIMUM_ALLOWED_SPECULATIVE_TASKS的值是10,PROPORTION_TOTAL_TASKS_SPECULATABLE的值是0.01。numberAllowedSpeculativeTasks取MINIMUM_ALLOWED_SPECULATIVE_TASKS与PROPORTION_TOTAL_TASKS_SPECULATABLE*义务数目之积之间的最大值。因而我们晓得,当Job的某一范例(map或许reduce)的Task的数目小于1100时,盘算失掉的numberAllowedSpeculativeTasks即是10,假如Job的某一范例(map或许reduce)的Task的数目大于即是1100时,numberAllowedSpeculativeTasks才会大于10。numberAllowedSpeculativeTasks变量可以无效避免少量义务同时启动备份义务所形成的资源糜费。
                4. 遍历Job对应的map义务或许reduce义务聚集,挪用speculationValue办法获取每一个Task的推测值。并在迭代完一切的map义务或许reduce义务后,获取这一义务聚集中的推测值bestSpeculationValue最大的义务ID。
                5. 再次盘算numberAllowedSpeculativeTasks,此中PROPORTION_RUNNING_TASKS_SPECULATABLE的值即是0.1,numberRunningTasks是处于运转中的Task。numberAllowedSpeculativeTasks取numberAllowedSpeculativeTasks与PROPORTION_RUNNING_TASKS_SPECULATABLE*numberRunningTasks之积之间的最大值。因而我们晓得当Job的某一范例(map或许reduce)的正在运转中的Task的数目小于110时(假定第3步失掉的numberAllowedSpeculativeTasks即是10),盘算失掉的numberAllowedSpeculativeTasks即是10,假如Job的某一范例(map或许reduce)的正在运转中的Task的数目大于即是110时,numberAllowedSpeculativeTasks才会大于10。
                6. 假如numberAllowedSpeculativeTasks大于numberSpeculationsAlready(曾经推测实行过的Task数目),则挪用addSpeculativeAttempt办法(见代码清单10)将第4步中选出的义务的义务ID添加到推测实验中。
                代码清单10 添加推测实行的实验
                1. //Add attempt to a given Task.  
                2. protected void addSpeculativeAttempt(TaskId taskID) {  
                3.   LOG.info  
                4.       ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);  
                5.   eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));  
                6.   mayHaveSpeculated.add(taskID);  
                7. }  

                依据代码清单10,我们看到推测实行实验是经过发送范例为TaskEventType.T_ADD_SPEC_ATTEMPT的TaskEvent事情完成的。

                预算义务的推测值

                在剖析代码清单9时,我成心跳过了speculationValue办法的剖析。speculationValue办法(见代码清单11)次要用于预算每个义务的推测值。

                代码清单11 预算义务的推测值

                1. private long speculationValue(TaskId taskID, long now) {  
                2.   Job job = context.getJob(taskID.getJobId());  
                3.   Task task = job.getTask(taskID);  
                4.   Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();  
                5.   long acceptableRuntime = Long.MIN_VALUE;  
                6.   long result = Long.MIN_VALUE;  
                7.   if (!mayHaveSpeculated.contains(taskID)) {  
                8.     acceptableRuntime = estimator.thresholdRuntime(taskID);  
                9.     if (acceptableRuntime == Long.MAX_VALUE) {  
                10.       return ON_SCHEDULE;  
                11.     }  
                12.   }  
                13.   TaskAttemptId runningTaskAttemptID = null;  
                14.   int numberRunningAttempts = 0;  
                15.   for (TaskAttempt taskAttempt : attempts.values()) {  
                16.     if (taskAttempt.getState() == TaskAttemptState.RUNNING  
                17.         || taskAttempt.getState() == TaskAttemptState.STARTING) {  
                18.       if (++numberRunningAttempts > 1) {  
                19.         return ALREADY_SPECULATING;  
                20.       }  
                21.       runningTaskAttemptID = taskAttempt.getID();  
                22.       long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);  
                23.       long taskAttemptStartTime  
                24.           = estimator.attemptEnrolledTime(runningTaskAttemptID);  
                25.       if (taskAttemptStartTime > now) {  
                26.         // This background process ran before we could process the task  
                27.         //  attempt status change that chronicles the attempt start  
                28.         return TOO_NEW;  
                29.       }  
                30.       long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;  
                31.       long estimatedReplacementEndTime  
                32.           = now + estimator.estimatedNewAttemptRuntime(taskID);  
                33.       float progress = taskAttempt.getProgress();  
                34.       TaskAttemptHistoryStatistics data =  
                35.           runningTaskAttemptStatistics.get(runningTaskAttemptID);  
                36.       if (data == null) {  
                37.         runningTaskAttemptStatistics.put(runningTaskAttemptID,  
                38.           new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));  
                39.       } else {  
                40.         if (estimatedRunTime == data.getEstimatedRunTime()  
                41.             && progress == data.getProgress()) {  
                42.           // Previous stats are same as same stats  
                43.           if (data.notHeartbeatedInAWhile(now)) {  
                44.             // Stats have stagnated for a while, simulate heart-beat.  
                45.             TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();  
                46.             taskAttemptStatus.id = runningTaskAttemptID;  
                47.             taskAttemptStatus.progress = progress;  
                48.             taskAttemptStatus.taskState = taskAttempt.getState();  
                49.             // Now simulate the heart-beat  
                50.             handleAttempt(taskAttemptStatus);  
                51.           }  
                52.         } else {  
                53.           // Stats have changed - update our data structure  
                54.           data.setEstimatedRunTime(estimatedRunTime);  
                55.           data.setProgress(progress);  
                56.           data.resetHeartBeatTime(now);  
                57.         }  
                58.       }  
                59.       if (estimatedEndTime < now) {  
                60.         return PROGRESS_IS_GOOD;  
                61.       }  
                62.       if (estimatedReplacementEndTime >= estimatedEndTime) {  
                63.         return TOO_LATE_TO_SPECULATE;  
                64.       }  
                65.       result = estimatedEndTime - estimatedReplacementEndTime;  
                66.     }  
                67.   }  
                68.   // If we are here, there's at most one task attempt.  
                69.   if (numberRunningAttempts == 0) {  
                70.     return NOT_RUNNING;  
                71.   }  
                72.   if (acceptableRuntime == Long.MIN_VALUE) {  
                73.     acceptableRuntime = estimator.thresholdRuntime(taskID);  
                74.     if (acceptableRuntime == Long.MAX_VALUE) {  
                75.       return ON_SCHEDULE;  
                76.     }  
                77.   }  
                78.   return result;  
                79. }  

                speculationValue办法的实行步调如下:

                1. 假如义务还没有被推测实行,那么挪用estimator的thresholdRuntime办法获取义务可以承受的运转时长acceptableRuntime。假如acceptableRuntime即是Long.MAX_VALUE,则将ON_SCHEDULE作为前往值,ON_SCHEDULE的值是Long.MIN_VALUE,以此表现以后义务的推测值很小,即被推测实验的能够最小。
                2. 假如义务的运转实例数大于1,则阐明此义务曾经发作了推测实行,因而前往ALREADY_SPECULATING。ALREADY_SPECULATING即是Long.MIN_VALUE + 1。
                3. 挪用estimator的estimatedRuntime办法获取义务运转实例的预算运转时长estimatedRunTime。
                4. 挪用estimator的attemptEnrolledTime办法获取义务实例开端运转的工夫,此工夫即为startTimes中缓存的start。这个值是在义务实例启动时招致DefaultSpeculator的processSpeculatorEvent办法处置Speculator.EventType.ATTEMPT_START范例的SpeculatorEvent事情时保管的。
                5. estimatedEndTime表现预算义务实例的运转完毕工夫,estimatedEndTime = estimatedRunTime + taskAttemptStartTime。
                6. 挪用estimator的estimatedNewAttemptRuntime办法预算假如此时重新为义务启动一个实例,此实例运转完毕的工夫estimatedReplacementEndTime。
                7. 假如缓存中没有义务实例的汗青统计信息,那么将estimatedRunTime、义务实例进度progress,以后工夫封装为汗青统计信息缓存起来。
                8. 假如缓存中存在义务实例的汗青统计信息,假如缓存的estimatedRunTime和本次预算的estimatedRunTime一样而且缓存的实例进度progress和本次获取的义务实例进度progress一样,阐明有一段工夫没有收到心跳了,则模仿一次心跳。假如缓存的estimatedRunTime和本次预算的estimatedRunTime纷歧样或许缓存的实例进度progress和本次获取的义务实例进度progress纷歧样,那么将estimatedRunTime、义务实例进度progress,以后工夫更新就任务虚例的汗青统计信息中。
                9. 假如estimatedEndTime小于以后工夫,则阐明义务实例的进度精良,前往PROGRESS_IS_GOOD,PROGRESS_IS_GOOD即是Long.MIN_VALUE + 3。
                10. 假如estimatedReplacementEndTime大于即是estimatedEndTime,则阐明即使启动备份义务实例也杯水车薪,由于它的完毕工夫达不到节流作业总运转时长的作用。
                11. 盘算本次预算的后果值result,它即是estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表现备份义务实例运转后比原义务实例的完毕工夫就越早,因而调理实行的代价越大。
                12. 假如numberRunningAttempts即是0,则表现以后义务还没有启动义务实例,前往NOT_RUNNING,NOT_RUNNING即是Long.MIN_VALUE + 4。
                13. 重新盘算acceptableRuntime,处置方法与第1步相反。
                14. 前往result。

                总结

                依据源码剖析,我们晓得DefaultSpeculator启动的线程会时时去盘算作业的各个义务的推测值,即speculationValue办法盘算的后果。从一切义务的推测值中选择值最大,也便是说代价最高的,为其装备一个备份义务。这里有个题目,Estimator推算用的种种统计和监控数据是从那边来的呢?请看《Hadoop2.6.0运转mapreduce之推测(speculative)实行(下)》。
                 作者博客:http://blog.csdn.net/beliefer/article/details/51249119
                持续阅读
                要害词 :
                Hadoop mapreduce
                中国存储网声明:此文观念不代表本站态度,若有版权疑问请联络我们。
                相干阅读
                产物引荐

                头条阅读
                栏目热门

                Copyright @ 2006-2019 ChinaStor.COM 版权一切 京ICP备14047533号

                中国存储网

                存储第一站,存储流派,存储在线交换平台