Clover coverage report - Ant Coverage
Coverage timestamp: Tue Apr 8 2003 20:43:55 EST
file stats: LOC: 332   Methods: 11
NCLOC: 150   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
Parallel.java 84.4% 95.9% 100% 93.1%
 1   
 /*
 2   
  * The Apache Software License, Version 1.1
 3   
  *
 4   
  * Copyright (c) 2001-2003 The Apache Software Foundation.  All rights
 5   
  * reserved.
 6   
  *
 7   
  * Redistribution and use in source and binary forms, with or without
 8   
  * modification, are permitted provided that the following conditions
 9   
  * are met:
 10   
  *
 11   
  * 1. Redistributions of source code must retain the above copyright
 12   
  *    notice, this list of conditions and the following disclaimer.
 13   
  *
 14   
  * 2. Redistributions in binary form must reproduce the above copyright
 15   
  *    notice, this list of conditions and the following disclaimer in
 16   
  *    the documentation and/or other materials provided with the
 17   
  *    distribution.
 18   
  *
 19   
  * 3. The end-user documentation included with the redistribution, if
 20   
  *    any, must include the following acknowlegement:
 21   
  *       "This product includes software developed by the
 22   
  *        Apache Software Foundation (http://www.apache.org/)."
 23   
  *    Alternately, this acknowlegement may appear in the software itself,
 24   
  *    if and wherever such third-party acknowlegements normally appear.
 25   
  *
 26   
  * 4. The names "Ant" and "Apache Software
 27   
  *    Foundation" must not be used to endorse or promote products derived
 28   
  *    from this software without prior written permission. For written
 29   
  *    permission, please contact apache@apache.org.
 30   
  *
 31   
  * 5. Products derived from this software may not be called "Apache"
 32   
  *    nor may "Apache" appear in their names without prior written
 33   
  *    permission of the Apache Group.
 34   
  *
 35   
  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 36   
  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 37   
  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 38   
  * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 39   
  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 40   
  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 41   
  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 42   
  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 43   
  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 44   
  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 45   
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 46   
  * SUCH DAMAGE.
 47   
  * ====================================================================
 48   
  *
 49   
  * This software consists of voluntary contributions made by many
 50   
  * individuals on behalf of the Apache Software Foundation.  For more
 51   
  * information on the Apache Software Foundation, please see
 52   
  * <http://www.apache.org/>.
 53   
  */
 54   
 package org.apache.tools.ant.taskdefs;
 55   
 
 56   
 import java.lang.reflect.Method;
 57   
 import java.util.Enumeration;
 58   
 import java.util.Vector;
 59   
 import org.apache.tools.ant.BuildException;
 60   
 import org.apache.tools.ant.Location;
 61   
 import org.apache.tools.ant.Task;
 62   
 import org.apache.tools.ant.TaskContainer;
 63   
 import org.apache.tools.ant.util.StringUtils;
 64   
 
 65   
 /**
 66   
  * Executes the contained tasks in separate threads, continuing
 67   
  * once all are completed.<br/>
 68   
  * New behavior allows for the ant script to specify a maximum number of 
 69   
  * threads that will be executed in parallel.  One should be very careful about
 70   
  * using the <code>waitFor</code> task when specifying <code>threadCount</code>
 71   
  * as it can cause deadlocks if the number of threads is too small or if one of 
 72   
  * the nested tasks fails to execute completely.  The task selection algorithm 
 73   
  * will ensure that the tasks listed before a task have started before that 
 74   
  * task is started, but it will not ensure a successful completion of those 
 75   
  * tasks or that those tasks will finish first (i.e. it's a classic race 
 76   
  * condition).
 77   
  * <p>
 78   
  * @author Thomas Christen <a href="mailto:chr@active.ch">chr@active.ch</a>
 79   
  * @author Conor MacNeill
 80   
  * @author Danno Ferrin
 81   
  * @since Ant 1.4
 82   
  *
 83   
  * @ant.task category="control"
 84   
  */
 85   
 public class Parallel extends Task
 86   
                       implements TaskContainer {
 87   
 
 88   
     /** Collection holding the nested tasks */
 89   
     private Vector nestedTasks = new Vector();
 90   
 
 91   
     /** Semaphore to notify of completed threads */
 92   
     private final Object semaphore = new Object();
 93   
     
 94   
     /** Total number of threads to run */
 95   
     private int numThreads = 0;
 96   
     
 97   
     /** Total number of threads per processor to run.  */
 98   
     private int numThreadsPerProcessor = 0;
 99   
     
 100   
     /** Interval (in ms) to poll for finished threads. */
 101   
     private int pollInterval = 1000; // default is once a second
 102   
 
 103   
     /**
 104   
      * Add a nested task to execute in parallel.
 105   
      * @param nestedTask  Nested task to be executed in parallel
 106   
      */
 107  24
     public void addTask(Task nestedTask) {
 108  24
         nestedTasks.addElement(nestedTask);
 109   
     }
 110   
     
 111   
     /** 
 112   
      * Dynamically generates the number of threads to execute based on the 
 113   
      * number of available processors (via 
 114   
      * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE 
 115   
      * 1.4 VM, and it will overwrite the value set in threadCount.  
 116   
      * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to 
 117   
      * <code>threadCount</code>.; optional
 118   
      * @param numThreadsPerProcessor Number of threads to create per available 
 119   
      *        processor.
 120   
      *
 121   
      */
 122  1
     public void setThreadsPerProcessor(int numThreadsPerProcessor) {
 123  1
         this.numThreadsPerProcessor = numThreadsPerProcessor;
 124   
     }
 125   
     
 126   
     /** 
 127   
      * Statically determine the maximum number of tasks to execute 
 128   
      * simultaneously.  If there are fewer tasks than threads then all will be 
 129   
      * executed at once, if there are more then only <code>threadCount</code> 
 130   
      * tasks will be executed at one time.  If <code>threadsPerProcessor</code> 
 131   
      * is set and the JVM is at least a 1.4 VM then this value is 
 132   
      * ignored.; optional
 133   
      *
 134   
      * @param numThreads total number of threads.
 135   
      *
 136   
      */
 137  5
     public void setThreadCount(int numThreads) {
 138  5
         this.numThreads = numThreads;
 139   
     }
 140   
 
 141   
     /** 
 142   
      * Interval to poll for completed threads when threadCount or 
 143   
      * threadsPerProcessor is specified.  Integer in milliseconds.; optional
 144   
      *
 145   
      * @param pollInterval New value of property pollInterval.
 146   
      */
 147  5
     public void setPollInterval(int pollInterval) {
 148  5
         this.pollInterval = pollInterval;
 149   
     }
 150   
     
 151   
     /**
 152   
      * Execute the parallel tasks
 153   
      *
 154   
      * @exception BuildException if any of the threads failed.
 155   
      */
 156  8
     public void execute() throws BuildException {
 157  8
         updateThreadCounts();
 158  8
         if (numThreads == 0) {
 159  3
             numThreads = nestedTasks.size();
 160   
         }
 161  8
         spinThreads();
 162   
     }
 163   
     
 164   
     /**
 165   
      * Determine the number of threads based on the number of processors
 166   
      */
 167  8
     private void updateThreadCounts() {
 168  8
         if (numThreadsPerProcessor != 0) {
 169  1
             int numProcessors = getNumProcessors();
 170  1
             if (numProcessors != 0) {
 171  1
                 numThreads = numProcessors * numThreadsPerProcessor;
 172   
             }
 173   
         }
 174   
     }
 175   
         
 176   
     /**
 177   
      * Spin up required threads with a maximum number active at any given time.
 178   
      *
 179   
      * @exception BuildException if any of the threads failed.
 180   
      */
 181  8
     private void spinThreads() throws BuildException {
 182  8
         final int numTasks = nestedTasks.size();
 183  8
         Thread[] threads = new Thread[numTasks];
 184  8
         TaskRunnable[] runnables = new TaskRunnable[numTasks];
 185  8
         int threadNumber = 0;
 186  8
         for (Enumeration e = nestedTasks.elements(); e.hasMoreElements(); 
 187   
              threadNumber++) {
 188  24
             Task nestedTask = (Task) e.nextElement();
 189  24
             ThreadGroup group = new ThreadGroup("parallel");
 190  24
             TaskRunnable taskRunnable 
 191   
                 = new TaskRunnable(threadNumber, nestedTask);
 192  24
             runnables[threadNumber] = taskRunnable;
 193  24
             threads[threadNumber] = new Thread(group, taskRunnable);
 194   
         }
 195   
 
 196  8
         final int maxRunning = numThreads;
 197  8
         Thread[] running = new Thread[maxRunning];
 198  8
         threadNumber = 0;
 199   
         
 200   
         // now run them in limited numbers...
 201  8
         outer:
 202  39
         while (threadNumber < numTasks) {
 203  31
             synchronized (semaphore) {
 204  31
                 for (int i = 0; i < maxRunning; i++) {
 205  46
                     if (running[i] == null || !running[i].isAlive()) {
 206  24
                         running[i] = threads[threadNumber++];
 207  24
                         running[i].start();
 208   
                         // continue on outer while loop in case we 
 209   
                         // used our last thread
 210  24
                         continue outer;
 211   
                     }
 212   
                 }
 213   
                 // if we got here all are running, so sleep a little
 214  7
                 try {
 215  7
                     semaphore.wait(pollInterval);
 216   
                 } catch (InterruptedException ie) {
 217   
                     // doesn't Java know interruptions are rude?
 218   
                     // just pretend it didn't happen and go about our business.
 219   
                     // sheesh!
 220   
                 }
 221   
             }
 222   
         }
 223   
             
 224   
         // now join to all the threads 
 225  8
         for (int i = 0; i < maxRunning; ++i) {
 226  20
             try {
 227  20
                 if (running[i] != null) {
 228  17
                     running[i].join();
 229   
                 }
 230   
             } catch (InterruptedException ie) {
 231   
                 // who would interrupt me at a time like this?
 232   
             }
 233   
         }
 234   
         
 235   
         // now did any of the threads throw an exception
 236  8
         StringBuffer exceptionMessage = new StringBuffer();
 237  8
         int numExceptions = 0;
 238  8
         Throwable firstException = null;
 239  8
         Location firstLocation = Location.UNKNOWN_LOCATION;;
 240  8
         for (int i = 0; i < numTasks; ++i) {
 241  24
             Throwable t = runnables[i].getException();
 242  24
             if (t != null) {
 243  1
                 numExceptions++;
 244  1
                 if (firstException == null) {
 245  1
                     firstException = t;
 246   
                 }
 247  1
                 if (t instanceof BuildException && 
 248   
                         firstLocation == Location.UNKNOWN_LOCATION) {
 249  1
                     firstLocation = ((BuildException) t).getLocation();
 250   
                 }
 251  1
                 exceptionMessage.append(StringUtils.LINE_SEP);
 252  1
                 exceptionMessage.append(t.getMessage());
 253   
             }
 254   
         }
 255   
         
 256  8
         if (numExceptions == 1) {
 257  1
             if (firstException instanceof BuildException) {
 258  1
                 throw (BuildException) firstException;
 259   
             } else {
 260  0
                 throw new BuildException(firstException);
 261   
             }
 262  7
         } else if (numExceptions > 1) {
 263  0
             throw new BuildException(exceptionMessage.toString(), 
 264   
                                      firstLocation);
 265   
         }
 266   
     }
 267   
      
 268   
     /**
 269   
      * Determine the number of processors. Only effective on later VMs
 270   
      *
 271   
      * @return the number of processors available or 0 if not determinable.
 272   
      */
 273  1
     private int getNumProcessors() {
 274  1
         try {
 275  1
             Class[] paramTypes = {};
 276  1
             Method availableProcessors =
 277   
                 Runtime.class.getMethod("availableProcessors", paramTypes);
 278   
 
 279  1
             Object[] args = {};
 280  1
             Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
 281  1
             return ret.intValue();
 282   
         } catch (Exception e) {
 283   
             // return a bogus number
 284  0
             return 0;
 285   
         }
 286   
     }
 287   
 
 288   
     /**
 289   
      * thread that execs a task
 290   
      */
 291   
     private class TaskRunnable implements Runnable {
 292   
         private Throwable exception;
 293   
         private Task task;
 294   
         private int taskNumber;
 295   
 
 296   
         /**
 297   
          * Construct a new TaskRunnable.<p>
 298   
          *
 299   
          * @param task the Task to be executed in a separate thread
 300   
          */
 301  24
         TaskRunnable(int taskNumber, Task task) {
 302  24
             this.task = task;
 303  24
             this.taskNumber = taskNumber;
 304   
         }
 305   
 
 306   
         /**
 307   
          * Executes the task within a thread and takes care about
 308   
          * Exceptions raised within the task.
 309   
          */
 310  24
         public void run() {
 311  24
             try {
 312  24
                 task.perform();
 313   
             } catch (Throwable t) {
 314  1
                 exception = t;
 315   
             } finally {
 316  24
                 synchronized (semaphore) {
 317  24
                     semaphore.notifyAll();
 318   
                 }
 319   
             }
 320   
         }
 321   
 
 322   
         /**
 323   
          * get any exception that got thrown during execution;
 324   
          * @return an exception or null for no exception/not yet finished
 325   
          */
 326  24
         public Throwable getException() {
 327  24
             return exception;
 328   
         }
 329   
     }
 330   
     
 331   
 }
 332