001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.stream; 021 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertSame; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.net.InetSocketAddress; 028import java.net.SocketAddress; 029import java.security.MessageDigest; 030import java.util.LinkedList; 031import java.util.Queue; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.TimeUnit; 035 036import org.apache.mina.core.buffer.IoBuffer; 037import org.apache.mina.core.filterchain.IoFilter.NextFilter; 038import org.apache.mina.core.future.IoFutureListener; 039import org.apache.mina.core.future.WriteFuture; 040import org.apache.mina.core.service.IoHandlerAdapter; 041import org.apache.mina.core.session.DummySession; 042import org.apache.mina.core.session.IdleStatus; 043import org.apache.mina.core.session.IoSession; 044import org.apache.mina.core.write.DefaultWriteRequest; 045import org.apache.mina.core.write.WriteRequest; 046import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 047import org.apache.mina.transport.socket.nio.NioSocketConnector; 048import org.apache.mina.util.AvailablePortFinder; 049import org.easymock.IArgumentMatcher; 050import org.easymock.EasyMock; 051import org.junit.Test; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * TODO Add documentation 057 * 058 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 059 */ 060public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> { 061 062 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class); 063 064 protected final IoSession session = new DummySession(); 065 066 abstract protected U createFilter(); 067 068 abstract protected M createMessage(byte[] data) throws Exception; 069 070 @Test 071 public void testWriteEmptyFile() throws Exception { 072 AbstractStreamWriteFilter<M> filter = createFilter(); 073 M message = createMessage(new byte[0]); 074 075 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture()); 076 077 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 078 /* 079 * Record expectations 080 */ 081 nextFilter.messageSent(session, writeRequest); 082 083 /* 084 * Replay. 085 */ 086 EasyMock.replay(nextFilter); 087 088 filter.filterWrite(nextFilter, session, writeRequest); 089 090 /* 091 * Verify. 092 */ 093 EasyMock.verify(nextFilter); 094 095 assertTrue(writeRequest.getFuture().isWritten()); 096 } 097 098 /** 099 * Tests that the filter just passes objects which aren't FileRegion's 100 * through to the next filter. 101 * 102 * @throws Exception when something goes wrong 103 */ 104 @Test 105 public void testWriteNonFileRegionMessage() throws Exception { 106 AbstractStreamWriteFilter<M> filter = createFilter(); 107 108 Object message = new Object(); 109 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture()); 110 111 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 112 /* 113 * Record expectations 114 */ 115 nextFilter.filterWrite(session, writeRequest); 116 nextFilter.messageSent(session, writeRequest); 117 118 /* 119 * Replay. 120 */ 121 EasyMock.replay(nextFilter); 122 123 filter.filterWrite(nextFilter, session, writeRequest); 124 filter.messageSent(nextFilter, session, writeRequest); 125 126 /* 127 * Verify. 128 */ 129 EasyMock.verify(nextFilter); 130 } 131 132 /** 133 * Tests when the contents of the file fits into one write buffer. 134 * 135 * @throws Exception when something goes wrong 136 */ 137 @Test 138 public void testWriteSingleBufferFile() throws Exception { 139 byte[] data = new byte[] { 1, 2, 3, 4 }; 140 141 AbstractStreamWriteFilter<M> filter = createFilter(); 142 M message = createMessage(data); 143 144 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture()); 145 146 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 147 /* 148 * Record expectations 149 */ 150 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer.wrap(data)))); 151 nextFilter.messageSent(session, writeRequest); 152 153 /* 154 * Replay. 155 */ 156 EasyMock.replay(nextFilter); 157 158 filter.filterWrite(nextFilter, session, writeRequest); 159 filter.messageSent(nextFilter, session, writeRequest); 160 161 /* 162 * Verify. 163 */ 164 EasyMock.verify(nextFilter); 165 166 assertTrue(writeRequest.getFuture().isWritten()); 167 } 168 169 /** 170 * Tests when the contents of the file doesn't fit into one write buffer. 171 * 172 * @throws Exception when something goes wrong 173 */ 174 @Test 175 public void testWriteSeveralBuffersStream() throws Exception { 176 AbstractStreamWriteFilter<M> filter = createFilter(); 177 filter.setWriteBufferSize(4); 178 179 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 180 byte[] chunk1 = new byte[] { 1, 2, 3, 4 }; 181 byte[] chunk2 = new byte[] { 5, 6, 7, 8 }; 182 byte[] chunk3 = new byte[] { 9, 10 }; 183 184 M message = createMessage(data); 185 WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture()); 186 187 WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer.wrap(chunk1)); 188 WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer.wrap(chunk2)); 189 WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer.wrap(chunk3)); 190 191 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 192 /* 193 * Record expectations 194 */ 195 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request)); 196 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request)); 197 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request)); 198 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest)); 199 200 /* 201 * Replay. 202 */ 203 EasyMock.replay(nextFilter); 204 205 filter.filterWrite(nextFilter, session, writeRequest); 206 filter.messageSent(nextFilter, session, chunk1Request); 207 filter.messageSent(nextFilter, session, chunk2Request); 208 filter.messageSent(nextFilter, session, chunk3Request); 209 210 /* 211 * Verify. 212 */ 213 EasyMock.verify(nextFilter); 214 215 assertTrue(writeRequest.getFuture().isWritten()); 216 } 217 218 @Test 219 public void testWriteWhileWriteInProgress() throws Exception { 220 AbstractStreamWriteFilter<M> filter = createFilter(); 221 M message = createMessage(new byte[5]); 222 223 Queue<WriteRequest> queue = new LinkedList<WriteRequest>(); 224 225 /* 226 * Make up the situation. 227 */ 228 session.setAttribute(filter.CURRENT_STREAM, message); 229 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue); 230 231 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 232 /* 233 * Replay. (We recorded *nothing* because nothing should occur.) 234 */ 235 EasyMock.replay(nextFilter); 236 237 WriteRequest wr = new DefaultWriteRequest(new Object(), new DummyWriteFuture()); 238 filter.filterWrite(nextFilter, session, wr); 239 assertEquals(1, queue.size()); 240 assertSame(wr, queue.poll()); 241 242 /* 243 * Verify. 244 */ 245 EasyMock.verify(nextFilter); 246 247 session.removeAttribute(filter.CURRENT_STREAM); 248 session.removeAttribute(filter.WRITE_REQUEST_QUEUE); 249 } 250 251 @Test 252 public void testWritesWriteRequestQueueWhenFinished() throws Exception { 253 AbstractStreamWriteFilter<M> filter = createFilter(); 254 M message = createMessage(new byte[0]); 255 256 WriteRequest wrs[] = new WriteRequest[] { new DefaultWriteRequest(new Object(), new DummyWriteFuture()), 257 new DefaultWriteRequest(new Object(), new DummyWriteFuture()), 258 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) }; 259 Queue<WriteRequest> queue = new LinkedList<WriteRequest>(); 260 queue.add(wrs[0]); 261 queue.add(wrs[1]); 262 queue.add(wrs[2]); 263 264 /* 265 * Make up the situation. 266 */ 267 session.setAttribute(filter.CURRENT_STREAM, message); 268 session.setAttribute(filter.CURRENT_WRITE_REQUEST, new DefaultWriteRequest(message)); 269 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue); 270 271 /* 272 * Record expectations 273 */ 274 NextFilter nextFilter = EasyMock.createMock(NextFilter.class); 275 nextFilter.filterWrite(session, wrs[0]); 276 nextFilter.filterWrite(session, wrs[1]); 277 nextFilter.filterWrite(session, wrs[2]); 278 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message))); 279 280 /* 281 * Replay. 282 */ 283 EasyMock.replay(nextFilter); 284 285 filter.messageSent(nextFilter, session, new DefaultWriteRequest(new Object())); 286 assertEquals(0, queue.size()); 287 288 /* 289 * Verify. 290 */ 291 EasyMock.verify(nextFilter); 292 } 293 294 /** 295 * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the 296 * specified size. 297 */ 298 @Test 299 public void testSetWriteBufferSize() { 300 AbstractStreamWriteFilter<M> filter = createFilter(); 301 302 try { 303 filter.setWriteBufferSize(0); 304 fail("0 writeBuferSize specified. IllegalArgumentException expected."); 305 } catch (IllegalArgumentException iae) { 306 // Pass, exception was thrown 307 // Signifies a successful test execution 308 assertTrue(true); 309 } 310 311 try { 312 filter.setWriteBufferSize(-100); 313 fail("Negative writeBuferSize specified. IllegalArgumentException expected."); 314 } catch (IllegalArgumentException iae) { 315 // Pass, exception was thrown 316 // Signifies a successful test execution 317 assertTrue(true); 318 } 319 320 filter.setWriteBufferSize(1); 321 assertEquals(1, filter.getWriteBufferSize()); 322 filter.setWriteBufferSize(1024); 323 assertEquals(1024, filter.getWriteBufferSize()); 324 } 325 326 @Test 327 public void testWriteUsingSocketTransport() throws Exception { 328 NioSocketAcceptor acceptor = new NioSocketAcceptor(); 329 acceptor.setReuseAddress(true); 330 SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable()); 331 332 NioSocketConnector connector = new NioSocketConnector(); 333 334 // Generate 4MB of random data 335 byte[] data = new byte[4 * 1024 * 1024]; 336 new Random().nextBytes(data); 337 338 byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data); 339 340 M message = createMessage(data); 341 342 SenderHandler sender = new SenderHandler(message); 343 ReceiverHandler receiver = new ReceiverHandler(data.length); 344 345 acceptor.setHandler(sender); 346 connector.setHandler(receiver); 347 348 acceptor.bind(address); 349 connector.connect(address); 350 sender.latch.await(); 351 receiver.latch.await(); 352 353 acceptor.dispose(); 354 355 assertEquals(data.length, receiver.bytesRead); 356 byte[] actualMd5 = receiver.digest.digest(); 357 assertEquals(expectedMd5.length, actualMd5.length); 358 for (int i = 0; i < expectedMd5.length; i++) { 359 assertEquals(expectedMd5[i], actualMd5[i]); 360 } 361 } 362 363 private class SenderHandler extends IoHandlerAdapter { 364 final CountDownLatch latch = new CountDownLatch(1); 365 366 private M message; 367 368 StreamWriteFilter streamWriteFilter = new StreamWriteFilter(); 369 370 SenderHandler(M message) { 371 this.message = message; 372 } 373 374 @Override 375 public void sessionCreated(IoSession session) throws Exception { 376 super.sessionCreated(session); 377 session.getFilterChain().addLast("codec", streamWriteFilter); 378 } 379 380 @Override 381 public void sessionOpened(IoSession session) throws Exception { 382 session.write(message); 383 } 384 385 @Override 386 public void exceptionCaught(IoSession session, Throwable cause) throws Exception { 387 LOGGER.error("SenderHandler: exceptionCaught", cause); 388 latch.countDown(); 389 } 390 391 @Override 392 public void sessionClosed(IoSession session) throws Exception { 393 LOGGER.info("SenderHandler: sessionClosed"); 394 latch.countDown(); 395 } 396 397 @Override 398 public void sessionIdle(IoSession session, IdleStatus status) throws Exception { 399 LOGGER.info("SenderHandler: sessionIdle"); 400 latch.countDown(); 401 } 402 403 @Override 404 public void messageSent(IoSession session, Object message) throws Exception { 405 LOGGER.info("SenderHandler: messageSent"); 406 if (message == this.message) { 407 LOGGER.info("message == this.message"); 408 latch.countDown(); 409 } 410 } 411 } 412 413 private static class ReceiverHandler extends IoHandlerAdapter { 414 final CountDownLatch latch = new CountDownLatch(1); 415 416 long bytesRead = 0; 417 418 long size = 0; 419 420 MessageDigest digest; 421 422 ReceiverHandler(long size) throws Exception { 423 this.size = size; 424 digest = MessageDigest.getInstance("MD5"); 425 } 426 427 @Override 428 public void sessionCreated(IoSession session) throws Exception { 429 super.sessionCreated(session); 430 431 session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5); 432 } 433 434 @Override 435 public void sessionIdle(IoSession session, IdleStatus status) throws Exception { 436 LOGGER.info("ReceiverHandler: sessionIdle"); 437 session.close(true); 438 } 439 440 @Override 441 public void exceptionCaught(IoSession session, Throwable cause) throws Exception { 442 LOGGER.error("ReceiverHandler: exceptionCaught", cause); 443 latch.countDown(); 444 } 445 446 @Override 447 public void sessionClosed(IoSession session) throws Exception { 448 LOGGER.info("ReceiverHandler: sessionClosed"); 449 latch.countDown(); 450 } 451 452 @Override 453 public void messageReceived(IoSession session, Object message) throws Exception { 454 LOGGER.info("messageReceived"); 455 IoBuffer buf = (IoBuffer) message; 456 while (buf.hasRemaining()) { 457 digest.update(buf.get()); 458 bytesRead++; 459 } 460 LOGGER.info("messageReceived: bytesRead = {}", bytesRead); 461 if (bytesRead >= size) { 462 session.close(true); 463 } 464 } 465 } 466 467 public static WriteRequest eqWriteRequest(WriteRequest expected) { 468 EasyMock.reportMatcher(new WriteRequestMatcher(expected)); 469 return null; 470 } 471 472 private static class WriteRequestMatcher implements IArgumentMatcher { 473 private final WriteRequest expected; 474 475 public WriteRequestMatcher(WriteRequest expected) { 476 this.expected = expected; 477 } 478 479 public boolean matches(Object actual) { 480 if (actual instanceof WriteRequest) { 481 WriteRequest w2 = (WriteRequest) actual; 482 483 return expected.getMessage().equals(w2.getMessage()) 484 && expected.getFuture().isWritten() == w2.getFuture().isWritten(); 485 } 486 return false; 487 } 488 489 public void appendTo(StringBuffer buffer) { 490 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'"); 491 } 492 } 493 494 private static class DummyWriteFuture implements WriteFuture { 495 private boolean written; 496 497 /** 498 * Default constructor 499 */ 500 public DummyWriteFuture() { 501 super(); 502 } 503 504 public boolean isWritten() { 505 return written; 506 } 507 508 public void setWritten() { 509 this.written = true; 510 } 511 512 public IoSession getSession() { 513 return null; 514 } 515 516 public void join() { 517 // Do nothing 518 } 519 520 public boolean join(long timeoutInMillis) { 521 return true; 522 } 523 524 public boolean isDone() { 525 return true; 526 } 527 528 public WriteFuture addListener(IoFutureListener<?> listener) { 529 return this; 530 } 531 532 public WriteFuture removeListener(IoFutureListener<?> listener) { 533 return this; 534 } 535 536 public WriteFuture await() throws InterruptedException { 537 return this; 538 } 539 540 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 541 return true; 542 } 543 544 public boolean await(long timeoutMillis) throws InterruptedException { 545 return true; 546 } 547 548 public WriteFuture awaitUninterruptibly() { 549 return this; 550 } 551 552 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { 553 return true; 554 } 555 556 public boolean awaitUninterruptibly(long timeoutMillis) { 557 return true; 558 } 559 560 public Throwable getException() { 561 return null; 562 } 563 564 public void setException(Throwable cause) { 565 throw new IllegalStateException(); 566 } 567 } 568 569}