/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.proxy;

import java.util.LinkedList;
import java.util.Queue;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilter.NextFilter;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.DefaultWriteRequest;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.proxy.filter.ProxyFilter;
import org.apache.mina.proxy.filter.ProxyHandshakeIoBuffer;
import org.apache.mina.proxy.session.ProxyIoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * AbstractProxyLogicHandler.java - Helper class to handle proxy handshaking logic. Derived classes
 * implement proxy type specific logic.
 * <p>
 * Based upon SSLHandler from mina-filter-ssl.
 * <p>
 * The handshake-complete flag and the pending write queue are only accessed while holding
 * this instance's monitor.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 * @since MINA 2.0.0-M3
 */
public abstract class AbstractProxyLogicHandler implements ProxyLogicHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractProxyLogicHandler.class);

    /**
     * Object that contains all the proxy authentication session information.
     */
    private final ProxyIoSession proxyIoSession;

    /**
     * Queue of write events which occurred before the proxy handshake had completed.
     * Created lazily on the first enqueue and reset to <code>null</code> once flushed.
     */
    private Queue<Event> writeRequestQueue = null;

    /**
     * Has the handshake been completed.
     */
    private boolean handshakeComplete = false;

    /**
     * Creates a new {@link AbstractProxyLogicHandler}.
     *
     * @param proxyIoSession {@link ProxyIoSession} in use.
     */
    public AbstractProxyLogicHandler(ProxyIoSession proxyIoSession) {
        this.proxyIoSession = proxyIoSession;
    }

    /**
     * @return the proxy filter {@link ProxyFilter}.
     */
    protected ProxyFilter getProxyFilter() {
        return proxyIoSession.getProxyFilter();
    }

    /**
     * @return the session.
     */
    protected IoSession getSession() {
        return proxyIoSession.getSession();
    }

    /**
     * @return the {@link ProxyIoSession} object.
     */
    public ProxyIoSession getProxyIoSession() {
        return proxyIoSession;
    }

    /**
     * Writes data to the proxy server.
     * <p>
     * The buffer is wrapped in a {@link ProxyHandshakeIoBuffer} and handed to the
     * {@link ProxyFilter} together with a freshly created {@link WriteFuture}.
     *
     * @param nextFilter the next filter
     * @param data Data buffer to be written.
     * @return A Future for the write operation
     */
    protected WriteFuture writeData(final NextFilter nextFilter, final IoBuffer data) {
        // write net data
        ProxyHandshakeIoBuffer writeBuffer = new ProxyHandshakeIoBuffer(data);

        LOGGER.debug(" session write: {}", writeBuffer);

        WriteFuture writeFuture = new DefaultWriteFuture(getSession());
        getProxyFilter().writeData(nextFilter, getSession(), new DefaultWriteRequest(writeBuffer, writeFuture), true);

        return writeFuture;
    }

    /**
     * @return <code>true</code> if handshaking is complete and
     * data can be sent through the proxy.
     */
    public boolean isHandshakeComplete() {
        synchronized (this) {
            return handshakeComplete;
        }
    }

    /**
     * Signals that the handshake has finished.
     * <p>
     * Marks the handshake as complete, fires the connected event on the connector and waits
     * for it, then flushes the pending session events followed by the queued write requests.
     * An exception raised while flushing is logged and not propagated.
     */
    protected final void setHandshakeComplete() {
        synchronized (this) {
            handshakeComplete = true;
        }

        // Resolved through the accessor (as before); named so it does not shadow the field.
        ProxyIoSession currentProxySession = getProxyIoSession();
        currentProxySession.getConnector().fireConnected(currentProxySession.getSession()).awaitUninterruptibly();

        LOGGER.debug(" handshake completed");

        // Connected OK
        try {
            currentProxySession.getEventQueue().flushPendingSessionEvents();
            flushPendingWriteRequests();
        } catch (Exception ex) {
            LOGGER.error("Unable to flush pending write requests", ex);
        }
    }

    /**
     * Send any write requests which were queued whilst waiting for handshaking to complete.
     * <p>
     * Does nothing if no request was ever queued. Requests are forwarded in the order they
     * were enqueued, after which the queue is released.
     *
     * @throws Exception If we can't flush the pending write requests
     */
    protected synchronized void flushPendingWriteRequests() throws Exception {
        LOGGER.debug(" flushPendingWriteRequests()");

        if (writeRequestQueue == null) {
            return;
        }

        Event scheduledWrite;
        while ((scheduledWrite = writeRequestQueue.poll()) != null) {
            LOGGER.debug(" Flushing buffered write request: {}", scheduledWrite.writeRequest);

            getProxyFilter().filterWrite(scheduledWrite.nextFilter, getSession(), scheduledWrite.writeRequest);
        }

        // Free queue
        writeRequestQueue = null;
    }

    /**
     * Enqueue a message to be written once handshaking is complete.
     *
     * @param nextFilter the next filter to forward the request to when the queue is flushed
     * @param writeRequest the write request to hold back
     */
    public synchronized void enqueueWriteRequest(final NextFilter nextFilter, final WriteRequest writeRequest) {
        if (writeRequestQueue == null) {
            writeRequestQueue = new LinkedList<Event>();
        }

        writeRequestQueue.offer(new Event(nextFilter, writeRequest));
    }

    /**
     * Closes the session.
     * <p>
     * The message is logged at error level. When a cause is supplied, the proxy session is
     * additionally flagged as having failed authentication before the session is closed.
     *
     * @param message the error message
     * @param t the exception which caused the session closing
     */
    protected void closeSession(final String message, final Throwable t) {
        if (t != null) {
            LOGGER.error(message, t);
            proxyIoSession.setAuthenticationFailed(true);
        } else {
            LOGGER.error(message);
        }

        getSession().closeNow();
    }

    /**
     * Closes the session.
     *
     * @param message the error message
     */
    protected void closeSession(final String message) {
        closeSession(message, null);
    }

    /**
     * Holder for a write request queued before the handshake completed, together with
     * the filter it must be forwarded to.
     */
    private static final class Event {
        private final NextFilter nextFilter;

        // Typed as WriteRequest (the only thing ever queued) so no downcast is needed on flush.
        private final WriteRequest writeRequest;

        Event(final NextFilter nextFilter, final WriteRequest writeRequest) {
            this.nextFilter = nextFilter;
            this.writeRequest = writeRequest;
        }
    }
}