001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.proxy.event;
021
022import java.util.LinkedList;
023import java.util.Queue;
024
025import org.apache.mina.proxy.handlers.socks.SocksProxyRequest;
026import org.apache.mina.proxy.session.ProxyIoSession;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * IoSessionEventQueue.java - Queue that contains filtered session events 
032 * while handshake isn't done.
033 * 
034 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
035 * @since MINA 2.0.0-M3
036 */
037public class IoSessionEventQueue {
038    private final static Logger logger = LoggerFactory.getLogger(IoSessionEventQueue.class);
039
040    /**
041     * The proxy session object.
042     */
043    private ProxyIoSession proxyIoSession;
044
045    /**
046     * Queue of session events which occurred before the proxy handshake had completed.
047     */
048    private Queue<IoSessionEvent> sessionEventsQueue = new LinkedList<IoSessionEvent>();
049
050    public IoSessionEventQueue(ProxyIoSession proxyIoSession) {
051        this.proxyIoSession = proxyIoSession;
052    }
053
054    /**
055     * Discard all events from the queue.
056     */
057    private void discardSessionQueueEvents() {
058        synchronized (sessionEventsQueue) {
059            // Free queue
060            sessionEventsQueue.clear();
061            logger.debug("Event queue CLEARED");
062        }
063    }
064
065    /**
066     * Event is enqueued only if necessary : 
067     * - socks proxies do not need the reconnection feature so events are always 
068     * forwarded for these.
069     * - http proxies events will be enqueued while handshake has not been completed
070     * or until connection was closed.
071     * If connection was prematurely closed previous events are discarded and only the
072     * session closed is delivered.  
073     * 
074     * @param evt the event to enqueue
075     */
076    public void enqueueEventIfNecessary(final IoSessionEvent evt) {
077        logger.debug("??? >> Enqueue {}", evt);
078
079        if (proxyIoSession.getRequest() instanceof SocksProxyRequest) {
080            // No reconnection used
081            evt.deliverEvent();
082            return;
083        }
084
085        if (proxyIoSession.getHandler().isHandshakeComplete()) {
086            evt.deliverEvent();
087        } else {
088            if (evt.getType() == IoSessionEventType.CLOSED) {
089                if (proxyIoSession.isAuthenticationFailed()) {
090                    proxyIoSession.getConnector().cancelConnectFuture();
091                    discardSessionQueueEvents();
092                    evt.deliverEvent();
093                } else {
094                    discardSessionQueueEvents();
095                }
096            } else if (evt.getType() == IoSessionEventType.OPENED) {
097                // Enqueue event cause it will not reach IoHandler but deliver it to enable 
098                // session creation.
099                enqueueSessionEvent(evt);
100                evt.deliverEvent();
101            } else {
102                enqueueSessionEvent(evt);
103            }
104        }
105    }
106
107    /**
108     * Send any session event which were queued while waiting for handshaking to complete.
109     * 
110     * Please note this is an internal method. DO NOT USE it in your code.
111     * 
112     * @throws Exception If something went wrong while flushing the pending events
113     */
114    public void flushPendingSessionEvents() throws Exception {
115        synchronized (sessionEventsQueue) {
116            IoSessionEvent evt;
117
118            while ((evt = sessionEventsQueue.poll()) != null) {
119                logger.debug(" Flushing buffered event: {}", evt);
120                evt.deliverEvent();
121            }
122        }
123    }
124
125    /**
126     * Enqueue an event to be delivered once handshaking is complete.
127     * 
128     * @param evt the session event to enqueue
129     */
130    private void enqueueSessionEvent(final IoSessionEvent evt) {
131        synchronized (sessionEventsQueue) {
132            logger.debug("Enqueuing event: {}", evt);
133            sessionEventsQueue.offer(evt);
134        }
135    }
136}