Coverage Report - org.apache.commons.pipeline.stage.ExtendedBaseStage
 
Classes in this File Line Coverage Branch Coverage Complexity
ExtendedBaseStage
76%
187/245
49%
45/92
0
ExtendedBaseStage$1
100%
2/2
N/A
0
ExtendedBaseStage$2
100%
2/2
N/A
0
ExtendedBaseStage$3
100%
2/2
N/A
0
ExtendedBaseStage$4
100%
2/2
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *     http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.commons.pipeline.stage;
 18  
 
 19  
 import java.lang.management.ManagementFactory;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
 import org.apache.commons.pipeline.Feeder;
 import org.apache.commons.pipeline.Stage;
 import org.apache.commons.pipeline.StageContext;
 import org.apache.commons.pipeline.StageException;
 39  
 
 40  
 /**
 41  
  * Base class for pipeline stages. Keeps track of performance statistics and allows
 42  
  * for collection and adjustment via JMX (optional)
 43  
  *
 44  
  * Cannot extend BaseStage because it marks the emit methods as final.
 45  
  *
 46  
  * @author mzsanford
 47  
  */
 48  
 public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean {
 49  
     /**  Minimum percentage of blocking before we report per-branch stats. */
 50  
     private static final float BRANCH_BLOCK_THRESHOLD = 1.0f;
 51  
     /** Default size of the moving-window average statistics */
 52  
     private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100;
 53  
     /** Default queue name when reporting statistics */
 54  
     private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]";
 55  
     /** Default number of objects after which a status message is logged */
 56  
     private static final int DEFAULT_STATUS_INTERVAL = 1000;
 57  15
     protected final Log log = LogFactory.getLog( getClass() );
 58  
 
 59  
     protected StageContext stageContext;
 60  
     private Feeder downstreamFeeder;
 61  
     private String stageName;
 62  15
     private final AtomicLong objectsReceived = new AtomicLong(0);
 63  15
     private final AtomicLong unhandledExceptions = new AtomicLong(0);
 64  15
     private final AtomicLong totalServiceTime = new AtomicLong(0);
 65  15
     private final AtomicLong totalEmitTime = new AtomicLong(0);
 66  15
     private final AtomicLong totalEmits = new AtomicLong(0);
 67  15
     private final Map<String, AtomicLong> emitTimeByBranch = new HashMap<String, AtomicLong>();
 68  15
     private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE;
 69  
     private SynchronizedDescriptiveStatistics serviceTimeStatistics;
 70  15
     private long statusInterval = DEFAULT_STATUS_INTERVAL;
 71  15
     private Integer statusBatchSize = 1;
 72  15
     private boolean collectBranchStats = false;
 73  15
     private boolean preProcessed = false; // prevent recursion.
 74  15
     private boolean postProcessed = false; // prevent recursion.
 75  15
     private boolean jmxEnabled = true;
 76  
 
 77  
     /**
 78  
      * Class name for status message. Needed because java.util.logging only
 79  
      * reports the base class name.
 80  
      */
 81  15
     private final String className = getClass().getSimpleName();
 82  
 
 83  
     /**
 84  
      * ThreadLocal sum of time spent waiting on blocked queues during the current process call.
 85  
      */
 86  1
     protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() {
 87  
         protected synchronized AtomicLong initialValue() {
 88  5
             return new AtomicLong();
 89  
         }
 90  
     };
 91  
 
 92  
     /**
 93  
      * ThreadLocal sum of time spent waiting on blocked queues during the current process call by queue name.
 94  
      */
 95  1
     protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() {
 96  
         protected synchronized Map<String, AtomicLong> initialValue() {
 97  2
             return new HashMap<String, AtomicLong>();
 98  
         }
 99  
     };
 100  
 
 101  
     /**
 102  
      * ThreadLocal count of emit calls during the current process call.
 103  
      */
 104  1
     protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() {
 105  
         protected synchronized AtomicInteger initialValue() {
 106  5
             return new AtomicInteger();
 107  
         }
 108  
     };
 109  
 
 110  
     /**
 111  
      * ThreadLocal formatter since they are not thread safe.
 112  
      */
 113  1
     protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() {
 114  
         protected synchronized NumberFormat initialValue() {
 115  1
             return new DecimalFormat("##0.000");
 116  
         }
 117  
     };
 118  
 
 119  15
     public ExtendedBaseStage() {
 120  
         // Empty constructor provided for future use.
 121  15
     }
 122  
 
 123  
     public void init( StageContext context ) {
 124  15
         this.stageContext = context;
 125  15
         if (jmxEnabled) {
 126  15
             enableJMX(context);
 127  
         }
 128  15
     }
 129  
 
 130  
     @SuppressWarnings("unchecked")
 131  
     private final void enableJMX(StageContext context) {
 132  15
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 133  15
         if (mbs != null) {
 134  
             // Try to build a unique JMX name.
 135  15
             StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:");
 136  15
             sb.append("class=").append(className);
 137  15
             if (stageName != null) {
 138  0
                 sb.append(",name=").append(stageName);
 139  
             }
 140  
 
 141  
             try {
 142  15
                 ObjectName name = new ObjectName(sb.toString());
 143  15
                 if (mbs.isRegistered(name)) {
 144  13
                     log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered.");
 145  
                 } else {
 146  2
                     Class mbeanInterface = ExtendedBaseStageMBean.class;
 147  
                     try {
 148  
                         // Find out if the stage has a more specific MBean. Reflection can be slow
 149  
                         // but this operation is pretty fast. Not to mention it's only done at startup.
 150  2
                         Class[] interfaces = getClass().getInterfaces();
 151  2
                         for (int i=0 ; i < interfaces.length; i++) {
 152  0
                             Class current = interfaces[i];
 153  
                             // Only use interfaces that extend ExtendedBaseStageMBean to maintain a minimum
 154  
                             // amount of functionality.
 155  0
                             if (current != ExtendedBaseStageMBean.class
 156  
                                 && ExtendedBaseStageMBean.class.isAssignableFrom(current)) {
 157  0
                                 mbeanInterface = current;
 158  0
                                 break;
 159  
                             }
 160  
                         }
 161  0
                     } catch (Exception e) {
 162  
                         // In the event of security or cast exceptions, default back to base.
 163  0
                         log.info("Reflection error while checking for JMX interfaces.");
 164  
                         // Reset in the off chance it got hosed.
 165  0
                         mbeanInterface = ExtendedBaseStageMBean.class;
 166  2
                     }
 167  
 
 168  2
                     StandardMBean mbean = new StandardMBean(this,
 169  
                                                             mbeanInterface);
 170  2
                     mbs.registerMBean(mbean, name);
 171  2
                     log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")");
 172  
                 }
 173  0
             } catch (Exception e) {
 174  0
                 log.warn("Failed to register with JMX server",e);
 175  15
             }
 176  
         }
 177  15
     }
 178  
 
 179  
     /**
 180  
      * Called when a stage has been created but before the first object is
 181  
      * sent to the stage for processing. Subclasses
 182  
      * should use the innerPreprocess method, which is called by this method.
 183  
      *
 184  
      * @see org.apache.commons.pipeline.Stage#preprocess()
 185  
      */
 186  
     public final void preprocess() throws StageException {
 187  6
         if ( !preProcessed ) {
 188  6
             serviceTimeStatistics = new SynchronizedDescriptiveStatistics();
 189  6
             serviceTimeStatistics.setWindowSize(currentStatWindowSize);
 190  6
             innerPreprocess();
 191  
         }
 192  6
         preProcessed = true;
 193  6
     }
 194  
 
 195  
     public final void process( Object obj ) throws StageException {
 196  5
         objectsReceived.incrementAndGet();
 197  5
         StopWatch stopWatch = new StopWatch();
 198  5
         stopWatch.start();
 199  
         try {
 200  5
             this.innerProcess( obj );
 201  0
         } catch (Exception e) {
 202  
             // Hate to catch Exception, but don't want anything killing off the thread
 203  
             // and hanging the pipeline.
 204  0
             log.error("Uncaught exception in " + className + ": " + e.getMessage(), e);
 205  0
             unhandledExceptions.incrementAndGet();
 206  5
         }
 207  5
         stopWatch.stop();
 208  
 
 209  5
         long totalTime = stopWatch.getTime();
 210  5
         totalServiceTime.addAndGet(totalTime);
 211  
 
 212  
         // I hate to synchronize anything in the base class, but this
 213  
         // call should be very fast and is not thread safe.
 214  5
         serviceTimeStatistics.addValue(totalTime);
 215  
 
 216  
         // Use ThreadLocals so that the stats only reflect process
 217  
         // calls that have completed. Otherwise the emit times can
 218  
         // exceed the process times.
 219  5
         totalEmits.addAndGet(emitCount.get().intValue());
 220  5
         totalEmitTime.addAndGet(emitTotal.get().longValue());
 221  5
         emitCount.remove();
 222  5
         emitTotal.remove();
 223  
 
 224  5
         if (collectBranchStats) {
 225  2
             for (Map.Entry<String, AtomicLong> entry : threadLocalEmitBranchTime.get().entrySet()) {
 226  2
                 if (emitTimeByBranch.containsKey(entry.getKey())) {
 227  0
                     emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
 228  
                 } else {
 229  
                     // Race condition. containsKey could return false and another thread could insert
 230  
                     // here. We can synchronize here and we will very rarely hit this block. Only the first
 231  
                     // time each queue is accessed.
 232  2
                     synchronized (emitTimeByBranch) {
 233  
                         // Double check for the race condition.
 234  2
                         if (emitTimeByBranch.containsKey(entry.getKey())) {
 235  0
                             emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue());
 236  
                         } else {
 237  2
                             emitTimeByBranch.put(entry.getKey(), new AtomicLong(entry.getValue().longValue()));
 238  
                         }
 239  2
                     }
 240  
                 }
 241  
             }
 242  2
             threadLocalEmitBranchTime.remove();
 243  
         }
 244  
 
 245  5
         if ( objectsReceived.longValue() % statusInterval == 0 ) {
 246  0
             logStatus();
 247  
         }
 248  5
     }
 249  
 
 250  
     /**
 251  
      * Convenience method to feed the specified object to the next stage downstream.
 252  
      */
 253  
     public final void emit( Object obj ) {
 254  2
         if ( log.isDebugEnabled() ) {
 255  2
             log.debug( this.getClass() + " is emitting object " + obj );
 256  
         }
 257  2
         if ( this.downstreamFeeder == null ) {
 258  2
             synchronized (this) {
 259  
                 // Lazy init the default feeder.
 260  2
                 this.downstreamFeeder = stageContext.getDownstreamFeeder( this );
 261  2
             }
 262  
         }
 263  2
         feed( DEFAULT_QUEUE_NAME, downstreamFeeder, obj );
 264  2
     }
 265  
 
 266  
     /**
 267  
      * Convenience method to feed the specified object to the first stage of the specified branch.
 268  
      */
 269  
     public final void emit( String branch, Object obj ) {
 270  2
         Feeder feeder = this.stageContext.getBranchFeeder( branch );
 271  2
         feed( branch, feeder, obj );
 272  2
     }
 273  
 
 274  
     private void feed(String name, Feeder feeder, Object obj ) {
 275  4
         if ( feeder == null ) {
 276  
             // The pipeline code should never allow this to happen.
 277  0
             String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" );
 278  0
             log.error( "Ignoring attempt to emit " + objectType +
 279  
                        " object to invalid feeder" );
 280  0
             return;
 281  
         }
 282  4
         StopWatch emitWatch = new StopWatch();
 283  4
         emitWatch.start();
 284  
 
 285  
         // Pass the emitted object to the next stage (default or branch)
 286  4
         feeder.feed( obj );
 287  
 
 288  4
         emitWatch.stop();
 289  
 
 290  
         // Use ThreadLocal variables so the emit totals do not
 291  
         // go up until the process call completes.
 292  4
         emitTotal.get().addAndGet( emitWatch.getTime() );
 293  4
         emitCount.get().incrementAndGet();
 294  
 
 295  4
         if (collectBranchStats) {
 296  2
             if (! threadLocalEmitBranchTime.get().containsKey(name)) {
 297  2
                 AtomicLong currentTotal = new AtomicLong(emitWatch.getTime());
 298  2
                 threadLocalEmitBranchTime.get().put(name, currentTotal);
 299  2
             } else {
 300  0
                 threadLocalEmitBranchTime.get().get(name).addAndGet(emitWatch.getTime());
 301  
             }
 302  
         }
 303  4
     }
 304  
 
 305  
     /**
 306  
      * Called when a stage has completed all processing. Subclasses
 307  
      * should use the innerPostprocess method, which is called by this method.
 308  
      *
 309  
      * @see org.apache.commons.pipeline.Stage#postprocess()
 310  
      */
 311  
     public final void postprocess() throws StageException {
 312  1
         if ( !postProcessed ) {
 313  1
             logStatus();
 314  1
             innerPostprocess();
 315  
         }
 316  1
         postProcessed = true;
 317  1
     }
 318  
 
 319  
     public void release() {
 320  
         // No-op implementation to fulfill interface
 321  0
     }
 322  
 
 323  
     public abstract void innerProcess( Object obj )
 324  
       throws StageException;
 325  
 
 326  
     public void innerPreprocess() throws StageException {
 327  
         // No-op default implementation.
 328  0
     }
 329  
 
 330  
     public void innerPostprocess() throws StageException {
 331  
         // No-op default implementation.
 332  0
     }
 333  
 
 334  
     /**
 335  
      * Class-specific status message. Null or empty status' will be ignored.
 336  
      */
 337  
     public abstract String status();
 338  
 
 339  
     public void logStatus() {
 340  1
         String logMessage = getStatusMessage();
 341  1
         log.info(logMessage);
 342  1
     }
 343  
 
 344  
     /**
 345  
      * @return Log message including both base stage and class specific stats.
 346  
      */
 347  
     public String getStatusMessage() {
 348  5
         StringBuilder sb = new StringBuilder( 512 );
 349  5
         NumberFormat formatter = floatFormatter.get();
 350  
 
 351  5
         float serviceTime = ( totalServiceTime.floatValue() / 1000.0f );
 352  5
         float emitTime = ( totalEmitTime.floatValue() / 1000.0f );
 353  5
         float netServiceTime = ( serviceTime - emitTime );
 354  
 
 355  5
         float emitPercentage = 0.0f;
 356  5
         float emitFloat = totalEmits.floatValue();
 357  5
         float recvFloat = objectsReceived.floatValue();
 358  5
         if (recvFloat > 0) {
 359  4
             emitPercentage = (emitFloat / recvFloat)*100.0f;
 360  
         }
 361  
 
 362  5
         sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " );
 363  
 
 364  5
         if (statusBatchSize > 1) {
 365  0
             sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(statusBatchSize);
 366  
         }
 367  
 
 368  5
         sb.append( "\n\tTotal objects received:" ).append( objectsReceived );
 369  5
         sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions );
 370  5
         sb.append( "\n\tTotal objects emitted:" ).append( totalEmits );
 371  5
         if (emitFloat > 0) {
 372  3
             sb.append(" (").append(formatter.format(emitPercentage)).append("%)");
 373  
         }
 374  5
         sb.append( "\n\tTotal gross processing time (sec):" )
 375  
           .append( formatter.format( serviceTime ) );
 376  5
         sb.append( "\n\tTotal emit blocked time (sec):" )
 377  
           .append( formatter.format( emitTime ) );
 378  5
         sb.append( "\n\tTotal net processing time (sec):" )
 379  
           .append( formatter.format( netServiceTime ) );
 380  
 
 381  5
         float avgServiceTime = 0;
 382  5
         float avgEmitTime = 0;
 383  5
         float avgNetServiceTime = 0;
 384  5
         if ( objectsReceived.longValue() > 0 ) {
 385  4
             avgServiceTime = ( serviceTime / objectsReceived.floatValue()/statusBatchSize );
 386  4
             avgEmitTime = ( emitTime / objectsReceived.floatValue()/statusBatchSize );
 387  4
             avgNetServiceTime = ( netServiceTime / objectsReceived.floatValue()/statusBatchSize );
 388  
         }
 389  
 
 390  5
         sb.append( "\n\tAverage gross processing time (sec/obj):" )
 391  
           .append( formatter.format( avgServiceTime ) );
 392  5
         sb.append( "\n\tAverage emit blocked time (sec/obj):" )
 393  
           .append( formatter.format( avgEmitTime ) );
 394  5
         sb.append( "\n\tAverage net processing time (sec/obj):" )
 395  
           .append( formatter.format( avgNetServiceTime ) );
 396  
 
 397  5
         if (serviceTimeStatistics != null) {
 398  5
             long count = serviceTimeStatistics.getN();
 399  5
             if (count > 0) {
 400  4
                 double avgMillis = getCurrentServiceTimeAverage()/(float)statusBatchSize;
 401  4
                 sb.append( "\n\tAverage gross processing time in last ")
 402  
                   .append(count)
 403  
                   .append(" (sec/obj):" )
 404  
                   .append( formatter.format( avgMillis/1000 ) );
 405  
             }
 406  
         }
 407  
 
 408  5
         float grossThroughput = 0;
 409  5
         if ( avgServiceTime > 0 ) {
 410  2
             grossThroughput = ( 1.0f / avgServiceTime );
 411  
         }
 412  5
         float netThroughput = 0;
 413  5
         if ( avgNetServiceTime > 0 ) {
 414  1
             netThroughput = ( 1.0f / avgNetServiceTime );
 415  
         }
 416  
 
 417  5
         sb.append( "\n\tGross throughput (obj/sec):" )
 418  
           .append( formatter.format( grossThroughput ) );
 419  5
         sb.append( "\n\tNet throughput (obj/sec):" )
 420  
           .append( formatter.format( netThroughput ) );
 421  
 
 422  5
         float percWorking = 0;
 423  5
         float percBlocking = 0;
 424  5
         if ( serviceTime > 0 ) {
 425  2
             percWorking = ( netServiceTime / serviceTime ) * 100;
 426  2
             percBlocking = ( emitTime / serviceTime ) * 100;
 427  
         }
 428  
 
 429  5
         sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) );
 430  5
         sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) );
 431  
 
 432  
         // No need to output for a non-branching stage or if there was very little
 433  
         // blocking (as defined in the constant)
 434  5
         if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) {
 435  
             try {
 436  1
                 for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) {
 437  2
                     float branchBlockSec = (entry.getValue().floatValue()/1000.0f);
 438  2
                     float branchBlockPerc = (branchBlockSec/emitTime) * 100;
 439  2
                     sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc));
 440  2
                 }
 441  0
             } catch (RuntimeException e) {
 442  
                 // Synchronizing would be slow, ConcurrentMod is possible but unlikely since the map is
 443  
                 // only modified the first time a stage is emitted to so just catch and
 444  
                 // log it. No need to stop all processing over a reporting failure.
 445  0
                 sb.append("\n\t\tproblem getting per-branch stats: ").append(e.getMessage());
 446  1
             }
 447  
         }
 448  
 
 449  5
         String stageSpecificStatus = this.status();
 450  5
         if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) {
 451  0
             sb.append( "\n\t === " )
 452  
               .append( className )
 453  
               .append( " Specific Stats === " );
 454  0
             sb.append( stageSpecificStatus );
 455  
         }
 456  
 
 457  5
         return sb.toString();
 458  
     }
 459  
 
 460  
     protected String formatTotalTimeStat( String name, AtomicLong totalTime ) {
 461  0
         return formatTotalTimeStat( name, totalTime.longValue() );
 462  
     }
 463  
 
 464  
     protected String formatTotalTimeStat( String name, long totalTime ) {
 465  0
         if ( name == null || totalTime < 0 ) {
 466  0
             return "";
 467  
         }
 468  0
         NumberFormat formatter = floatFormatter.get();
 469  0
         StringBuilder sb = new StringBuilder();
 470  
         // Total processing time minus calls to emit.
 471  0
         float totalSec = totalTime / 1000.0f;
 472  0
         float average = 0;
 473  0
         if ( getObjectsReceived() > 0 ) {
 474  0
             average = totalSec / getObjectsReceived() / (float)statusBatchSize;
 475  
         }
 476  
 
 477  0
         if ( log.isDebugEnabled() ) {
 478  0
             sb.append( "\n\tTotal " )
 479  
               .append( name )
 480  
               .append( " processing time (sec):" )
 481  
               .append( formatter.format( totalSec ) );
 482  
         }
 483  
 
 484  0
         sb.append( "\n\tAverage " )
 485  
           .append( name )
 486  
           .append( " processing time (sec/obj):" )
 487  
           .append( formatter.format( average ) );
 488  
 
 489  0
         if ( log.isDebugEnabled() && average > 0 ) {
 490  0
             float throughput = ( 1.0f ) / average * (float)statusBatchSize;
 491  0
             sb.append( "\n\tThroughput for " )
 492  
               .append( name )
 493  
               .append( " (obj/sec):" )
 494  
               .append( formatter.format( throughput ) );
 495  
         }
 496  
 
 497  0
         return sb.toString();
 498  
     }
 499  
 
 500  
     protected String formatCounterStat( String name, AtomicInteger count ) {
 501  0
         return formatCounterStat(name, count.get());
 502  
     }
 503  
 
 504  
     protected String formatCounterStat( String name, AtomicLong count ) {
 505  0
         return formatCounterStat(name, count.get());
 506  
     }
 507  
 
 508  
     protected String formatCounterStat( String name, long count ) {
 509  0
         if ( name == null || count < 0 || getObjectsReceived() <= 0) {
 510  0
             return "";
 511  
         }
 512  0
         NumberFormat formatter = floatFormatter.get();
 513  0
         StringBuilder sb = new StringBuilder();
 514  
 
 515  0
         float perc = ((float)count*(float)statusBatchSize/(float)getObjectsReceived())*100.0f;
 516  
 
 517  0
         sb.append( "\n\tNumber of " )
 518  
           .append( name )
 519  
           .append( " (" )
 520  
           .append( formatter.format(perc) )
 521  
           .append( "%) :")
 522  
           .append( count );
 523  
 
 524  0
         return sb.toString();
 525  
     }
 526  
 
 527  
     /**
 528  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getStatusInterval()
 529  
      */
 530  
     public Long getStatusInterval() {
 531  1
         return Long.valueOf(statusInterval);
 532  
     }
 533  
 
 534  
     /**
 535  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setStatusInterval(long)
 536  
      */
 537  
     public void setStatusInterval( Long statusInterval ) {
 538  1
         this.statusInterval = statusInterval;
 539  1
     }
 540  
 
 541  
     public Integer getStatusBatchSize() {
 542  1
         return statusBatchSize;
 543  
     }
 544  
 
 545  
     public void setStatusBatchSize(Integer statusBatchSize) {
 546  1
         this.statusBatchSize = statusBatchSize;
 547  1
     }
 548  
 
 549  
     /**
 550  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getObjectsReceived()
 551  
      */
 552  
     public long getObjectsReceived() {
 553  8
         return objectsReceived.longValue();
 554  
     }
 555  
 
 556  
     /**
 557  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalServiceTime()
 558  
      */
 559  
     public long getTotalServiceTime() {
 560  0
         return totalServiceTime.longValue();
 561  
     }
 562  
 
 563  
     /**
 564  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmitTime()
 565  
      */
 566  
     public long getTotalEmitTime() {
 567  0
         return totalEmitTime.longValue();
 568  
     }
 569  
 
 570  
     /**
 571  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getTotalEmits()
 572  
      */
 573  
     public long getTotalEmits() {
 574  6
         return totalEmits.longValue();
 575  
     }
 576  
 
 577  
     /**
 578  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#getCollectBranchStats()
 579  
      */
 580  
     public Boolean getCollectBranchStats() {
 581  1
         return collectBranchStats;
 582  
     }
 583  
 
 584  
     /**
 585  
      * @see org.apache.commons.pipeline.ExtendedBaseStageMBean#setCollectBranchStats(Boolean)
 586  
      */
 587  
     public void setCollectBranchStats(Boolean collectBranchStats) {
 588  2
         this.collectBranchStats = collectBranchStats;
 589  2
     }
 590  
 
 591  
     public Integer getCurrentStatWindowSize() {
 592  1
         return Integer.valueOf(currentStatWindowSize);
 593  
     }
 594  
 
 595  
     public void setCurrentStatWindowSize(Integer newStatWindowSize) {
 596  1
         if (serviceTimeStatistics != null
 597  
                 && newStatWindowSize != this.currentStatWindowSize) {
 598  0
             synchronized (serviceTimeStatistics) {
 599  0
                 serviceTimeStatistics.setWindowSize(newStatWindowSize);
 600  0
             }
 601  
         }
 602  1
         this.currentStatWindowSize = newStatWindowSize;
 603  1
     }
 604  
 
 605  
     public String getStageName() {
 606  1
         return stageName;
 607  
     }
 608  
 
 609  
     public void setStageName(String name) {
 610  1
         this.stageName = name;
 611  1
     }
 612  
 
 613  
     public boolean isJmxEnabled() {
 614  2
         return jmxEnabled;
 615  
     }
 616  
 
 617  
     public void setJmxEnabled(boolean jmxEnabled) {
 618  2
         this.jmxEnabled = jmxEnabled;
 619  2
     }
 620  
 
 621  
     /**
 622  
      * Returns a moving average of the service time. This does not yet take into account time spent
 623  
      * calling emit, nor does it return minimum, maximum or other statistical information at this time.
 624  
      *
 625  
      * @return Average time to process the last <code>currentStatWindowSize</code> objects.
 626  
      */
 627  
     public double getCurrentServiceTimeAverage() {
 628  4
         double avg = -1.0d;
 629  
 
 630  
         // Hate to synchronize in the base class, but this should be very quick.
 631  4
         avg = serviceTimeStatistics.getMean();
 632  
 
 633  4
         return avg;
 634  
     }
 635  
 }