001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.executor; 021 022import java.util.concurrent.atomic.AtomicInteger; 023 024import org.apache.mina.core.session.IoEvent; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * Throttles incoming or outgoing events. 030 * 031 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 032 */ 033public class IoEventQueueThrottle implements IoEventQueueHandler { 034 /** A logger for this class */ 035 private final static Logger LOGGER = LoggerFactory.getLogger(IoEventQueueThrottle.class); 036 037 /** The event size estimator instance */ 038 private final IoEventSizeEstimator eventSizeEstimator; 039 040 private volatile int threshold; 041 042 private final Object lock = new Object(); 043 044 private final AtomicInteger counter = new AtomicInteger(); 045 046 private int waiters; 047 048 public IoEventQueueThrottle() { 049 this(new DefaultIoEventSizeEstimator(), 65536); 050 } 051 052 public IoEventQueueThrottle(int threshold) { 053 this(new DefaultIoEventSizeEstimator(), threshold); 054 } 055 056 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) { 057 if (eventSizeEstimator == null) { 058 throw new IllegalArgumentException("eventSizeEstimator"); 059 } 060 061 this.eventSizeEstimator = eventSizeEstimator; 062 063 setThreshold(threshold); 064 } 065 066 public IoEventSizeEstimator getEventSizeEstimator() { 067 return eventSizeEstimator; 068 } 069 070 public int getThreshold() { 071 return threshold; 072 } 073 074 public int getCounter() { 075 return counter.get(); 076 } 077 078 public void setThreshold(int threshold) { 079 if (threshold <= 0) { 080 throw new IllegalArgumentException("threshold: " + threshold); 081 } 082 083 this.threshold = threshold; 084 } 085 086 public boolean accept(Object source, IoEvent event) { 087 return true; 088 } 089 090 public void offered(Object source, IoEvent event) { 091 int eventSize = estimateSize(event); 092 int currentCounter = counter.addAndGet(eventSize); 093 logState(); 094 095 if (currentCounter >= threshold) { 096 block(); 097 } 098 } 099 100 public void polled(Object source, IoEvent event) { 101 int eventSize = estimateSize(event); 102 int currentCounter = counter.addAndGet(-eventSize); 103 104 logState(); 105 106 if (currentCounter < threshold) { 107 unblock(); 108 } 109 } 110 111 private int estimateSize(IoEvent event) { 112 int size = getEventSizeEstimator().estimateSize(event); 113 114 if (size < 0) { 115 throw new IllegalStateException(IoEventSizeEstimator.class.getSimpleName() + " returned " 116 + "a negative value (" + size + "): " + event); 117 } 118 119 return size; 120 } 121 122 private void logState() { 123 if (LOGGER.isDebugEnabled()) { 124 LOGGER.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold()); 125 } 126 } 127 128 protected void block() { 129 if (LOGGER.isDebugEnabled()) { 130 LOGGER.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold); 131 } 132 133 synchronized (lock) { 134 while (counter.get() >= threshold) { 135 waiters++; 136 try { 137 lock.wait(); 138 } catch (InterruptedException e) { 139 // Wait uninterruptably. 140 } finally { 141 waiters--; 142 } 143 } 144 } 145 146 if (LOGGER.isDebugEnabled()) { 147 LOGGER.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold); 148 } 149 } 150 151 protected void unblock() { 152 synchronized (lock) { 153 if (waiters > 0) { 154 lock.notifyAll(); 155 } 156 } 157 } 158}