001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.proxy;
021
022import java.util.LinkedList;
023import java.util.Queue;
024
025import org.apache.mina.core.buffer.IoBuffer;
026import org.apache.mina.core.filterchain.IoFilter.NextFilter;
027import org.apache.mina.core.future.DefaultWriteFuture;
028import org.apache.mina.core.future.WriteFuture;
029import org.apache.mina.core.session.IoSession;
030import org.apache.mina.core.write.DefaultWriteRequest;
031import org.apache.mina.core.write.WriteRequest;
032import org.apache.mina.proxy.filter.ProxyFilter;
033import org.apache.mina.proxy.filter.ProxyHandshakeIoBuffer;
034import org.apache.mina.proxy.session.ProxyIoSession;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * AbstractProxyLogicHandler.java - Helper class to handle proxy handshaking logic. Derived classes 
040 * implement proxy type specific logic.
041 * <p>
042 * Based upon SSLHandler from mina-filter-ssl.
043 * 
044 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
045 * @since MINA 2.0.0-M3
046 */
047public abstract class AbstractProxyLogicHandler implements ProxyLogicHandler {
048
049    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractProxyLogicHandler.class);
050
051    /**
052     * Object that contains all the proxy authentication session informations.
053     */
054    private ProxyIoSession proxyIoSession;
055
056    /**
057     * Queue of write events which occurred before the proxy handshake had completed.
058     */
059    private Queue<Event> writeRequestQueue = null;
060
061    /**
062     * Has the handshake been completed.
063     */
064    private boolean handshakeComplete = false;
065
066    /**
067     * Creates a new {@link AbstractProxyLogicHandler}.
068     * 
069     * @param proxyIoSession {@link ProxyIoSession} in use.
070     */
071    public AbstractProxyLogicHandler(ProxyIoSession proxyIoSession) {
072        this.proxyIoSession = proxyIoSession;
073    }
074
075    /**
076     * @return the proxy filter {@link ProxyFilter}.
077     */
078    protected ProxyFilter getProxyFilter() {
079        return proxyIoSession.getProxyFilter();
080    }
081
082    /**
083     * @return the session.
084     */
085    protected IoSession getSession() {
086        return proxyIoSession.getSession();
087    }
088
089    /**
090     * @return the {@link ProxyIoSession} object.
091     */
092    public ProxyIoSession getProxyIoSession() {
093        return proxyIoSession;
094    }
095
096    /**
097     * Writes data to the proxy server.
098     * 
099     * @param nextFilter the next filter
100     * @param data Data buffer to be written.
101     * @return A Future for the write operation
102     */
103    protected WriteFuture writeData(final NextFilter nextFilter, final IoBuffer data) {
104        // write net data
105        ProxyHandshakeIoBuffer writeBuffer = new ProxyHandshakeIoBuffer(data);
106
107        LOGGER.debug("   session write: {}", writeBuffer);
108
109        WriteFuture writeFuture = new DefaultWriteFuture(getSession());
110        getProxyFilter().writeData(nextFilter, getSession(), new DefaultWriteRequest(writeBuffer, writeFuture), true);
111
112        return writeFuture;
113    }
114
115    /**
116     * @return <tt>true</tt> if handshaking is complete and
117     * data can be sent through the proxy.
118     */
119    public boolean isHandshakeComplete() {
120        synchronized (this) {
121            return handshakeComplete;
122        }
123    }
124
125    /**
126     * Signals that the handshake has finished.
127     */
128    protected final void setHandshakeComplete() {
129        synchronized (this) {
130            handshakeComplete = true;
131        }
132
133        ProxyIoSession proxyIoSession = getProxyIoSession();
134        proxyIoSession.getConnector().fireConnected(proxyIoSession.getSession()).awaitUninterruptibly();
135
136        LOGGER.debug("  handshake completed");
137
138        // Connected OK
139        try {
140            proxyIoSession.getEventQueue().flushPendingSessionEvents();
141            flushPendingWriteRequests();
142        } catch (Exception ex) {
143            LOGGER.error("Unable to flush pending write requests", ex);
144        }
145    }
146
147    /**
148     * Send any write requests which were queued whilst waiting for handshaking to complete.
149     * 
150     * @throws Exception If we can't flush the pending write requests
151     */
152    protected synchronized void flushPendingWriteRequests() throws Exception {
153        LOGGER.debug(" flushPendingWriteRequests()");
154
155        if (writeRequestQueue == null) {
156            return;
157        }
158
159        Event scheduledWrite;
160        while ((scheduledWrite = writeRequestQueue.poll()) != null) {
161            LOGGER.debug(" Flushing buffered write request: {}", scheduledWrite.data);
162
163            getProxyFilter().filterWrite(scheduledWrite.nextFilter, getSession(), (WriteRequest) scheduledWrite.data);
164        }
165
166        // Free queue
167        writeRequestQueue = null;
168    }
169
170    /**
171     * Enqueue a message to be written once handshaking is complete.
172     */
173    public synchronized void enqueueWriteRequest(final NextFilter nextFilter, final WriteRequest writeRequest) {
174        if (writeRequestQueue == null) {
175            writeRequestQueue = new LinkedList<Event>();
176        }
177
178        writeRequestQueue.offer(new Event(nextFilter, writeRequest));
179    }
180
181    /**
182     * Closes the session.
183     * 
184     * @param message the error message
185     * @param t the exception which caused the session closing
186     */
187    protected void closeSession(final String message, final Throwable t) {
188        if (t != null) {
189            LOGGER.error(message, t);
190            proxyIoSession.setAuthenticationFailed(true);
191        } else {
192            LOGGER.error(message);
193        }
194
195        getSession().closeNow();
196    }
197
198    /**
199     * Closes the session.
200     * 
201     * @param message the error message
202     */
203    protected void closeSession(final String message) {
204        closeSession(message, null);
205    }
206
207    /**
208     * Event wrapper class for enqueued events.
209     */
210    private final static class Event {
211        private final NextFilter nextFilter;
212
213        private final Object data;
214
215        Event(final NextFilter nextFilter, final Object data) {
216            this.nextFilter = nextFilter;
217            this.data = data;
218        }
219    }
220}