001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.ScheduledExecutorService; 020 import java.util.concurrent.atomic.AtomicLong; 021 022 import org.apache.camel.CamelContext; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Expression; 025 import org.apache.camel.Processor; 026 import org.apache.camel.RuntimeExchangeException; 027 import org.apache.camel.Traceable; 028 import org.apache.camel.util.ObjectHelper; 029 030 /** 031 * A <a href="http://camel.apache.org/throttler.html">Throttler</a> 032 * will set a limit on the maximum number of message exchanges which can be sent 033 * to a processor within a specific time period. <p/> This pattern can be 034 * extremely useful if you have some external system which meters access; such 035 * as only allowing 100 requests per second; or if huge load can cause a 036 * particular system to malfunction or to reduce its throughput you might want 037 * to introduce some throttling. 038 * 039 * @version 040 */ 041 public class Throttler extends DelayProcessorSupport implements Traceable { 042 private volatile long maximumRequestsPerPeriod; 043 private Expression maxRequestsPerPeriodExpression; 044 private AtomicLong timePeriodMillis = new AtomicLong(1000); 045 private volatile TimeSlot slot; 046 047 public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis, 048 ScheduledExecutorService executorService, boolean shutdownExecutorService) { 049 super(camelContext, processor, executorService, shutdownExecutorService); 050 051 ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression"); 052 this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression; 053 054 if (timePeriodMillis <= 0) { 055 throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis); 056 } 057 this.timePeriodMillis.set(timePeriodMillis); 058 } 059 060 @Override 061 public String toString() { 062 return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: " 063 + getProcessor() + "]"; 064 } 065 066 public String getTraceLabel() { 067 return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]"; 068 } 069 070 // Properties 071 // ----------------------------------------------------------------------- 072 073 /** 074 * Sets the maximum number of requests per time period expression 075 */ 076 public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) { 077 this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression; 078 } 079 080 public Expression getMaximumRequestsPerPeriodExpression() { 081 return maxRequestsPerPeriodExpression; 082 } 083 084 public long getTimePeriodMillis() { 085 return timePeriodMillis.get(); 086 } 087 088 /** 089 * Gets the current maximum request per period value. 090 */ 091 public long getCurrentMaximumRequestsPerPeriod() { 092 return maximumRequestsPerPeriod; 093 } 094 095 /** 096 * Sets the time period during which the maximum number of requests apply 097 */ 098 public void setTimePeriodMillis(long timePeriodMillis) { 099 this.timePeriodMillis.set(timePeriodMillis); 100 } 101 102 // Implementation methods 103 // ----------------------------------------------------------------------- 104 105 protected long calculateDelay(Exchange exchange) { 106 // evaluate as Object first to see if we get any result at all 107 Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class); 108 if (result == null) { 109 throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange); 110 } 111 112 // then must convert value to long 113 Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result); 114 if (longValue != null) { 115 // log if we changed max period after initial setting 116 if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) { 117 log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue); 118 } 119 if (maximumRequestsPerPeriod > longValue) { 120 slot.capacity = 0; 121 } 122 maximumRequestsPerPeriod = longValue; 123 } 124 125 if (maximumRequestsPerPeriod <= 0) { 126 throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + maximumRequestsPerPeriod); 127 } 128 129 TimeSlot slot = nextSlot(); 130 if (!slot.isActive()) { 131 long delay = slot.startTime - currentSystemTime(); 132 return delay; 133 } else { 134 return 0; 135 } 136 } 137 138 /* 139 * Determine what the next available time slot is for handling an Exchange 140 */ 141 protected synchronized TimeSlot nextSlot() { 142 if (slot == null) { 143 slot = new TimeSlot(); 144 } 145 if (slot.isFull() || !slot.isPast()) { 146 slot = slot.next(); 147 } 148 slot.assign(); 149 return slot; 150 } 151 152 /* 153 * A time slot is capable of handling a number of exchanges within a certain period of time. 154 */ 155 protected class TimeSlot { 156 157 private volatile long capacity = Throttler.this.maximumRequestsPerPeriod; 158 private final long duration = Throttler.this.timePeriodMillis.get(); 159 private final long startTime; 160 161 protected TimeSlot() { 162 this(System.currentTimeMillis()); 163 } 164 165 protected TimeSlot(long startTime) { 166 this.startTime = startTime; 167 } 168 169 protected void assign() { 170 capacity--; 171 } 172 173 /* 174 * Start the next time slot either now or in the future 175 * (no time slots are being created in the past) 176 */ 177 protected TimeSlot next() { 178 return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration)); 179 } 180 181 protected boolean isPast() { 182 long current = System.currentTimeMillis(); 183 return current < (startTime + duration); 184 } 185 186 protected boolean isActive() { 187 long current = System.currentTimeMillis(); 188 return startTime <= current && current < (startTime + duration); 189 } 190 191 protected boolean isFull() { 192 return capacity <= 0; 193 } 194 } 195 196 TimeSlot getSlot() { 197 return this.slot; 198 } 199 }