001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.executor; 021 022import org.apache.mina.core.filterchain.IoFilterAdapter; 023import org.apache.mina.core.future.IoFutureListener; 024import org.apache.mina.core.future.WriteFuture; 025import org.apache.mina.core.service.IoProcessor; 026import org.apache.mina.core.session.IoEvent; 027import org.apache.mina.core.session.IoEventType; 028import org.apache.mina.core.session.IoSession; 029import org.apache.mina.core.write.WriteRequest; 030 031/** 032 * Attaches an {@link IoEventQueueHandler} to an {@link IoSession}'s 033 * {@link WriteRequest} queue to provide accurate write queue status tracking. 034 * <p> 035 * The biggest difference from {@link OrderedThreadPoolExecutor} and 036 * {@link UnorderedThreadPoolExecutor} is that {@link IoEventQueueHandler#polled(Object, IoEvent)} 037 * is invoked when the write operation is completed by an {@link IoProcessor}, 038 * consequently providing the accurate tracking of the write request queue 039 * status to the {@link IoEventQueueHandler}. 040 * <p> 041 * Most common usage of this filter could be detecting an {@link IoSession} 042 * which writes too fast which will cause {@link OutOfMemoryError} soon: 043 * <pre> 044 * session.getFilterChain().addLast( 045 * "writeThrottle", 046 * new WriteRequestFilter(new IoEventQueueThrottle())); 047 * </pre> 048 * 049 * <h3>Known issues</h3> 050 * 051 * You can run into a dead lock if you run this filter with the blocking 052 * {@link IoEventQueueHandler} implementation such as {@link IoEventQueueThrottle} 053 * in the {@link IoProcessor} thread. It's because an {@link IoProcessor} 054 * thread is what processes the {@link WriteRequest}s and notifies related 055 * {@link WriteFuture}s; the {@link IoEventQueueHandler} implementation that 056 * waits for the size of the write request queue to decrease will never wake 057 * up. To use such an handler, you have to insert an {@link ExecutorFilter} 058 * before this filter or call {@link IoSession#write(Object)} method always 059 * from a different thread. 060 * 061 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 062 */ 063public class WriteRequestFilter extends IoFilterAdapter { 064 065 private final IoEventQueueHandler queueHandler; 066 067 /** 068 * Creates a new instance with a new default {@link IoEventQueueThrottle}. 069 */ 070 public WriteRequestFilter() { 071 this(new IoEventQueueThrottle()); 072 } 073 074 /** 075 * Creates a new instance with the specified {@link IoEventQueueHandler}. 076 * 077 * @param queueHandler The {@link IoEventQueueHandler} instance to use 078 */ 079 public WriteRequestFilter(IoEventQueueHandler queueHandler) { 080 if (queueHandler == null) { 081 throw new IllegalArgumentException("queueHandler"); 082 } 083 this.queueHandler = queueHandler; 084 } 085 086 /** 087 * @return the {@link IoEventQueueHandler} which is attached to this 088 * filter. 089 */ 090 public IoEventQueueHandler getQueueHandler() { 091 return queueHandler; 092 } 093 094 @Override 095 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { 096 097 final IoEvent e = new IoEvent(IoEventType.WRITE, session, writeRequest); 098 099 if (queueHandler.accept(this, e)) { 100 nextFilter.filterWrite(session, writeRequest); 101 WriteFuture writeFuture = writeRequest.getFuture(); 102 if (writeFuture == null) { 103 return; 104 } 105 106 // We can track the write request only when it has a future. 107 queueHandler.offered(this, e); 108 writeFuture.addListener(new IoFutureListener<WriteFuture>() { 109 public void operationComplete(WriteFuture future) { 110 queueHandler.polled(WriteRequestFilter.this, e); 111 } 112 }); 113 } 114 } 115}