View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.quotas;
20  
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.classification.InterfaceStability;
25  
26  import com.google.common.annotations.VisibleForTesting;
27  
28  /**
29   * Simple rate limiter.
30   *
31   * Usage Example:
32   *    // At this point you have a unlimited resource limiter
33   *   RateLimiter limiter = new AverageIntervalRateLimiter();
34   *                         or new FixedIntervalRateLimiter();
35   *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
36   *
37   *   while (true) {
38   *     // call canExecute before performing resource consuming operation
39   *     bool canExecute = limiter.canExecute();
40   *     // If there are no available resources, wait until one is available
41   *     if (!canExecute) Thread.sleep(limiter.waitInterval());
42   *     // ...execute the work and consume the resource...
43   *     limiter.consume();
44   *   }
45   */
46  @InterfaceAudience.Private
47  @InterfaceStability.Evolving
48  public abstract class RateLimiter {
49    public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
50    private long tunit = 1000;           // Timeunit factor for translating to ms.
51    private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
52    private long avail = Long.MAX_VALUE; // Currently available resource units
53  
54    /**
55     * Refill the available units w.r.t the elapsed time.
56     * @param limit Maximum available resource units that can be refilled to.
57     * @return how many resource units may be refilled ?
58     */
59    abstract long refill(long limit);
60  
61    /**
62     * Time in milliseconds to wait for before requesting to consume 'amount' resource.
63     * @param limit Maximum available resource units that can be refilled to.
64     * @param available Currently available resource units
65     * @param amount Resources for which time interval to calculate for
66     * @return estimate of the ms required to wait before being able to provide 'amount' resources.
67     */
68    abstract long getWaitInterval(long limit, long available, long amount);
69  
70  
71    /**
72     * Set the RateLimiter max available resources and refill period.
73     * @param limit The max value available resource units can be refilled to.
74     * @param timeUnit Timeunit factor for translating to ms.
75     */
76    public void set(final long limit, final TimeUnit timeUnit) {
77      switch (timeUnit) {
78      case MILLISECONDS:
79        tunit = 1;
80        break;
81      case SECONDS:
82        tunit = 1000;
83        break;
84      case MINUTES:
85        tunit = 60 * 1000;
86        break;
87      case HOURS:
88        tunit = 60 * 60 * 1000;
89        break;
90      case DAYS:
91        tunit = 24 * 60 * 60 * 1000;
92        break;
93      default:
94        throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
95      }
96      this.limit = limit;
97      this.avail = limit;
98    }
99  
100   public String toString() {
101     String rateLimiter = this.getClass().getSimpleName();
102     if (limit == Long.MAX_VALUE) {
103       return rateLimiter + "(Bypass)";
104     }
105     return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
106   }
107 
108   /**
109    * Sets the current instance of RateLimiter to a new values.
110    *
111    * if current limit is smaller than the new limit, bump up the available resources.
112    * Otherwise allow clients to use up the previously available resources.
113    */
114   public synchronized void update(final RateLimiter other) {
115     this.tunit = other.tunit;
116     if (this.limit < other.limit) {
117       this.avail += (other.limit - this.limit);
118     }
119     this.limit = other.limit;
120   }
121 
122   public synchronized boolean isBypass() {
123     return limit == Long.MAX_VALUE;
124   }
125 
126   public synchronized long getLimit() {
127     return limit;
128   }
129 
130   public synchronized long getAvailable() {
131     return avail;
132   }
133 
134   protected long getTimeUnitInMillis() {
135     return tunit;
136   }
137 
138   /**
139    * Is there at least one resource available to allow execution?
140    * @return true if there is at least one resource available, otherwise false
141    */
142   public boolean canExecute() {
143     return canExecute(1);
144   }
145 
146   /**
147    * Are there enough available resources to allow execution?
148    * @param amount the number of required resources
149    * @return true if there are enough available resources, otherwise false
150    */
151   public synchronized boolean canExecute(final long amount) {
152     long refillAmount = refill(limit);
153     if (refillAmount == 0 && avail < amount) {
154       return false;
155     }
156     // check for positive overflow
157     if (avail <= Long.MAX_VALUE - refillAmount) {
158       avail = Math.max(0, Math.min(avail + refillAmount, limit));
159     } else {
160       avail = Math.max(0, limit);
161     }
162     if (avail >= amount) {
163       return true;
164     }
165     return false;
166   }
167 
168   /**
169    * consume one available unit.
170    */
171   public void consume() {
172     consume(1);
173   }
174 
175   /**
176    * consume amount available units.
177    * @param amount the number of units to consume
178    */
179   public synchronized void consume(final long amount) {
180     this.avail -= amount;
181     if (this.avail < 0) {
182       this.avail = 0;
183     }
184   }
185 
186   /**
187    * @return estimate of the ms required to wait before being able to provide 1 resource.
188    */
189   public long waitInterval() {
190     return waitInterval(1);
191   }
192 
193   /**
194    * @return estimate of the ms required to wait before being able to provide "amount" resources.
195    */
196   public synchronized long waitInterval(final long amount) {
197     // TODO Handle over quota?
198     return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
199   }
200 
201   // These two method are for strictly testing purpose only
202   @VisibleForTesting
203   public abstract void setNextRefillTime(long nextRefillTime);
204 
205   @VisibleForTesting
206   public abstract long getNextRefillTime();
207 }