/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
20  package org.apache.mina.proxy;
21  
22  import java.util.LinkedList;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
27  import org.apache.mina.core.future.DefaultWriteFuture;
28  import org.apache.mina.core.future.WriteFuture;
29  import org.apache.mina.core.session.IoSession;
30  import org.apache.mina.core.write.DefaultWriteRequest;
31  import org.apache.mina.core.write.WriteRequest;
32  import org.apache.mina.proxy.filter.ProxyFilter;
33  import org.apache.mina.proxy.filter.ProxyHandshakeIoBuffer;
34  import org.apache.mina.proxy.session.ProxyIoSession;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * AbstractProxyLogicHandler.java - Helper class to handle proxy handshaking logic. Derived classes 
40   * implement proxy type specific logic.
41   * <p>
42   * Based upon SSLHandler from mina-filter-ssl.
43   * 
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   * @since MINA 2.0.0-M3
46   */
47  public abstract class AbstractProxyLogicHandler implements ProxyLogicHandler {
48  
49      private final static Logger LOGGER = LoggerFactory.getLogger(AbstractProxyLogicHandler.class);
50  
51      /**
52       * Object that contains all the proxy authentication session informations.
53       */
54      private ProxyIoSession proxyIoSession;
55  
56      /**
57       * Queue of write events which occurred before the proxy handshake had completed.
58       */
59      private Queue<Event> writeRequestQueue = null;
60  
61      /**
62       * Has the handshake been completed.
63       */
64      private boolean handshakeComplete = false;
65  
66      /**
67       * Creates a new {@link AbstractProxyLogicHandler}.
68       * 
69       * @param proxyIoSession {@link ProxyIoSession} in use.
70       */
71      public AbstractProxyLogicHandler(ProxyIoSession proxyIoSession) {
72          this.proxyIoSession = proxyIoSession;
73      }
74  
75      /**
76       * @return the proxy filter {@link ProxyFilter}.
77       */
78      protected ProxyFilter getProxyFilter() {
79          return proxyIoSession.getProxyFilter();
80      }
81  
82      /**
83       * @return the session.
84       */
85      protected IoSession getSession() {
86          return proxyIoSession.getSession();
87      }
88  
89      /**
90       * @return the {@link ProxyIoSession} object.
91       */
92      public ProxyIoSession getProxyIoSession() {
93          return proxyIoSession;
94      }
95  
96      /**
97       * Writes data to the proxy server.
98       * 
99       * @param nextFilter the next filter
100      * @param data Data buffer to be written.
101      * @return A Future for the write operation
102      */
103     protected WriteFuture writeData(final NextFilter nextFilter, final IoBuffer data) {
104         // write net data
105         ProxyHandshakeIoBufferer.html#ProxyHandshakeIoBuffer">ProxyHandshakeIoBuffer writeBuffer = new ProxyHandshakeIoBuffer(data);
106 
107         if (LOGGER.isDebugEnabled()) {
108             LOGGER.debug("   session write: {}", writeBuffer);
109         }
110 
111         WriteFuture writeFuture = new DefaultWriteFuture(getSession());
112         getProxyFilter().writeData(nextFilter, getSession(), new DefaultWriteRequest(writeBuffer, writeFuture), true);
113 
114         return writeFuture;
115     }
116 
117     /**
118      * @return <tt>true</tt> if handshaking is complete and
119      * data can be sent through the proxy.
120      */
121     public boolean isHandshakeComplete() {
122         synchronized (this) {
123             return handshakeComplete;
124         }
125     }
126 
127     /**
128      * Signals that the handshake has finished.
129      */
130     protected final void setHandshakeComplete() {
131         synchronized (this) {
132             handshakeComplete = true;
133         }
134 
135         ProxyIoSession proxyIoSession = getProxyIoSession();
136         proxyIoSession.getConnector().fireConnected(proxyIoSession.getSession()).awaitUninterruptibly();
137 
138         if (LOGGER.isDebugEnabled()) {
139             LOGGER.debug("  handshake completed");
140         }
141 
142         // Connected OK
143         try {
144             proxyIoSession.getEventQueue().flushPendingSessionEvents();
145             flushPendingWriteRequests();
146         } catch (Exception ex) {
147             LOGGER.error("Unable to flush pending write requests", ex);
148         }
149     }
150 
151     /**
152      * Send any write requests which were queued whilst waiting for handshaking to complete.
153      * 
154      * @throws Exception If we can't flush the pending write requests
155      */
156     protected synchronized void flushPendingWriteRequests() throws Exception {
157         if (LOGGER.isDebugEnabled()) {
158             LOGGER.debug(" flushPendingWriteRequests()");
159         }
160 
161         if (writeRequestQueue == null) {
162             return;
163         }
164 
165         Event scheduledWrite;
166         while ((scheduledWrite = writeRequestQueue.poll()) != null) {
167             if (LOGGER.isDebugEnabled()) {
168                 LOGGER.debug(" Flushing buffered write request: {}", scheduledWrite.data);
169             }
170 
171             getProxyFilter().filterWrite(scheduledWrite.nextFilter, getSession(), (WriteRequest) scheduledWrite.data);
172         }
173 
174         // Free queue
175         writeRequestQueue = null;
176     }
177 
178     /**
179      * Enqueue a message to be written once handshaking is complete.
180      */
181     public synchronized void enqueueWriteRequest(final NextFilter nextFilter, final WriteRequest writeRequest) {
182         if (writeRequestQueue == null) {
183             writeRequestQueue = new LinkedList<Event>();
184         }
185 
186         writeRequestQueue.offer(new Event(nextFilter, writeRequest));
187     }
188 
189     /**
190      * Closes the session.
191      * 
192      * @param message the error message
193      * @param t the exception which caused the session closing
194      */
195     protected void closeSession(final String message, final Throwable t) {
196         if (t != null) {
197             LOGGER.error(message, t);
198             proxyIoSession.setAuthenticationFailed(true);
199         } else {
200             LOGGER.error(message);
201         }
202 
203         getSession().closeNow();
204     }
205 
206     /**
207      * Closes the session.
208      * 
209      * @param message the error message
210      */
211     protected void closeSession(final String message) {
212         closeSession(message, null);
213     }
214 
215     /**
216      * Event wrapper class for enqueued events.
217      */
218     private final static class Event {
219         private final NextFilter nextFilter;
220 
221         private final Object data;
222 
223         Event(final NextFilter nextFilter, final Object data) {
224             this.nextFilter = nextFilter;
225             this.data = data;
226         }
227     }
228 }