View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.proxy;
21  
22  import java.util.LinkedList;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
27  import org.apache.mina.core.future.DefaultWriteFuture;
28  import org.apache.mina.core.future.WriteFuture;
29  import org.apache.mina.core.session.IoSession;
30  import org.apache.mina.core.write.DefaultWriteRequest;
31  import org.apache.mina.core.write.WriteRequest;
32  import org.apache.mina.proxy.filter.ProxyFilter;
33  import org.apache.mina.proxy.filter.ProxyHandshakeIoBuffer;
34  import org.apache.mina.proxy.session.ProxyIoSession;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /**
39   * AbstractProxyLogicHandler.java - Helper class to handle proxy handshaking logic. Derived classes 
40   * implement proxy type specific logic.
41   * <p>
42   * Based upon SSLHandler from mina-filter-ssl.
43   * 
44   * @author The Apache MINA Project (dev@mina.apache.org)
45   * @since MINA 2.0.0-M3
46   */
47  public abstract class AbstractProxyLogicHandler implements ProxyLogicHandler {
48  
49      private final static Logger LOGGER = LoggerFactory
50              .getLogger(AbstractProxyLogicHandler.class);
51  
52      /**
53       * Object that contains all the proxy authentication session informations.
54       */
55      private ProxyIoSession proxyIoSession;
56  
57      /**
58       * Queue of write events which occurred before the proxy handshake had completed.
59       */
60      private Queue<Event> writeRequestQueue = null;
61  
62      /**
63       * Has the handshake been completed.
64       */
65      private boolean handshakeComplete = false;
66  
67      /**
68       * Creates a new {@link AbstractProxyLogicHandler}.
69       * 
70       * @param proxyIoSession {@link ProxyIoSession} in use.
71       */
72      public AbstractProxyLogicHandler(ProxyIoSession proxyIoSession) {
73          this.proxyIoSession = proxyIoSession;
74      }
75  
76      /**
77       * Returns the proxy filter {@link ProxyFilter}.
78       */
79      protected ProxyFilter getProxyFilter() {
80          return proxyIoSession.getProxyFilter();
81      }
82  
83      /**
84       * Returns the session.
85       */
86      protected IoSession getSession() {
87          return proxyIoSession.getSession();
88      }
89  
90      /**
91       * Returns the {@link ProxyIoSession} object.
92       */
93      public ProxyIoSession getProxyIoSession() {
94          return proxyIoSession;
95      }
96  
97      /**
98       * Writes data to the proxy server.
99       * 
100      * @param nextFilter the next filter
101      * @param data Data buffer to be written.
102      */
103     protected WriteFuture writeData(final NextFilter nextFilter,
104             final IoBuffer data) {
105         // write net data
106         ProxyHandshakeIoBuffer writeBuffer = new ProxyHandshakeIoBuffer(data);
107 
108         LOGGER.debug("   session write: {}", writeBuffer);
109 
110         WriteFuture writeFuture = new DefaultWriteFuture(getSession());
111         getProxyFilter().writeData(nextFilter, getSession(),
112                 new DefaultWriteRequest(writeBuffer, writeFuture), true);
113 
114         return writeFuture;
115     }
116 
117     /**
118      * Returns <code>true</code> if handshaking is complete and
119      * data can be sent through the proxy.
120      */
121     public boolean isHandshakeComplete() {
122         synchronized (this) {
123             return handshakeComplete;
124         }
125     }
126 
127     /**
128      * Signals that the handshake has finished.
129      */
130     protected final void setHandshakeComplete() {
131         synchronized (this) {
132             handshakeComplete = true;
133         }
134 
135         ProxyIoSession proxyIoSession = getProxyIoSession();
136         proxyIoSession.getConnector()
137                 .fireConnected(proxyIoSession.getSession())
138                 .awaitUninterruptibly();
139 
140         LOGGER.debug("  handshake completed");
141 
142         // Connected OK
143         try {
144             proxyIoSession.getEventQueue().flushPendingSessionEvents();
145             flushPendingWriteRequests();
146         } catch (Exception ex) {
147             LOGGER.error("Unable to flush pending write requests", ex);
148         }
149     }
150 
151     /**
152      * Send any write requests which were queued whilst waiting for handshaking to complete.
153      */
154     protected synchronized void flushPendingWriteRequests() throws Exception {
155         LOGGER.debug(" flushPendingWriteRequests()");
156 
157         if (writeRequestQueue == null) {
158             return;
159         }
160 
161         Event scheduledWrite;
162         while ((scheduledWrite = writeRequestQueue.poll()) != null) {
163             LOGGER.debug(" Flushing buffered write request: {}",
164                     scheduledWrite.data);
165 
166             getProxyFilter().filterWrite(scheduledWrite.nextFilter,
167                     getSession(), (WriteRequest) scheduledWrite.data);
168         }
169 
170         // Free queue
171         writeRequestQueue = null;
172     }
173 
174     /**
175      * Enqueue a message to be written once handshaking is complete.
176      */
177     public synchronized void enqueueWriteRequest(final NextFilter nextFilter,
178             final WriteRequest writeRequest) {
179         if (writeRequestQueue == null) {
180             writeRequestQueue = new LinkedList<Event>();
181         }
182 
183         writeRequestQueue.offer(new Event(nextFilter, writeRequest));
184     }
185 
186     /**
187      * Closes the session.
188      * 
189      * @param message the error message
190      * @param t the exception which caused the session closing
191      */
192     protected void closeSession(final String message, final Throwable t) {
193         if (t != null) {
194             LOGGER.error(message, t);
195             proxyIoSession.setAuthenticationFailed(true);
196         } else {
197             LOGGER.error(message);
198         }
199 
200         getSession().close(true);
201     }
202 
203     /**
204      * Closes the session.
205      * 
206      * @param message the error message
207      */
208     protected void closeSession(final String message) {
209         closeSession(message, null);
210     }
211 
212     /**
213      * Event wrapper class for enqueued events.
214      */
215     private final static class Event {
216         private final NextFilter nextFilter;
217 
218         private final Object data;
219 
220         Event(final NextFilter nextFilter, final Object data) {
221             this.nextFilter = nextFilter;
222             this.data = data;
223         }
224 
225         public Object getData() {
226             return data;
227         }
228 
229         public NextFilter getNextFilter() {
230             return nextFilter;
231         }
232     }
233 }