001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.codec; 021 022import java.net.SocketAddress; 023import java.util.Queue; 024 025import org.apache.mina.core.buffer.IoBuffer; 026import org.apache.mina.core.file.FileRegion; 027import org.apache.mina.core.filterchain.IoFilter; 028import org.apache.mina.core.filterchain.IoFilterAdapter; 029import org.apache.mina.core.filterchain.IoFilterChain; 030import org.apache.mina.core.future.DefaultWriteFuture; 031import org.apache.mina.core.future.WriteFuture; 032import org.apache.mina.core.session.AttributeKey; 033import org.apache.mina.core.session.IoSession; 034import org.apache.mina.core.write.DefaultWriteRequest; 035import org.apache.mina.core.write.NothingWrittenException; 036import org.apache.mina.core.write.WriteRequest; 037import org.apache.mina.core.write.WriteRequestWrapper; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * An {@link IoFilter} which translates binary or protocol specific data into 043 * message objects and vice versa using {@link ProtocolCodecFactory}, 044 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}. 045 * 046 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 047 * @org.apache.xbean.XBean 048 */ 049public class ProtocolCodecFilter extends IoFilterAdapter { 050 /** A logger for this class */ 051 private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class); 052 053 private static final Class<?>[] EMPTY_PARAMS = new Class[0]; 054 055 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); 056 057 private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder"); 058 059 private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder"); 060 061 private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut"); 062 063 private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut"); 064 065 /** The factory responsible for creating the encoder and decoder */ 066 private final ProtocolCodecFactory factory; 067 068 /** 069 * Creates a new instance of ProtocolCodecFilter, associating a factory 070 * for the creation of the encoder and decoder. 071 * 072 * @param factory The associated factory 073 */ 074 public ProtocolCodecFilter(ProtocolCodecFactory factory) { 075 if (factory == null) { 076 throw new IllegalArgumentException("factory"); 077 } 078 079 this.factory = factory; 080 } 081 082 /** 083 * Creates a new instance of ProtocolCodecFilter, without any factory. 084 * The encoder/decoder factory will be created as an inner class, using 085 * the two parameters (encoder and decoder). 086 * 087 * @param encoder The class responsible for encoding the message 088 * @param decoder The class responsible for decoding the message 089 */ 090 public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { 091 if (encoder == null) { 092 throw new IllegalArgumentException("encoder"); 093 } 094 if (decoder == null) { 095 throw new IllegalArgumentException("decoder"); 096 } 097 098 // Create the inner Factory based on the two parameters 099 this.factory = new ProtocolCodecFactory() { 100 public ProtocolEncoder getEncoder(IoSession session) { 101 return encoder; 102 } 103 104 public ProtocolDecoder getDecoder(IoSession session) { 105 return decoder; 106 } 107 }; 108 } 109 110 /** 111 * Creates a new instance of ProtocolCodecFilter, without any factory. 112 * The encoder/decoder factory will be created as an inner class, using 113 * the two parameters (encoder and decoder), which are class names. Instances 114 * for those classes will be created in this constructor. 115 * 116 * @param encoderClass The class responsible for encoding the message 117 * @param decoderClass The class responsible for decoding the message 118 */ 119 public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass, 120 final Class<? extends ProtocolDecoder> decoderClass) { 121 if (encoderClass == null) { 122 throw new IllegalArgumentException("encoderClass"); 123 } 124 if (decoderClass == null) { 125 throw new IllegalArgumentException("decoderClass"); 126 } 127 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { 128 throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); 129 } 130 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { 131 throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); 132 } 133 try { 134 encoderClass.getConstructor(EMPTY_PARAMS); 135 } catch (NoSuchMethodException e) { 136 throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); 137 } 138 try { 139 decoderClass.getConstructor(EMPTY_PARAMS); 140 } catch (NoSuchMethodException e) { 141 throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); 142 } 143 144 final ProtocolEncoder encoder; 145 146 try { 147 encoder = encoderClass.newInstance(); 148 } catch (Exception e) { 149 throw new IllegalArgumentException("encoderClass cannot be initialized"); 150 } 151 152 final ProtocolDecoder decoder; 153 154 try { 155 decoder = decoderClass.newInstance(); 156 } catch (Exception e) { 157 throw new IllegalArgumentException("decoderClass cannot be initialized"); 158 } 159 160 // Create the inner factory based on the two parameters. 161 this.factory = new ProtocolCodecFactory() { 162 public ProtocolEncoder getEncoder(IoSession session) throws Exception { 163 return encoder; 164 } 165 166 public ProtocolDecoder getDecoder(IoSession session) throws Exception { 167 return decoder; 168 } 169 }; 170 } 171 172 /** 173 * Get the encoder instance from a given session. 174 * 175 * @param session The associated session we will get the encoder from 176 * @return The encoder instance, if any 177 */ 178 public ProtocolEncoder getEncoder(IoSession session) { 179 return (ProtocolEncoder) session.getAttribute(ENCODER); 180 } 181 182 @Override 183 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { 184 if (parent.contains(this)) { 185 throw new IllegalArgumentException( 186 "You can't add the same filter instance more than once. Create another instance and add it."); 187 } 188 } 189 190 @Override 191 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { 192 // Clean everything 193 disposeCodec(parent.getSession()); 194 } 195 196 /** 197 * Process the incoming message, calling the session decoder. As the incoming 198 * buffer might contains more than one messages, we have to loop until the decoder 199 * throws an exception. 200 * 201 * while ( buffer not empty ) 202 * try 203 * decode ( buffer ) 204 * catch 205 * break; 206 * 207 */ 208 @Override 209 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { 210 LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId()); 211 212 if (!(message instanceof IoBuffer)) { 213 nextFilter.messageReceived(session, message); 214 return; 215 } 216 217 IoBuffer in = (IoBuffer) message; 218 ProtocolDecoder decoder = factory.getDecoder(session); 219 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); 220 221 // Loop until we don't have anymore byte in the buffer, 222 // or until the decoder throws an unrecoverable exception or 223 // can't decoder a message, because there are not enough 224 // data in the buffer 225 while (in.hasRemaining()) { 226 int oldPos = in.position(); 227 try { 228 synchronized (session) { 229 // Call the decoder with the read bytes 230 decoder.decode(session, in, decoderOut); 231 } 232 // Finish decoding if no exception was thrown. 233 decoderOut.flush(nextFilter, session); 234 } catch (Exception e) { 235 ProtocolDecoderException pde; 236 if (e instanceof ProtocolDecoderException) { 237 pde = (ProtocolDecoderException) e; 238 } else { 239 pde = new ProtocolDecoderException(e); 240 } 241 if (pde.getHexdump() == null) { 242 // Generate a message hex dump 243 int curPos = in.position(); 244 in.position(oldPos); 245 pde.setHexdump(in.getHexDump()); 246 in.position(curPos); 247 } 248 // Fire the exceptionCaught event. 249 decoderOut.flush(nextFilter, session); 250 nextFilter.exceptionCaught(session, pde); 251 // Retry only if the type of the caught exception is 252 // recoverable and the buffer position has changed. 253 // We check buffer position additionally to prevent an 254 // infinite loop. 255 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { 256 break; 257 } 258 } 259 } 260 } 261 262 @Override 263 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { 264 if (writeRequest instanceof EncodedWriteRequest) { 265 return; 266 } 267 268 if (writeRequest instanceof MessageWriteRequest) { 269 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest; 270 nextFilter.messageSent(session, wrappedRequest.getParentRequest()); 271 } else { 272 nextFilter.messageSent(session, writeRequest); 273 } 274 } 275 276 @Override 277 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { 278 Object message = writeRequest.getMessage(); 279 280 // Bypass the encoding if the message is contained in a IoBuffer, 281 // as it has already been encoded before 282 if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { 283 nextFilter.filterWrite(session, writeRequest); 284 return; 285 } 286 287 // Get the encoder in the session 288 ProtocolEncoder encoder = factory.getEncoder(session); 289 290 ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); 291 292 if (encoder == null) { 293 throw new ProtocolEncoderException("The encoder is null for the session " + session); 294 } 295 296 try { 297 // Now we can try to encode the response 298 encoder.encode(session, message, encoderOut); 299 300 // Send it directly 301 Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue(); 302 303 // Write all the encoded messages now 304 while (!bufferQueue.isEmpty()) { 305 Object encodedMessage = bufferQueue.poll(); 306 307 if (encodedMessage == null) { 308 break; 309 } 310 311 // Flush only when the buffer has remaining. 312 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { 313 SocketAddress destination = writeRequest.getDestination(); 314 WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 315 316 nextFilter.filterWrite(session, encodedWriteRequest); 317 } 318 } 319 320 // Call the next filter 321 nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest)); 322 } catch (Exception e) { 323 ProtocolEncoderException pee; 324 325 // Generate the correct exception 326 if (e instanceof ProtocolEncoderException) { 327 pee = (ProtocolEncoderException) e; 328 } else { 329 pee = new ProtocolEncoderException(e); 330 } 331 332 throw pee; 333 } 334 } 335 336 @Override 337 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { 338 // Call finishDecode() first when a connection is closed. 339 ProtocolDecoder decoder = factory.getDecoder(session); 340 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); 341 342 try { 343 decoder.finishDecode(session, decoderOut); 344 } catch (Exception e) { 345 ProtocolDecoderException pde; 346 if (e instanceof ProtocolDecoderException) { 347 pde = (ProtocolDecoderException) e; 348 } else { 349 pde = new ProtocolDecoderException(e); 350 } 351 throw pde; 352 } finally { 353 // Dispose everything 354 disposeCodec(session); 355 decoderOut.flush(nextFilter, session); 356 } 357 358 // Call the next filter 359 nextFilter.sessionClosed(session); 360 } 361 362 private static class EncodedWriteRequest extends DefaultWriteRequest { 363 public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { 364 super(encodedMessage, future, destination); 365 } 366 367 public boolean isEncoded() { 368 return true; 369 } 370 } 371 372 private static class MessageWriteRequest extends WriteRequestWrapper { 373 public MessageWriteRequest(WriteRequest writeRequest) { 374 super(writeRequest); 375 } 376 377 @Override 378 public Object getMessage() { 379 return EMPTY_BUFFER; 380 } 381 382 @Override 383 public String toString() { 384 return "MessageWriteRequest, parent : " + super.toString(); 385 } 386 } 387 388 private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { 389 public ProtocolDecoderOutputImpl() { 390 // Do nothing 391 } 392 393 public void flush(NextFilter nextFilter, IoSession session) { 394 Queue<Object> messageQueue = getMessageQueue(); 395 396 while (!messageQueue.isEmpty()) { 397 nextFilter.messageReceived(session, messageQueue.poll()); 398 } 399 } 400 } 401 402 private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { 403 private final IoSession session; 404 405 private final NextFilter nextFilter; 406 407 /** The WriteRequest destination */ 408 private final SocketAddress destination; 409 410 public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { 411 this.session = session; 412 this.nextFilter = nextFilter; 413 414 // Only store the destination, not the full WriteRequest. 415 destination = writeRequest.getDestination(); 416 } 417 418 public WriteFuture flush() { 419 Queue<Object> bufferQueue = getMessageQueue(); 420 WriteFuture future = null; 421 422 while (!bufferQueue.isEmpty()) { 423 Object encodedMessage = bufferQueue.poll(); 424 425 if (encodedMessage == null) { 426 break; 427 } 428 429 // Flush only when the buffer has remaining. 430 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { 431 future = new DefaultWriteFuture(session); 432 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); 433 } 434 } 435 436 if (future == null) { 437 // Creates an empty writeRequest containing the destination 438 WriteRequest writeRequest = new DefaultWriteRequest( 439 DefaultWriteRequest.EMPTY_MESSAGE, null, destination); 440 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest)); 441 } 442 443 return future; 444 } 445 } 446 447 //----------- Helper methods --------------------------------------------- 448 /** 449 * Dispose the encoder, decoder, and the callback for the decoded 450 * messages. 451 */ 452 private void disposeCodec(IoSession session) { 453 // We just remove the two instances of encoder/decoder to release resources 454 // from the session 455 disposeEncoder(session); 456 disposeDecoder(session); 457 458 // We also remove the callback 459 disposeDecoderOut(session); 460 } 461 462 /** 463 * Dispose the encoder, removing its instance from the 464 * session's attributes, and calling the associated 465 * dispose method. 466 */ 467 private void disposeEncoder(IoSession session) { 468 ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); 469 if (encoder == null) { 470 return; 471 } 472 473 try { 474 encoder.dispose(session); 475 } catch (Exception e) { 476 LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); 477 } 478 } 479 480 /** 481 * Dispose the decoder, removing its instance from the 482 * session's attributes, and calling the associated 483 * dispose method. 484 */ 485 private void disposeDecoder(IoSession session) { 486 ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); 487 if (decoder == null) { 488 return; 489 } 490 491 try { 492 decoder.dispose(session); 493 } catch (Exception e) { 494 LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); 495 } 496 } 497 498 /** 499 * Return a reference to the decoder callback. If it's not already created 500 * and stored into the session, we create a new instance. 501 */ 502 private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) { 503 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); 504 505 if (out == null) { 506 // Create a new instance, and stores it into the session 507 out = new ProtocolDecoderOutputImpl(); 508 session.setAttribute(DECODER_OUT, out); 509 } 510 511 return out; 512 } 513 514 private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { 515 ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); 516 517 if (out == null) { 518 // Create a new instance, and stores it into the session 519 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); 520 session.setAttribute(ENCODER_OUT, out); 521 } 522 523 return out; 524 } 525 526 /** 527 * Remove the decoder callback from the session's attributes. 528 */ 529 private void disposeDecoderOut(IoSession session) { 530 session.removeAttribute(DECODER_OUT); 531 } 532}