001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.executor;
021
022import org.apache.mina.core.filterchain.IoFilterAdapter;
023import org.apache.mina.core.future.IoFutureListener;
024import org.apache.mina.core.future.WriteFuture;
025import org.apache.mina.core.service.IoProcessor;
026import org.apache.mina.core.session.IoEvent;
027import org.apache.mina.core.session.IoEventType;
028import org.apache.mina.core.session.IoSession;
029import org.apache.mina.core.write.WriteRequest;
030
031/**
032 * Attaches an {@link IoEventQueueHandler} to an {@link IoSession}'s
033 * {@link WriteRequest} queue to provide accurate write queue status tracking.
034 * <p>
035 * The biggest difference from {@link OrderedThreadPoolExecutor} and
036 * {@link UnorderedThreadPoolExecutor} is that {@link IoEventQueueHandler#polled(Object, IoEvent)}
037 * is invoked when the write operation is completed by an {@link IoProcessor},
038 * consequently providing the accurate tracking of the write request queue
039 * status to the {@link IoEventQueueHandler}.
040 * <p>
 * The most common use of this filter is to detect an {@link IoSession}
 * that writes too fast, which would soon cause an {@link OutOfMemoryError}:
043 * <pre>
044 *     session.getFilterChain().addLast(
045 *             "writeThrottle",
046 *             new WriteRequestFilter(new IoEventQueueThrottle()));
047 * </pre>
048 *
049 * <h3>Known issues</h3>
050 *
 * You can run into a deadlock if you run this filter with a blocking
 * {@link IoEventQueueHandler} implementation such as {@link IoEventQueueThrottle}
 * in the {@link IoProcessor} thread.  This is because an {@link IoProcessor}
 * thread is what processes the {@link WriteRequest}s and notifies the related
 * {@link WriteFuture}s; an {@link IoEventQueueHandler} implementation that
 * waits for the size of the write request queue to decrease will never wake
 * up.  To use such a handler, you have to insert an {@link ExecutorFilter}
 * before this filter or always call the {@link IoSession#write(Object)} method
 * from a different thread.
060 *
061 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
062 */
063public class WriteRequestFilter extends IoFilterAdapter {
064
065    private final IoEventQueueHandler queueHandler;
066
067    /**
068     * Creates a new instance with a new default {@link IoEventQueueThrottle}.
069     */
070    public WriteRequestFilter() {
071        this(new IoEventQueueThrottle());
072    }
073
074    /**
075     * Creates a new instance with the specified {@link IoEventQueueHandler}.
076     * 
077     * @param queueHandler The {@link IoEventQueueHandler} instance to use
078     */
079    public WriteRequestFilter(IoEventQueueHandler queueHandler) {
080        if (queueHandler == null) {
081            throw new IllegalArgumentException("queueHandler");
082        }
083        this.queueHandler = queueHandler;
084    }
085
086    /**
087     * @return the {@link IoEventQueueHandler} which is attached to this
088     * filter.
089     */
090    public IoEventQueueHandler getQueueHandler() {
091        return queueHandler;
092    }
093
094    @Override
095    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
096
097        final IoEvent e = new IoEvent(IoEventType.WRITE, session, writeRequest);
098
099        if (queueHandler.accept(this, e)) {
100            nextFilter.filterWrite(session, writeRequest);
101            WriteFuture writeFuture = writeRequest.getFuture();
102            if (writeFuture == null) {
103                return;
104            }
105
106            // We can track the write request only when it has a future.
107            queueHandler.offered(this, e);
108            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
109                public void operationComplete(WriteFuture future) {
110                    queueHandler.polled(WriteRequestFilter.this, e);
111                }
112            });
113        }
114    }
115}