001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Locale; 020 import java.util.concurrent.TimeUnit; 021 022 import org.apache.camel.AsyncCallback; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Processor; 025 import org.slf4j.Logger; 026 import org.slf4j.LoggerFactory; 027 028 /** 029 * A <code>SamplingThrottler</code> is a special kind of throttler. It also 030 * limits the number of exchanges sent to a downstream endpoint. It differs from 031 * a normal throttler in that it will not queue exchanges above the threshold 032 * for a given period. Instead these exchanges will be stopped, precluding them 033 * from being processed at all by downstream consumers. 034 * <p/> 035 * This kind of throttling can be useful for taking a sample from 036 * an exchange stream, rough consolidation of noisy and bursty exchange traffic 037 * or where queuing of throttled exchanges is undesirable. 038 * 039 * @version 040 */ 041 public class SamplingThrottler extends DelegateAsyncProcessor { 042 043 protected final Logger log = LoggerFactory.getLogger(getClass()); 044 private long messageFrequency; 045 private long currentMessageCount; 046 private long samplePeriod; 047 private long periodInMillis; 048 private TimeUnit units; 049 private long timeOfLastExchange; 050 private StopProcessor stopper = new StopProcessor(); 051 private final Object calculationLock = new Object(); 052 private SampleStats sampled = new SampleStats(); 053 054 public SamplingThrottler(Processor processor, long messageFrequency) { 055 super(processor); 056 057 if (messageFrequency <= 0) { 058 throw new IllegalArgumentException("A positive value is required for the sampling message frequency"); 059 } 060 this.messageFrequency = messageFrequency; 061 } 062 063 public SamplingThrottler(Processor processor, long samplePeriod, TimeUnit units) { 064 super(processor); 065 066 if (samplePeriod <= 0) { 067 throw new IllegalArgumentException("A positive value is required for the sampling period"); 068 } 069 if (units == null) { 070 throw new IllegalArgumentException("A invalid null value was supplied for the units of the sampling period"); 071 } 072 this.samplePeriod = samplePeriod; 073 this.units = units; 074 this.periodInMillis = units.toMillis(samplePeriod); 075 } 076 077 @Override 078 public String toString() { 079 if (messageFrequency > 0) { 080 return "SamplingThrottler[1 exchange per: " + messageFrequency + " messages received -> " + getProcessor() + "]"; 081 } else { 082 return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase(Locale.ENGLISH) + " -> " + getProcessor() + "]"; 083 } 084 } 085 086 public String getTraceLabel() { 087 if (messageFrequency > 0) { 088 return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]"; 089 } else { 090 return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase(Locale.ENGLISH) + "]"; 091 } 092 } 093 094 @Override 095 public boolean process(Exchange exchange, AsyncCallback callback) { 096 boolean doSend = false; 097 098 synchronized (calculationLock) { 099 100 if (messageFrequency > 0) { 101 currentMessageCount++; 102 if (currentMessageCount % messageFrequency == 0) { 103 doSend = true; 104 } 105 } else { 106 long now = System.currentTimeMillis(); 107 if (now >= timeOfLastExchange + periodInMillis) { 108 doSend = true; 109 if (log.isTraceEnabled()) { 110 log.trace(sampled.sample()); 111 } 112 timeOfLastExchange = now; 113 } else { 114 if (log.isTraceEnabled()) { 115 log.trace(sampled.drop()); 116 } 117 } 118 } 119 } 120 121 if (doSend) { 122 // continue routing 123 return processor.process(exchange, callback); 124 } else { 125 // okay to invoke this synchronously as the stopper 126 // will just set a property 127 try { 128 stopper.process(exchange); 129 } catch (Exception e) { 130 exchange.setException(e); 131 } 132 } 133 134 // we are done synchronously 135 callback.done(true); 136 return true; 137 } 138 139 private static class SampleStats { 140 private long droppedThisPeriod; 141 private long totalDropped; 142 private long totalSampled; 143 private long totalThisPeriod; 144 145 String drop() { 146 droppedThisPeriod++; 147 totalThisPeriod++; 148 totalDropped++; 149 return getDroppedLog(); 150 } 151 152 String sample() { 153 totalThisPeriod = 1; // a new period, reset to 1 154 totalSampled++; 155 droppedThisPeriod = 0; 156 return getSampledLog(); 157 } 158 159 String getSampledLog() { 160 return String.format("Sampled %d of %d total exchanges", totalSampled, totalSampled + totalDropped); 161 } 162 163 String getDroppedLog() { 164 return String.format("Dropped %d of %d exchanges in this period, totalling %d dropped of %d exchanges overall.", 165 droppedThisPeriod, totalThisPeriod, totalDropped, totalSampled + totalDropped); 166 } 167 } 168 169 }