1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
package org.apache.commons.pipeline.stage; |
18 | |
|
19 | |
import java.lang.management.ManagementFactory; |
20 | |
import java.text.DecimalFormat; |
21 | |
import java.text.NumberFormat; |
22 | |
import java.util.HashMap; |
23 | |
import java.util.Map; |
24 | |
import java.util.concurrent.atomic.AtomicInteger; |
25 | |
import java.util.concurrent.atomic.AtomicLong; |
26 | |
|
27 | |
import javax.management.MBeanServer; |
28 | |
import javax.management.ObjectName; |
29 | |
import javax.management.StandardMBean; |
30 | |
|
31 | |
import org.apache.commons.lang.time.StopWatch; |
32 | |
import org.apache.commons.logging.Log; |
33 | |
import org.apache.commons.logging.LogFactory; |
34 | |
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics; |
35 | |
import org.apache.commons.pipeline.Feeder; |
36 | |
import org.apache.commons.pipeline.Stage; |
37 | |
import org.apache.commons.pipeline.StageContext; |
38 | |
import org.apache.commons.pipeline.StageException; |
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
public abstract class ExtendedBaseStage implements Stage, ExtendedBaseStageMBean { |
49 | |
|
50 | |
private static final float BRANCH_BLOCK_THRESHOLD = 1.0f; |
51 | |
|
52 | |
private static final int DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE = 100; |
53 | |
|
54 | |
private static final String DEFAULT_QUEUE_NAME = "[DefaultQueue]"; |
55 | |
|
56 | |
private static final int DEFAULT_STATUS_INTERVAL = 1000; |
57 | 15 | protected final Log log = LogFactory.getLog( getClass() ); |
58 | |
|
59 | |
protected StageContext stageContext; |
60 | |
private Feeder downstreamFeeder; |
61 | |
private String stageName; |
62 | 15 | private final AtomicLong objectsReceived = new AtomicLong(0); |
63 | 15 | private final AtomicLong unhandledExceptions = new AtomicLong(0); |
64 | 15 | private final AtomicLong totalServiceTime = new AtomicLong(0); |
65 | 15 | private final AtomicLong totalEmitTime = new AtomicLong(0); |
66 | 15 | private final AtomicLong totalEmits = new AtomicLong(0); |
67 | 15 | private final Map<String, AtomicLong> emitTimeByBranch = new HashMap<String, AtomicLong>(); |
68 | 15 | private int currentStatWindowSize = DEFAULT_DESCRIPTIVE_STATS_WINDOW_SIZE; |
69 | |
private SynchronizedDescriptiveStatistics serviceTimeStatistics; |
70 | 15 | private long statusInterval = DEFAULT_STATUS_INTERVAL; |
71 | 15 | private Integer statusBatchSize = 1; |
72 | 15 | private boolean collectBranchStats = false; |
73 | 15 | private boolean preProcessed = false; |
74 | 15 | private boolean postProcessed = false; |
75 | 15 | private boolean jmxEnabled = true; |
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | 15 | private final String className = getClass().getSimpleName(); |
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | 1 | protected static ThreadLocal<AtomicLong> emitTotal = new ThreadLocal<AtomicLong>() { |
87 | |
protected synchronized AtomicLong initialValue() { |
88 | 5 | return new AtomicLong(); |
89 | |
} |
90 | |
}; |
91 | |
|
92 | |
|
93 | |
|
94 | |
|
95 | 1 | protected static ThreadLocal<Map<String, AtomicLong>> threadLocalEmitBranchTime = new ThreadLocal<Map<String, AtomicLong>>() { |
96 | |
protected synchronized Map<String, AtomicLong> initialValue() { |
97 | 2 | return new HashMap<String, AtomicLong>(); |
98 | |
} |
99 | |
}; |
100 | |
|
101 | |
|
102 | |
|
103 | |
|
104 | 1 | protected static ThreadLocal<AtomicInteger> emitCount = new ThreadLocal<AtomicInteger>() { |
105 | |
protected synchronized AtomicInteger initialValue() { |
106 | 5 | return new AtomicInteger(); |
107 | |
} |
108 | |
}; |
109 | |
|
110 | |
|
111 | |
|
112 | |
|
113 | 1 | protected static ThreadLocal<NumberFormat> floatFormatter = new ThreadLocal<NumberFormat>() { |
114 | |
protected synchronized NumberFormat initialValue() { |
115 | 1 | return new DecimalFormat("##0.000"); |
116 | |
} |
117 | |
}; |
118 | |
|
119 | 15 | public ExtendedBaseStage() { |
120 | |
|
121 | 15 | } |
122 | |
|
123 | |
public void init( StageContext context ) { |
124 | 15 | this.stageContext = context; |
125 | 15 | if (jmxEnabled) { |
126 | 15 | enableJMX(context); |
127 | |
} |
128 | 15 | } |
129 | |
|
130 | |
@SuppressWarnings("unchecked") |
131 | |
private final void enableJMX(StageContext context) { |
132 | 15 | MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); |
133 | 15 | if (mbs != null) { |
134 | |
|
135 | 15 | StringBuilder sb = new StringBuilder("org.apache.commons.pipeline:"); |
136 | 15 | sb.append("class=").append(className); |
137 | 15 | if (stageName != null) { |
138 | 0 | sb.append(",name=").append(stageName); |
139 | |
} |
140 | |
|
141 | |
try { |
142 | 15 | ObjectName name = new ObjectName(sb.toString()); |
143 | 15 | if (mbs.isRegistered(name)) { |
144 | 13 | log.info("JMX Overlap. Multiple instances of '" + name + "'. Only one will be registered."); |
145 | |
} else { |
146 | 2 | Class mbeanInterface = ExtendedBaseStageMBean.class; |
147 | |
try { |
148 | |
|
149 | |
|
150 | 2 | Class[] interfaces = getClass().getInterfaces(); |
151 | 2 | for (int i=0 ; i < interfaces.length; i++) { |
152 | 0 | Class current = interfaces[i]; |
153 | |
|
154 | |
|
155 | 0 | if (current != ExtendedBaseStageMBean.class |
156 | |
&& ExtendedBaseStageMBean.class.isAssignableFrom(current)) { |
157 | 0 | mbeanInterface = current; |
158 | 0 | break; |
159 | |
} |
160 | |
} |
161 | 0 | } catch (Exception e) { |
162 | |
|
163 | 0 | log.info("Reflection error while checking for JMX interfaces."); |
164 | |
|
165 | 0 | mbeanInterface = ExtendedBaseStageMBean.class; |
166 | 2 | } |
167 | |
|
168 | 2 | StandardMBean mbean = new StandardMBean(this, |
169 | |
mbeanInterface); |
170 | 2 | mbs.registerMBean(mbean, name); |
171 | 2 | log.info("JMX MBean registered: " + name.toString() + " (" + mbeanInterface.getSimpleName() + ")"); |
172 | |
} |
173 | 0 | } catch (Exception e) { |
174 | 0 | log.warn("Failed to register with JMX server",e); |
175 | 15 | } |
176 | |
} |
177 | 15 | } |
178 | |
|
179 | |
|
180 | |
|
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
|
186 | |
public final void preprocess() throws StageException { |
187 | 6 | if ( !preProcessed ) { |
188 | 6 | serviceTimeStatistics = new SynchronizedDescriptiveStatistics(); |
189 | 6 | serviceTimeStatistics.setWindowSize(currentStatWindowSize); |
190 | 6 | innerPreprocess(); |
191 | |
} |
192 | 6 | preProcessed = true; |
193 | 6 | } |
194 | |
|
195 | |
public final void process( Object obj ) throws StageException { |
196 | 5 | objectsReceived.incrementAndGet(); |
197 | 5 | StopWatch stopWatch = new StopWatch(); |
198 | 5 | stopWatch.start(); |
199 | |
try { |
200 | 5 | this.innerProcess( obj ); |
201 | 0 | } catch (Exception e) { |
202 | |
|
203 | |
|
204 | 0 | log.error("Uncaught exception in " + className + ": " + e.getMessage(), e); |
205 | 0 | unhandledExceptions.incrementAndGet(); |
206 | 5 | } |
207 | 5 | stopWatch.stop(); |
208 | |
|
209 | 5 | long totalTime = stopWatch.getTime(); |
210 | 5 | totalServiceTime.addAndGet(totalTime); |
211 | |
|
212 | |
|
213 | |
|
214 | 5 | serviceTimeStatistics.addValue(totalTime); |
215 | |
|
216 | |
|
217 | |
|
218 | |
|
219 | 5 | totalEmits.addAndGet(emitCount.get().intValue()); |
220 | 5 | totalEmitTime.addAndGet(emitTotal.get().longValue()); |
221 | 5 | emitCount.remove(); |
222 | 5 | emitTotal.remove(); |
223 | |
|
224 | 5 | if (collectBranchStats) { |
225 | 2 | for (Map.Entry<String, AtomicLong> entry : threadLocalEmitBranchTime.get().entrySet()) { |
226 | 2 | if (emitTimeByBranch.containsKey(entry.getKey())) { |
227 | 0 | emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue()); |
228 | |
} else { |
229 | |
|
230 | |
|
231 | |
|
232 | 2 | synchronized (emitTimeByBranch) { |
233 | |
|
234 | 2 | if (emitTimeByBranch.containsKey(entry.getKey())) { |
235 | 0 | emitTimeByBranch.get(entry.getKey()).addAndGet(entry.getValue().longValue()); |
236 | |
} else { |
237 | 2 | emitTimeByBranch.put(entry.getKey(), new AtomicLong(entry.getValue().longValue())); |
238 | |
} |
239 | 2 | } |
240 | |
} |
241 | |
} |
242 | 2 | threadLocalEmitBranchTime.remove(); |
243 | |
} |
244 | |
|
245 | 5 | if ( objectsReceived.longValue() % statusInterval == 0 ) { |
246 | 0 | logStatus(); |
247 | |
} |
248 | 5 | } |
249 | |
|
250 | |
|
251 | |
|
252 | |
|
253 | |
public final void emit( Object obj ) { |
254 | 2 | if ( log.isDebugEnabled() ) { |
255 | 2 | log.debug( this.getClass() + " is emitting object " + obj ); |
256 | |
} |
257 | 2 | if ( this.downstreamFeeder == null ) { |
258 | 2 | synchronized (this) { |
259 | |
|
260 | 2 | this.downstreamFeeder = stageContext.getDownstreamFeeder( this ); |
261 | 2 | } |
262 | |
} |
263 | 2 | feed( DEFAULT_QUEUE_NAME, downstreamFeeder, obj ); |
264 | 2 | } |
265 | |
|
266 | |
|
267 | |
|
268 | |
|
269 | |
public final void emit( String branch, Object obj ) { |
270 | 2 | Feeder feeder = this.stageContext.getBranchFeeder( branch ); |
271 | 2 | feed( branch, feeder, obj ); |
272 | 2 | } |
273 | |
|
274 | |
private void feed(String name, Feeder feeder, Object obj ) { |
275 | 4 | if ( feeder == null ) { |
276 | |
|
277 | 0 | String objectType = ( obj != null ? obj.getClass().getSimpleName() : "null" ); |
278 | 0 | log.error( "Ignoring attempt to emit " + objectType + |
279 | |
" object to invalid feeder" ); |
280 | 0 | return; |
281 | |
} |
282 | 4 | StopWatch emitWatch = new StopWatch(); |
283 | 4 | emitWatch.start(); |
284 | |
|
285 | |
|
286 | 4 | feeder.feed( obj ); |
287 | |
|
288 | 4 | emitWatch.stop(); |
289 | |
|
290 | |
|
291 | |
|
292 | 4 | emitTotal.get().addAndGet( emitWatch.getTime() ); |
293 | 4 | emitCount.get().incrementAndGet(); |
294 | |
|
295 | 4 | if (collectBranchStats) { |
296 | 2 | if (! threadLocalEmitBranchTime.get().containsKey(name)) { |
297 | 2 | AtomicLong currentTotal = new AtomicLong(emitWatch.getTime()); |
298 | 2 | threadLocalEmitBranchTime.get().put(name, currentTotal); |
299 | 2 | } else { |
300 | 0 | threadLocalEmitBranchTime.get().get(name).addAndGet(emitWatch.getTime()); |
301 | |
} |
302 | |
} |
303 | 4 | } |
304 | |
|
305 | |
|
306 | |
|
307 | |
|
308 | |
|
309 | |
|
310 | |
|
311 | |
public final void postprocess() throws StageException { |
312 | 1 | if ( !postProcessed ) { |
313 | 1 | logStatus(); |
314 | 1 | innerPostprocess(); |
315 | |
} |
316 | 1 | postProcessed = true; |
317 | 1 | } |
318 | |
|
319 | |
public void release() { |
320 | |
|
321 | 0 | } |
322 | |
|
323 | |
public abstract void innerProcess( Object obj ) |
324 | |
throws StageException; |
325 | |
|
326 | |
public void innerPreprocess() throws StageException { |
327 | |
|
328 | 0 | } |
329 | |
|
330 | |
public void innerPostprocess() throws StageException { |
331 | |
|
332 | 0 | } |
333 | |
|
334 | |
|
335 | |
|
336 | |
|
337 | |
public abstract String status(); |
338 | |
|
339 | |
public void logStatus() { |
340 | 1 | String logMessage = getStatusMessage(); |
341 | 1 | log.info(logMessage); |
342 | 1 | } |
343 | |
|
344 | |
|
345 | |
|
346 | |
|
347 | |
public String getStatusMessage() { |
348 | 5 | StringBuilder sb = new StringBuilder( 512 ); |
349 | 5 | NumberFormat formatter = floatFormatter.get(); |
350 | |
|
351 | 5 | float serviceTime = ( totalServiceTime.floatValue() / 1000.0f ); |
352 | 5 | float emitTime = ( totalEmitTime.floatValue() / 1000.0f ); |
353 | 5 | float netServiceTime = ( serviceTime - emitTime ); |
354 | |
|
355 | 5 | float emitPercentage = 0.0f; |
356 | 5 | float emitFloat = totalEmits.floatValue(); |
357 | 5 | float recvFloat = objectsReceived.floatValue(); |
358 | 5 | if (recvFloat > 0) { |
359 | 4 | emitPercentage = (emitFloat / recvFloat)*100.0f; |
360 | |
} |
361 | |
|
362 | 5 | sb.append( "\n\t === " ).append( className ).append( " Generic Stats === " ); |
363 | |
|
364 | 5 | if (statusBatchSize > 1) { |
365 | 0 | sb.append("\n\tStatus Batch Size (all /obj and /sec include this): ").append(statusBatchSize); |
366 | |
} |
367 | |
|
368 | 5 | sb.append( "\n\tTotal objects received:" ).append( objectsReceived ); |
369 | 5 | sb.append( "\n\tTotal unhandled exceptions:" ).append( unhandledExceptions ); |
370 | 5 | sb.append( "\n\tTotal objects emitted:" ).append( totalEmits ); |
371 | 5 | if (emitFloat > 0) { |
372 | 3 | sb.append(" (").append(formatter.format(emitPercentage)).append("%)"); |
373 | |
} |
374 | 5 | sb.append( "\n\tTotal gross processing time (sec):" ) |
375 | |
.append( formatter.format( serviceTime ) ); |
376 | 5 | sb.append( "\n\tTotal emit blocked time (sec):" ) |
377 | |
.append( formatter.format( emitTime ) ); |
378 | 5 | sb.append( "\n\tTotal net processing time (sec):" ) |
379 | |
.append( formatter.format( netServiceTime ) ); |
380 | |
|
381 | 5 | float avgServiceTime = 0; |
382 | 5 | float avgEmitTime = 0; |
383 | 5 | float avgNetServiceTime = 0; |
384 | 5 | if ( objectsReceived.longValue() > 0 ) { |
385 | 4 | avgServiceTime = ( serviceTime / objectsReceived.floatValue()/statusBatchSize ); |
386 | 4 | avgEmitTime = ( emitTime / objectsReceived.floatValue()/statusBatchSize ); |
387 | 4 | avgNetServiceTime = ( netServiceTime / objectsReceived.floatValue()/statusBatchSize ); |
388 | |
} |
389 | |
|
390 | 5 | sb.append( "\n\tAverage gross processing time (sec/obj):" ) |
391 | |
.append( formatter.format( avgServiceTime ) ); |
392 | 5 | sb.append( "\n\tAverage emit blocked time (sec/obj):" ) |
393 | |
.append( formatter.format( avgEmitTime ) ); |
394 | 5 | sb.append( "\n\tAverage net processing time (sec/obj):" ) |
395 | |
.append( formatter.format( avgNetServiceTime ) ); |
396 | |
|
397 | 5 | if (serviceTimeStatistics != null) { |
398 | 5 | long count = serviceTimeStatistics.getN(); |
399 | 5 | if (count > 0) { |
400 | 4 | double avgMillis = getCurrentServiceTimeAverage()/(float)statusBatchSize; |
401 | 4 | sb.append( "\n\tAverage gross processing time in last ") |
402 | |
.append(count) |
403 | |
.append(" (sec/obj):" ) |
404 | |
.append( formatter.format( avgMillis/1000 ) ); |
405 | |
} |
406 | |
} |
407 | |
|
408 | 5 | float grossThroughput = 0; |
409 | 5 | if ( avgServiceTime > 0 ) { |
410 | 2 | grossThroughput = ( 1.0f / avgServiceTime ); |
411 | |
} |
412 | 5 | float netThroughput = 0; |
413 | 5 | if ( avgNetServiceTime > 0 ) { |
414 | 1 | netThroughput = ( 1.0f / avgNetServiceTime ); |
415 | |
} |
416 | |
|
417 | 5 | sb.append( "\n\tGross throughput (obj/sec):" ) |
418 | |
.append( formatter.format( grossThroughput ) ); |
419 | 5 | sb.append( "\n\tNet throughput (obj/sec):" ) |
420 | |
.append( formatter.format( netThroughput ) ); |
421 | |
|
422 | 5 | float percWorking = 0; |
423 | 5 | float percBlocking = 0; |
424 | 5 | if ( serviceTime > 0 ) { |
425 | 2 | percWorking = ( netServiceTime / serviceTime ) * 100; |
426 | 2 | percBlocking = ( emitTime / serviceTime ) * 100; |
427 | |
} |
428 | |
|
429 | 5 | sb.append( "\n\t% time working:" ).append( formatter.format( percWorking ) ); |
430 | 5 | sb.append( "\n\t% time blocking:" ).append( formatter.format( percBlocking ) ); |
431 | |
|
432 | |
|
433 | |
|
434 | 5 | if (collectBranchStats && emitTimeByBranch.size() > 1 && percBlocking >= BRANCH_BLOCK_THRESHOLD) { |
435 | |
try { |
436 | 1 | for (Map.Entry<String, AtomicLong> entry : emitTimeByBranch.entrySet()) { |
437 | 2 | float branchBlockSec = (entry.getValue().floatValue()/1000.0f); |
438 | 2 | float branchBlockPerc = (branchBlockSec/emitTime) * 100; |
439 | 2 | sb.append("\n\t\t% branch ").append(entry.getKey()).append(":").append(formatter.format(branchBlockPerc)); |
440 | 2 | } |
441 | 0 | } catch (RuntimeException e) { |
442 | |
|
443 | |
|
444 | |
|
445 | 0 | sb.append("\n\t\tproblem getting per-branch stats: ").append(e.getMessage()); |
446 | 1 | } |
447 | |
} |
448 | |
|
449 | 5 | String stageSpecificStatus = this.status(); |
450 | 5 | if ( stageSpecificStatus != null && stageSpecificStatus.length() > 0 ) { |
451 | 0 | sb.append( "\n\t === " ) |
452 | |
.append( className ) |
453 | |
.append( " Specific Stats === " ); |
454 | 0 | sb.append( stageSpecificStatus ); |
455 | |
} |
456 | |
|
457 | 5 | return sb.toString(); |
458 | |
} |
459 | |
|
460 | |
protected String formatTotalTimeStat( String name, AtomicLong totalTime ) { |
461 | 0 | return formatTotalTimeStat( name, totalTime.longValue() ); |
462 | |
} |
463 | |
|
464 | |
protected String formatTotalTimeStat( String name, long totalTime ) { |
465 | 0 | if ( name == null || totalTime < 0 ) { |
466 | 0 | return ""; |
467 | |
} |
468 | 0 | NumberFormat formatter = floatFormatter.get(); |
469 | 0 | StringBuilder sb = new StringBuilder(); |
470 | |
|
471 | 0 | float totalSec = totalTime / 1000.0f; |
472 | 0 | float average = 0; |
473 | 0 | if ( getObjectsReceived() > 0 ) { |
474 | 0 | average = totalSec / getObjectsReceived() / (float)statusBatchSize; |
475 | |
} |
476 | |
|
477 | 0 | if ( log.isDebugEnabled() ) { |
478 | 0 | sb.append( "\n\tTotal " ) |
479 | |
.append( name ) |
480 | |
.append( " processing time (sec):" ) |
481 | |
.append( formatter.format( totalSec ) ); |
482 | |
} |
483 | |
|
484 | 0 | sb.append( "\n\tAverage " ) |
485 | |
.append( name ) |
486 | |
.append( " processing time (sec/obj):" ) |
487 | |
.append( formatter.format( average ) ); |
488 | |
|
489 | 0 | if ( log.isDebugEnabled() && average > 0 ) { |
490 | 0 | float throughput = ( 1.0f ) / average * (float)statusBatchSize; |
491 | 0 | sb.append( "\n\tThroughput for " ) |
492 | |
.append( name ) |
493 | |
.append( " (obj/sec):" ) |
494 | |
.append( formatter.format( throughput ) ); |
495 | |
} |
496 | |
|
497 | 0 | return sb.toString(); |
498 | |
} |
499 | |
|
500 | |
protected String formatCounterStat( String name, AtomicInteger count ) { |
501 | 0 | return formatCounterStat(name, count.get()); |
502 | |
} |
503 | |
|
504 | |
protected String formatCounterStat( String name, AtomicLong count ) { |
505 | 0 | return formatCounterStat(name, count.get()); |
506 | |
} |
507 | |
|
508 | |
protected String formatCounterStat( String name, long count ) { |
509 | 0 | if ( name == null || count < 0 || getObjectsReceived() <= 0) { |
510 | 0 | return ""; |
511 | |
} |
512 | 0 | NumberFormat formatter = floatFormatter.get(); |
513 | 0 | StringBuilder sb = new StringBuilder(); |
514 | |
|
515 | 0 | float perc = ((float)count*(float)statusBatchSize/(float)getObjectsReceived())*100.0f; |
516 | |
|
517 | 0 | sb.append( "\n\tNumber of " ) |
518 | |
.append( name ) |
519 | |
.append( " (" ) |
520 | |
.append( formatter.format(perc) ) |
521 | |
.append( "%) :") |
522 | |
.append( count ); |
523 | |
|
524 | 0 | return sb.toString(); |
525 | |
} |
526 | |
|
527 | |
|
528 | |
|
529 | |
|
530 | |
public Long getStatusInterval() { |
531 | 1 | return Long.valueOf(statusInterval); |
532 | |
} |
533 | |
|
534 | |
|
535 | |
|
536 | |
|
537 | |
public void setStatusInterval( Long statusInterval ) { |
538 | 1 | this.statusInterval = statusInterval; |
539 | 1 | } |
540 | |
|
541 | |
public Integer getStatusBatchSize() { |
542 | 1 | return statusBatchSize; |
543 | |
} |
544 | |
|
545 | |
public void setStatusBatchSize(Integer statusBatchSize) { |
546 | 1 | this.statusBatchSize = statusBatchSize; |
547 | 1 | } |
548 | |
|
549 | |
|
550 | |
|
551 | |
|
552 | |
public long getObjectsReceived() { |
553 | 8 | return objectsReceived.longValue(); |
554 | |
} |
555 | |
|
556 | |
|
557 | |
|
558 | |
|
559 | |
public long getTotalServiceTime() { |
560 | 0 | return totalServiceTime.longValue(); |
561 | |
} |
562 | |
|
563 | |
|
564 | |
|
565 | |
|
566 | |
public long getTotalEmitTime() { |
567 | 0 | return totalEmitTime.longValue(); |
568 | |
} |
569 | |
|
570 | |
|
571 | |
|
572 | |
|
573 | |
public long getTotalEmits() { |
574 | 6 | return totalEmits.longValue(); |
575 | |
} |
576 | |
|
577 | |
|
578 | |
|
579 | |
|
580 | |
public Boolean getCollectBranchStats() { |
581 | 1 | return collectBranchStats; |
582 | |
} |
583 | |
|
584 | |
|
585 | |
|
586 | |
|
587 | |
public void setCollectBranchStats(Boolean collectBranchStats) { |
588 | 2 | this.collectBranchStats = collectBranchStats; |
589 | 2 | } |
590 | |
|
591 | |
public Integer getCurrentStatWindowSize() { |
592 | 1 | return Integer.valueOf(currentStatWindowSize); |
593 | |
} |
594 | |
|
595 | |
public void setCurrentStatWindowSize(Integer newStatWindowSize) { |
596 | 1 | if (serviceTimeStatistics != null |
597 | |
&& newStatWindowSize != this.currentStatWindowSize) { |
598 | 0 | synchronized (serviceTimeStatistics) { |
599 | 0 | serviceTimeStatistics.setWindowSize(newStatWindowSize); |
600 | 0 | } |
601 | |
} |
602 | 1 | this.currentStatWindowSize = newStatWindowSize; |
603 | 1 | } |
604 | |
|
605 | |
public String getStageName() { |
606 | 1 | return stageName; |
607 | |
} |
608 | |
|
609 | |
public void setStageName(String name) { |
610 | 1 | this.stageName = name; |
611 | 1 | } |
612 | |
|
613 | |
public boolean isJmxEnabled() { |
614 | 2 | return jmxEnabled; |
615 | |
} |
616 | |
|
617 | |
public void setJmxEnabled(boolean jmxEnabled) { |
618 | 2 | this.jmxEnabled = jmxEnabled; |
619 | 2 | } |
620 | |
|
621 | |
|
622 | |
|
623 | |
|
624 | |
|
625 | |
|
626 | |
|
627 | |
public double getCurrentServiceTimeAverage() { |
628 | 4 | double avg = -1.0d; |
629 | |
|
630 | |
|
631 | 4 | avg = serviceTimeStatistics.getMean(); |
632 | |
|
633 | 4 | return avg; |
634 | |
} |
635 | |
} |