001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.mock; 018 019 import java.io.File; 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.Collection; 023 import java.util.Date; 024 import java.util.HashMap; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Set; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.CopyOnWriteArrayList; 030 import java.util.concurrent.CopyOnWriteArraySet; 031 import java.util.concurrent.CountDownLatch; 032 import java.util.concurrent.TimeUnit; 033 034 import org.apache.camel.AsyncCallback; 035 import org.apache.camel.CamelContext; 036 import org.apache.camel.Component; 037 import org.apache.camel.Consumer; 038 import org.apache.camel.Endpoint; 039 import org.apache.camel.Exchange; 040 import org.apache.camel.ExchangePattern; 041 import org.apache.camel.Expression; 042 import org.apache.camel.Handler; 043 import org.apache.camel.Message; 044 import org.apache.camel.Predicate; 045 import org.apache.camel.Processor; 046 import org.apache.camel.Producer; 047 import org.apache.camel.builder.ProcessorBuilder; 048 import 
org.apache.camel.impl.DefaultAsyncProducer; 049 import org.apache.camel.impl.DefaultEndpoint; 050 import org.apache.camel.impl.InterceptSendToEndpoint; 051 import org.apache.camel.spi.BrowsableEndpoint; 052 import org.apache.camel.spi.UriEndpoint; 053 import org.apache.camel.spi.UriParam; 054 import org.apache.camel.util.CamelContextHelper; 055 import org.apache.camel.util.CaseInsensitiveMap; 056 import org.apache.camel.util.ExchangeHelper; 057 import org.apache.camel.util.ExpressionComparator; 058 import org.apache.camel.util.FileUtil; 059 import org.apache.camel.util.ObjectHelper; 060 import org.apache.camel.util.StopWatch; 061 import org.slf4j.Logger; 062 import org.slf4j.LoggerFactory; 063 064 /** 065 * A Mock endpoint which provides a literate, fluent API for testing routes 066 * using a <a href="http://jmock.org/">JMock style</a> API. 067 * <p/> 068 * The mock endpoint have two set of methods 069 * <ul> 070 * <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li> 071 * <li>assertXXX - To assert assertions, after the test has been executed</li> 072 * </ul> 073 * Its <b>important</b> to know the difference between the two set. The former is used to 074 * set expectations before the test is being started (eg before the mock receives messages). 075 * The latter is used after the test has been executed, to verify the expectations; or 076 * other assertions which you can perform after the test has been completed. 
 *
 * @version
 */
@UriEndpoint(scheme = "mock")
public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
    // must be volatile so changes is visible between the thread which performs the assertions
    // and the threads which process the exchanges when routing messages in Camel
    protected volatile Processor reporter;
    // NOTE(review): presumably controls whether incoming exchanges are copied before being
    // recorded; the code reading this flag is not visible in this part of the file - confirm
    protected boolean copyOnExchange = true;
    // exact number of messages expected; doAssertIsSatisfied compares it with the received counter
    @UriParam
    private volatile int expectedCount;
    // value reported by getReceivedCounter()
    private volatile int counter;
    // assigned by whenAnyExchangeReceived, returnReplyBody and returnReplyHeader
    private volatile Processor defaultProcessor;
    // message index -> processor, filled by whenExchangeReceived(int, Processor)
    private volatile Map<Integer, Processor> processors;
    // list returned by getReceivedExchanges()
    private volatile List<Exchange> receivedExchanges;
    // list returned by getFailures(); any non-null entry makes doAssertIsSatisfied fail
    private volatile List<Throwable> failures;
    // expectations registered through expects(Runnable) and run by doAssertIsSatisfied
    private volatile List<Runnable> tests;
    // NOTE(review): presumably the latch awaited while waiting for messages; its usage is
    // not visible in this part of the file - confirm
    private volatile CountDownLatch latch;
    // millis to sleep before concluding that an endpoint expecting zero messages really is empty
    @UriParam
    private volatile long sleepForEmptyTest;
    // maximum time (in millis) assertIsSatisfied() will wait on a latch until it is satisfied
    @UriParam
    private volatile long resultWaitTime;
    // minimum expected time (in millis) assertIsSatisfied() will wait on a latch
    @UriParam
    private volatile long resultMinimumWaitTime;
    // grace period (in millis) after which assertIsSatisfied(long) re-asserts; only used when > 0
    @UriParam
    private volatile long assertPeriod;
    // minimum number of messages expected; checked by doAssertIsSatisfied when >= 0
    @UriParam
    private volatile int expectedMinimumCount;
    // expected bodies and the matching actual bodies, (re)assigned by expectedBodiesReceived
    // and expectedBodiesReceivedInAnyOrder
    private volatile List<?> expectedBodyValues;
    private volatile List<Object> actualBodyValues;
    // expected header name -> value, lazily created by expectedHeaderReceived
    private volatile Map<String, Object> expectedHeaderValues;
    private volatile Map<String, Object> actualHeaderValues;
    // expected property name -> value, lazily created by expectedPropertyReceived
    private volatile Map<String, Object> expectedPropertyValues;
    private volatile Map<String, Object> actualPropertyValues;
    // NOTE(review): the code reading retainFirst / retainLast is not visible in this part of the file
    @UriParam
    private volatile int retainFirst;
    @UriParam
    private volatile int retainLast;

    /**
     * Creates the mock endpoint for the given uri and owning component,
     * then calls {@code init()}.
     *
     * @param endpointUri the endpoint uri
     * @param component   the component which owns this endpoint
     */
    public MockEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        init();
    }

    /**
     * Creates the mock endpoint for the given uri without a component,
     * then calls {@code init()}.
     *
     * @param endpointUri the endpoint uri
     */
    @Deprecated
    public MockEndpoint(String endpointUri) {
        super(endpointUri);
        init();
    }

    /**
     * Creates the mock endpoint by delegating to the one-argument constructor
     * with a <tt>null</tt> argument.
     */
    public MockEndpoint() {
        this(null);
    }

    /**
     * A helper method to resolve the mock endpoint of the given URI on the
given context 134 * 135 * @param context the camel context to try resolve the mock endpoint from 136 * @param uri the uri of the endpoint to resolve 137 * @return the endpoint 138 */ 139 public static MockEndpoint resolve(CamelContext context, String uri) { 140 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class); 141 } 142 143 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 144 long start = System.currentTimeMillis(); 145 long left = unit.toMillis(timeout); 146 long end = start + left; 147 for (MockEndpoint endpoint : endpoints) { 148 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 149 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 150 } 151 left = end - System.currentTimeMillis(); 152 if (left <= 0) { 153 left = 0; 154 } 155 } 156 } 157 158 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 159 assertWait(timeout, unit, endpoints); 160 for (MockEndpoint endpoint : endpoints) { 161 endpoint.assertIsSatisfied(); 162 } 163 } 164 165 public static void assertIsSatisfied(MockEndpoint... 
endpoints) throws InterruptedException { 166 for (MockEndpoint endpoint : endpoints) { 167 endpoint.assertIsSatisfied(); 168 } 169 } 170 171 172 /** 173 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 174 * in the given context are valid 175 * 176 * @param context the camel context used to find all the available endpoints to be asserted 177 */ 178 public static void assertIsSatisfied(CamelContext context) throws InterruptedException { 179 ObjectHelper.notNull(context, "camelContext"); 180 Collection<Endpoint> endpoints = context.getEndpoints(); 181 for (Endpoint endpoint : endpoints) { 182 // if the endpoint was intercepted we should get the delegate 183 if (endpoint instanceof InterceptSendToEndpoint) { 184 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 185 } 186 if (endpoint instanceof MockEndpoint) { 187 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 188 mockEndpoint.assertIsSatisfied(); 189 } 190 } 191 } 192 193 /** 194 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 195 * in the given context are valid 196 * 197 * @param context the camel context used to find all the available endpoints to be asserted 198 * @param timeout timeout 199 * @param unit time unit 200 */ 201 public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException { 202 ObjectHelper.notNull(context, "camelContext"); 203 ObjectHelper.notNull(unit, "unit"); 204 Collection<Endpoint> endpoints = context.getEndpoints(); 205 long millis = unit.toMillis(timeout); 206 for (Endpoint endpoint : endpoints) { 207 // if the endpoint was intercepted we should get the delegate 208 if (endpoint instanceof InterceptSendToEndpoint) { 209 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 210 } 211 if (endpoint instanceof MockEndpoint) { 212 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 213 mockEndpoint.setResultWaitTime(millis); 214 
mockEndpoint.assertIsSatisfied(); 215 } 216 } 217 } 218 219 /** 220 * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered 221 * in the given context. 222 * 223 * @param context the camel context used to find all the available endpoints 224 * @param period the period in millis 225 */ 226 public static void setAssertPeriod(CamelContext context, long period) { 227 ObjectHelper.notNull(context, "camelContext"); 228 Collection<Endpoint> endpoints = context.getEndpoints(); 229 for (Endpoint endpoint : endpoints) { 230 // if the endpoint was intercepted we should get the delegate 231 if (endpoint instanceof InterceptSendToEndpoint) { 232 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 233 } 234 if (endpoint instanceof MockEndpoint) { 235 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 236 mockEndpoint.setAssertPeriod(period); 237 } 238 } 239 } 240 241 /** 242 * Reset all mock endpoints 243 * 244 * @param context the camel context used to find all the available endpoints to reset 245 */ 246 public static void resetMocks(CamelContext context) { 247 ObjectHelper.notNull(context, "camelContext"); 248 Collection<Endpoint> endpoints = context.getEndpoints(); 249 for (Endpoint endpoint : endpoints) { 250 // if the endpoint was intercepted we should get the delegate 251 if (endpoint instanceof InterceptSendToEndpoint) { 252 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 253 } 254 if (endpoint instanceof MockEndpoint) { 255 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 256 mockEndpoint.reset(); 257 } 258 } 259 } 260 261 public static void expectsMessageCount(int count, MockEndpoint... 
endpoints) throws InterruptedException { 262 for (MockEndpoint endpoint : endpoints) { 263 endpoint.setExpectedMessageCount(count); 264 } 265 } 266 267 public List<Exchange> getExchanges() { 268 return getReceivedExchanges(); 269 } 270 271 public Consumer createConsumer(Processor processor) throws Exception { 272 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 273 } 274 275 public Producer createProducer() throws Exception { 276 return new DefaultAsyncProducer(this) { 277 public boolean process(Exchange exchange, AsyncCallback callback) { 278 onExchange(exchange); 279 callback.done(true); 280 return true; 281 } 282 }; 283 } 284 285 public void reset() { 286 init(); 287 } 288 289 290 // Testing API 291 // ------------------------------------------------------------------------- 292 293 /** 294 * Handles the incoming exchange. 295 * <p/> 296 * This method turns this mock endpoint into a bean which you can use 297 * in the Camel routes, which allows you to inject MockEndpoint as beans 298 * in your routes and use the features of the mock to control the bean. 299 * 300 * @param exchange the exchange 301 * @throws Exception can be thrown 302 */ 303 @Handler 304 public void handle(Exchange exchange) throws Exception { 305 onExchange(exchange); 306 } 307 308 /** 309 * Set the processor that will be invoked when the index 310 * message is received. 311 */ 312 public void whenExchangeReceived(int index, Processor processor) { 313 this.processors.put(index, processor); 314 } 315 316 /** 317 * Set the processor that will be invoked when the some message 318 * is received. 319 * 320 * This processor could be overwritten by 321 * {@link #whenExchangeReceived(int, Processor)} method. 
322 */ 323 public void whenAnyExchangeReceived(Processor processor) { 324 this.defaultProcessor = processor; 325 } 326 327 /** 328 * Set the expression which value will be set to the message body 329 * @param expression which is use to set the message body 330 */ 331 public void returnReplyBody(Expression expression) { 332 this.defaultProcessor = ProcessorBuilder.setBody(expression); 333 } 334 335 /** 336 * Set the expression which value will be set to the message header 337 * @param headerName that will be set value 338 * @param expression which is use to set the message header 339 */ 340 public void returnReplyHeader(String headerName, Expression expression) { 341 this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression); 342 } 343 344 345 /** 346 * Validates that all the available expectations on this endpoint are 347 * satisfied; or throw an exception 348 */ 349 public void assertIsSatisfied() throws InterruptedException { 350 assertIsSatisfied(sleepForEmptyTest); 351 } 352 353 /** 354 * Validates that all the available expectations on this endpoint are 355 * satisfied; or throw an exception 356 * 357 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 358 * should wait for the test to be true 359 */ 360 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 361 LOG.info("Asserting: " + this + " is satisfied"); 362 doAssertIsSatisfied(timeoutForEmptyEndpoints); 363 if (assertPeriod > 0) { 364 // if an assert period was set then re-assert again to ensure the assertion is still valid 365 Thread.sleep(assertPeriod); 366 LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis"); 367 // do not use timeout when we re-assert 368 doAssertIsSatisfied(0); 369 } 370 } 371 372 protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 373 if (expectedCount == 0) { 374 if (timeoutForEmptyEndpoints > 0) { 375 LOG.debug("Sleeping for: " + 
timeoutForEmptyEndpoints + " millis to check there really are no messages received"); 376 Thread.sleep(timeoutForEmptyEndpoints); 377 } 378 assertEquals("Received message count", expectedCount, getReceivedCounter()); 379 } else if (expectedCount > 0) { 380 if (expectedCount != getReceivedCounter()) { 381 waitForCompleteLatch(); 382 } 383 assertEquals("Received message count", expectedCount, getReceivedCounter()); 384 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 385 waitForCompleteLatch(); 386 } 387 388 if (expectedMinimumCount >= 0) { 389 int receivedCounter = getReceivedCounter(); 390 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter); 391 } 392 393 for (Runnable test : tests) { 394 test.run(); 395 } 396 397 for (Throwable failure : failures) { 398 if (failure != null) { 399 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 400 fail("Failed due to caught exception: " + failure); 401 } 402 } 403 } 404 405 /** 406 * Validates that the assertions fail on this endpoint 407 */ 408 public void assertIsNotSatisfied() throws InterruptedException { 409 boolean failed = false; 410 try { 411 assertIsSatisfied(); 412 // did not throw expected error... fail! 413 failed = true; 414 } catch (AssertionError e) { 415 LOG.info("Caught expected failure: " + e); 416 } 417 if (failed) { 418 // fail() throws the AssertionError to indicate the test failed. 
419 fail("Expected assertion failure but test succeeded!"); 420 } 421 } 422 423 /** 424 * Validates that the assertions fail on this endpoint 425 426 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 427 * should wait for the test to be true 428 */ 429 public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 430 boolean failed = false; 431 try { 432 assertIsSatisfied(timeoutForEmptyEndpoints); 433 // did not throw expected error... fail! 434 failed = true; 435 } catch (AssertionError e) { 436 LOG.info("Caught expected failure: " + e); 437 } 438 if (failed) { 439 // fail() throws the AssertionError to indicate the test failed. 440 fail("Expected assertion failure but test succeeded!"); 441 } 442 } 443 444 /** 445 * Specifies the expected number of message exchanges that should be 446 * received by this endpoint 447 * 448 * If you want to assert that <b>exactly</b> n messages arrives to this mock 449 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 450 * 451 * @param expectedCount the number of message exchanges that should be 452 * expected by this endpoint 453 * @see #setAssertPeriod(long) 454 */ 455 public void expectedMessageCount(int expectedCount) { 456 setExpectedMessageCount(expectedCount); 457 } 458 459 /** 460 * Sets a grace period after which the mock endpoint will re-assert 461 * to ensure the preliminary assertion is still valid. 462 * <p/> 463 * This is used for example to assert that <b>exactly</b> a number of messages 464 * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then 465 * the assertion is satisfied when 5 or more message arrives. To ensure that 466 * exactly 5 messages arrives, then you would need to wait a little period 467 * to ensure no further message arrives. This is what you can use this 468 * {@link #setAssertPeriod(long)} method for. 469 * <p/> 470 * By default this period is disabled. 
471 * 472 * @param period grace period in millis 473 */ 474 public void setAssertPeriod(long period) { 475 this.assertPeriod = period; 476 } 477 478 /** 479 * Specifies the minimum number of expected message exchanges that should be 480 * received by this endpoint 481 * 482 * @param expectedCount the number of message exchanges that should be 483 * expected by this endpoint 484 */ 485 public void expectedMinimumMessageCount(int expectedCount) { 486 setMinimumExpectedMessageCount(expectedCount); 487 } 488 489 /** 490 * Sets an expectation that the given header name & value are received by this endpoint 491 * <p/> 492 * You can set multiple expectations for different header names. 493 * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt> 494 */ 495 public void expectedHeaderReceived(final String name, final Object value) { 496 if (expectedHeaderValues == null) { 497 expectedHeaderValues = new CaseInsensitiveMap(); 498 // we just wants to expects to be called once 499 expects(new Runnable() { 500 public void run() { 501 for (int i = 0; i < getReceivedExchanges().size(); i++) { 502 Exchange exchange = getReceivedExchange(i); 503 for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) { 504 String key = entry.getKey(); 505 Object expectedValue = entry.getValue(); 506 507 // we accept that an expectedValue of null also means that the header may be absent 508 if (expectedValue != null) { 509 assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders()); 510 boolean hasKey = exchange.getIn().getHeaders().containsKey(key); 511 assertTrue("No header with name " + key + " found for message: " + i, hasKey); 512 } 513 514 Object actualValue = exchange.getIn().getHeader(key); 515 actualValue = extractActualValue(exchange, actualValue, expectedValue); 516 517 assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue); 518 } 519 } 520 } 521 }); 522 } 523 
expectedHeaderValues.put(name, value); 524 } 525 526 /** 527 * Adds an expectation that the given header values are received by this 528 * endpoint in any order 529 */ 530 public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) { 531 expectedMessageCount(values.size()); 532 533 expects(new Runnable() { 534 public void run() { 535 // these are the expected values to find 536 final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<Object>(values); 537 538 for (int i = 0; i < getReceivedExchanges().size(); i++) { 539 Exchange exchange = getReceivedExchange(i); 540 541 Object actualValue = exchange.getIn().getHeader(name); 542 for (Object expectedValue : actualHeaderValues) { 543 actualValue = extractActualValue(exchange, actualValue, expectedValue); 544 // remove any found values 545 actualHeaderValues.remove(actualValue); 546 } 547 } 548 549 // should be empty, as we should find all the values 550 assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size()) 551 + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty()); 552 } 553 }); 554 } 555 556 /** 557 * Adds an expectation that the given header values are received by this 558 * endpoint in any order 559 */ 560 public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) { 561 List<Object> valueList = new ArrayList<Object>(); 562 valueList.addAll(Arrays.asList(values)); 563 expectedHeaderValuesReceivedInAnyOrder(name, valueList); 564 } 565 566 /** 567 * Sets an expectation that the given property name & value are received by this endpoint 568 * <p/> 569 * You can set multiple expectations for different property names. 
570 * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt> 571 */ 572 public void expectedPropertyReceived(final String name, final Object value) { 573 if (expectedPropertyValues == null) { 574 expectedPropertyValues = new ConcurrentHashMap<String, Object>(); 575 } 576 if (value != null) { 577 // ConcurrentHashMap cannot store null values 578 expectedPropertyValues.put(name, value); 579 } 580 581 expects(new Runnable() { 582 public void run() { 583 for (int i = 0; i < getReceivedExchanges().size(); i++) { 584 Exchange exchange = getReceivedExchange(i); 585 for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) { 586 String key = entry.getKey(); 587 Object expectedValue = entry.getValue(); 588 589 // we accept that an expectedValue of null also means that the header may be absent 590 if (expectedValue != null) { 591 assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty()); 592 boolean hasKey = exchange.getProperties().containsKey(key); 593 assertTrue("No property with name " + key + " found for message: " + i, hasKey); 594 } 595 596 Object actualValue = exchange.getProperty(key); 597 actualValue = extractActualValue(exchange, actualValue, expectedValue); 598 599 assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue); 600 } 601 } 602 } 603 }); 604 } 605 606 /** 607 * Adds an expectation that the given body values are received by this 608 * endpoint in the specified order 609 */ 610 public void expectedBodiesReceived(final List<?> bodies) { 611 expectedMessageCount(bodies.size()); 612 this.expectedBodyValues = bodies; 613 this.actualBodyValues = new ArrayList<Object>(); 614 615 expects(new Runnable() { 616 public void run() { 617 for (int i = 0; i < expectedBodyValues.size(); i++) { 618 Exchange exchange = getReceivedExchange(i); 619 assertTrue("No exchange received for counter: " + i, exchange != null); 620 621 
Object expectedBody = expectedBodyValues.get(i); 622 Object actualBody = null; 623 if (i < actualBodyValues.size()) { 624 actualBody = actualBodyValues.get(i); 625 } 626 actualBody = extractActualValue(exchange, actualBody, expectedBody); 627 628 assertEquals("Body of message: " + i, expectedBody, actualBody); 629 } 630 } 631 }); 632 } 633 634 private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) { 635 if (actualValue == null) { 636 return null; 637 } 638 639 if (actualValue instanceof Expression) { 640 actualValue = ((Expression)actualValue).evaluate(exchange, expectedValue != null ? expectedValue.getClass() : Object.class); 641 } else if (actualValue instanceof Predicate) { 642 actualValue = ((Predicate)actualValue).matches(exchange); 643 } else if (expectedValue != null) { 644 String from = actualValue.getClass().getName(); 645 String to = expectedValue.getClass().getName(); 646 actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue); 647 assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null); 648 } 649 return actualValue; 650 } 651 652 /** 653 * Sets an expectation that the given predicates matches the received messages by this endpoint 654 */ 655 public void expectedMessagesMatches(Predicate... predicates) { 656 for (int i = 0; i < predicates.length; i++) { 657 final int messageIndex = i; 658 final Predicate predicate = predicates[i]; 659 final AssertionClause clause = new AssertionClause(this) { 660 public void run() { 661 addPredicate(predicate); 662 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 663 } 664 }; 665 expects(clause); 666 } 667 } 668 669 /** 670 * Sets an expectation that the given body values are received by this endpoint 671 */ 672 public void expectedBodiesReceived(Object... 
bodies) { 673 List<Object> bodyList = new ArrayList<Object>(); 674 bodyList.addAll(Arrays.asList(bodies)); 675 expectedBodiesReceived(bodyList); 676 } 677 678 /** 679 * Adds an expectation that the given body value are received by this endpoint 680 */ 681 public AssertionClause expectedBodyReceived() { 682 expectedMessageCount(1); 683 final AssertionClause clause = new AssertionClause(this) { 684 public void run() { 685 Exchange exchange = getReceivedExchange(0); 686 assertTrue("No exchange received for counter: " + 0, exchange != null); 687 688 Object actualBody = exchange.getIn().getBody(); 689 Expression exp = createExpression(getCamelContext()); 690 Object expectedBody = exp.evaluate(exchange, Object.class); 691 692 assertEquals("Body of message: " + 0, expectedBody, actualBody); 693 } 694 }; 695 expects(clause); 696 return clause; 697 } 698 699 /** 700 * Adds an expectation that the given body values are received by this 701 * endpoint in any order 702 */ 703 public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) { 704 expectedMessageCount(bodies.size()); 705 this.expectedBodyValues = bodies; 706 this.actualBodyValues = new ArrayList<Object>(); 707 708 expects(new Runnable() { 709 public void run() { 710 List<Object> actualBodyValuesSet = new ArrayList<Object>(actualBodyValues); 711 for (int i = 0; i < expectedBodyValues.size(); i++) { 712 Exchange exchange = getReceivedExchange(i); 713 assertTrue("No exchange received for counter: " + i, exchange != null); 714 715 Object expectedBody = expectedBodyValues.get(i); 716 assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody)); 717 } 718 } 719 }); 720 } 721 722 /** 723 * Adds an expectation that the given body values are received by this 724 * endpoint in any order 725 */ 726 public void expectedBodiesReceivedInAnyOrder(Object... 
bodies) { 727 List<Object> bodyList = new ArrayList<Object>(); 728 bodyList.addAll(Arrays.asList(bodies)); 729 expectedBodiesReceivedInAnyOrder(bodyList); 730 } 731 732 /** 733 * Adds an expectation that a file exists with the given name 734 * 735 * @param name name of file, will cater for / and \ on different OS platforms 736 */ 737 public void expectedFileExists(final String name) { 738 expectedFileExists(name, null); 739 } 740 741 /** 742 * Adds an expectation that a file exists with the given name 743 * <p/> 744 * Will wait at most 5 seconds while checking for the existence of the file. 745 * 746 * @param name name of file, will cater for / and \ on different OS platforms 747 * @param content content of file to compare, can be <tt>null</tt> to not compare content 748 */ 749 public void expectedFileExists(final String name, final String content) { 750 final File file = new File(FileUtil.normalizePath(name)); 751 752 expects(new Runnable() { 753 public void run() { 754 // wait at most 5 seconds for the file to exists 755 final long timeout = System.currentTimeMillis() + 5000; 756 757 boolean stop = false; 758 while (!stop && !file.exists()) { 759 try { 760 Thread.sleep(50); 761 } catch (InterruptedException e) { 762 // ignore 763 } 764 stop = System.currentTimeMillis() > timeout; 765 } 766 767 assertTrue("The file should exists: " + name, file.exists()); 768 769 if (content != null) { 770 String body = getCamelContext().getTypeConverter().convertTo(String.class, file); 771 assertEquals("Content of file: " + name, content, body); 772 } 773 } 774 }); 775 } 776 777 /** 778 * Adds an expectation that messages received should have the given exchange pattern 779 */ 780 public void expectedExchangePattern(final ExchangePattern exchangePattern) { 781 expectedMessagesMatches(new Predicate() { 782 public boolean matches(Exchange exchange) { 783 return exchange.getPattern().equals(exchangePattern); 784 } 785 }); 786 } 787 788 /** 789 * Adds an expectation that messages 
received should have ascending values 790 * of the given expression such as a user generated counter value 791 */ 792 public void expectsAscending(final Expression expression) { 793 expects(new Runnable() { 794 public void run() { 795 assertMessagesAscending(expression); 796 } 797 }); 798 } 799 800 /** 801 * Adds an expectation that messages received should have ascending values 802 * of the given expression such as a user generated counter value 803 */ 804 public AssertionClause expectsAscending() { 805 final AssertionClause clause = new AssertionClause(this) { 806 public void run() { 807 assertMessagesAscending(createExpression(getCamelContext())); 808 } 809 }; 810 expects(clause); 811 return clause; 812 } 813 814 /** 815 * Adds an expectation that messages received should have descending values 816 * of the given expression such as a user generated counter value 817 */ 818 public void expectsDescending(final Expression expression) { 819 expects(new Runnable() { 820 public void run() { 821 assertMessagesDescending(expression); 822 } 823 }); 824 } 825 826 /** 827 * Adds an expectation that messages received should have descending values 828 * of the given expression such as a user generated counter value 829 */ 830 public AssertionClause expectsDescending() { 831 final AssertionClause clause = new AssertionClause(this) { 832 public void run() { 833 assertMessagesDescending(createExpression(getCamelContext())); 834 } 835 }; 836 expects(clause); 837 return clause; 838 } 839 840 /** 841 * Adds an expectation that no duplicate messages should be received using 842 * the expression to determine the message ID 843 * 844 * @param expression the expression used to create a unique message ID for 845 * message comparison (which could just be the message 846 * payload if the payload can be tested for uniqueness using 847 * {@link Object#equals(Object)} and 848 * {@link Object#hashCode()} 849 */ 850 public void expectsNoDuplicates(final Expression expression) { 851 
expects(new Runnable() { 852 public void run() { 853 assertNoDuplicates(expression); 854 } 855 }); 856 } 857 858 /** 859 * Adds an expectation that no duplicate messages should be received using 860 * the expression to determine the message ID 861 */ 862 public AssertionClause expectsNoDuplicates() { 863 final AssertionClause clause = new AssertionClause(this) { 864 public void run() { 865 assertNoDuplicates(createExpression(getCamelContext())); 866 } 867 }; 868 expects(clause); 869 return clause; 870 } 871 872 /** 873 * Asserts that the messages have ascending values of the given expression 874 */ 875 public void assertMessagesAscending(Expression expression) { 876 assertMessagesSorted(expression, true); 877 } 878 879 /** 880 * Asserts that the messages have descending values of the given expression 881 */ 882 public void assertMessagesDescending(Expression expression) { 883 assertMessagesSorted(expression, false); 884 } 885 886 protected void assertMessagesSorted(Expression expression, boolean ascending) { 887 String type = ascending ? "ascending" : "descending"; 888 ExpressionComparator comparator = new ExpressionComparator(expression); 889 List<Exchange> list = getReceivedExchanges(); 890 for (int i = 1; i < list.size(); i++) { 891 int j = i - 1; 892 Exchange e1 = list.get(j); 893 Exchange e2 = list.get(i); 894 int result = comparator.compare(e1, e2); 895 if (result == 0) { 896 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " 897 + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 898 } else { 899 if (!ascending) { 900 result = result * -1; 901 } 902 if (result > 0) { 903 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class) 904 + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: " 905 + expression + ". 
Exchanges: " + e1 + " and " + e2); 906 } 907 } 908 } 909 } 910 911 public void assertNoDuplicates(Expression expression) { 912 Map<Object, Exchange> map = new HashMap<Object, Exchange>(); 913 List<Exchange> list = getReceivedExchanges(); 914 for (int i = 0; i < list.size(); i++) { 915 Exchange e2 = list.get(i); 916 Object key = expression.evaluate(e2, Object.class); 917 Exchange e1 = map.get(key); 918 if (e1 != null) { 919 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 920 } else { 921 map.put(key, e2); 922 } 923 } 924 } 925 926 /** 927 * Adds the expectation which will be invoked when enough messages are received 928 */ 929 public void expects(Runnable runnable) { 930 tests.add(runnable); 931 } 932 933 /** 934 * Adds an assertion to the given message index 935 * 936 * @param messageIndex the number of the message 937 * @return the assertion clause 938 */ 939 public AssertionClause message(final int messageIndex) { 940 final AssertionClause clause = new AssertionClause(this) { 941 public void run() { 942 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 943 } 944 }; 945 expects(clause); 946 return clause; 947 } 948 949 /** 950 * Adds an assertion to all the received messages 951 * 952 * @return the assertion clause 953 */ 954 public AssertionClause allMessages() { 955 final AssertionClause clause = new AssertionClause(this) { 956 public void run() { 957 List<Exchange> list = getReceivedExchanges(); 958 int index = 0; 959 for (Exchange exchange : list) { 960 applyAssertionOn(MockEndpoint.this, index++, exchange); 961 } 962 } 963 }; 964 expects(clause); 965 return clause; 966 } 967 968 /** 969 * Asserts that the given index of message is received (starting at zero) 970 */ 971 public Exchange assertExchangeReceived(int index) { 972 int count = getReceivedCounter(); 973 assertTrue("Not enough messages received. 
Was: " + count, count > index); 974 return getReceivedExchange(index); 975 } 976 977 // Properties 978 // ------------------------------------------------------------------------- 979 public List<Throwable> getFailures() { 980 return failures; 981 } 982 983 public int getReceivedCounter() { 984 return counter; 985 } 986 987 public List<Exchange> getReceivedExchanges() { 988 return receivedExchanges; 989 } 990 991 public int getExpectedCount() { 992 return expectedCount; 993 } 994 995 public long getSleepForEmptyTest() { 996 return sleepForEmptyTest; 997 } 998 999 /** 1000 * Allows a sleep to be specified to wait to check that this endpoint really 1001 * is empty when {@link #expectedMessageCount(int)} is called with zero 1002 * 1003 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 1004 * this endpoint really is empty 1005 */ 1006 public void setSleepForEmptyTest(long sleepForEmptyTest) { 1007 this.sleepForEmptyTest = sleepForEmptyTest; 1008 } 1009 1010 public long getResultWaitTime() { 1011 return resultWaitTime; 1012 } 1013 1014 /** 1015 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will 1016 * wait on a latch until it is satisfied 1017 */ 1018 public void setResultWaitTime(long resultWaitTime) { 1019 this.resultWaitTime = resultWaitTime; 1020 } 1021 1022 /** 1023 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will 1024 * wait on a latch until it is satisfied 1025 */ 1026 public void setMinimumResultWaitTime(long resultMinimumWaitTime) { 1027 this.resultMinimumWaitTime = resultMinimumWaitTime; 1028 } 1029 1030 /** 1031 * Specifies the expected number of message exchanges that should be 1032 * received by this endpoint. 1033 * <p/> 1034 * If you want to assert that <b>exactly</b> n'th message arrives to this mock 1035 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 
1036 * 1037 * @param expectedCount the number of message exchanges that should be 1038 * expected by this endpoint 1039 * @see #setAssertPeriod(long) 1040 */ 1041 public void setExpectedMessageCount(int expectedCount) { 1042 this.expectedCount = expectedCount; 1043 if (expectedCount <= 0) { 1044 latch = null; 1045 } else { 1046 latch = new CountDownLatch(expectedCount); 1047 } 1048 } 1049 1050 /** 1051 * Specifies the minimum number of expected message exchanges that should be 1052 * received by this endpoint 1053 * 1054 * @param expectedCount the number of message exchanges that should be 1055 * expected by this endpoint 1056 */ 1057 public void setMinimumExpectedMessageCount(int expectedCount) { 1058 this.expectedMinimumCount = expectedCount; 1059 if (expectedCount <= 0) { 1060 latch = null; 1061 } else { 1062 latch = new CountDownLatch(expectedMinimumCount); 1063 } 1064 } 1065 1066 public Processor getReporter() { 1067 return reporter; 1068 } 1069 1070 /** 1071 * Allows a processor to added to the endpoint to report on progress of the test 1072 */ 1073 public void setReporter(Processor reporter) { 1074 this.reporter = reporter; 1075 } 1076 1077 /** 1078 * Specifies to only retain the first n'th number of received {@link Exchange}s. 1079 * <p/> 1080 * This is used when testing with big data, to reduce memory consumption by not storing 1081 * copies of every {@link Exchange} this mock endpoint receives. 1082 * <p/> 1083 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1084 * will still return the actual number of received {@link Exchange}s. For example 1085 * if we have received 5000 {@link Exchange}s, and have configured to only retain the first 1086 * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1087 * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and 1088 * {@link #getReceivedExchanges()} methods. 
1089 * <p/> 1090 * When using this method, then some of the other expectation methods is not supported, 1091 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1092 * number of bodies received. 1093 * <p/> 1094 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1095 * to limit both the first and last received. 1096 * 1097 * @param retainFirst to limit and only keep the first n'th received {@link Exchange}s, use 1098 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1099 * @see #setRetainLast(int) 1100 */ 1101 public void setRetainFirst(int retainFirst) { 1102 this.retainFirst = retainFirst; 1103 } 1104 1105 /** 1106 * Specifies to only retain the last n'th number of received {@link Exchange}s. 1107 * <p/> 1108 * This is used when testing with big data, to reduce memory consumption by not storing 1109 * copies of every {@link Exchange} this mock endpoint receives. 1110 * <p/> 1111 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1112 * will still return the actual number of received {@link Exchange}s. For example 1113 * if we have received 5000 {@link Exchange}s, and have configured to only retain the last 1114 * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1115 * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and 1116 * {@link #getReceivedExchanges()} methods. 1117 * <p/> 1118 * When using this method, then some of the other expectation methods is not supported, 1119 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1120 * number of bodies received. 1121 * <p/> 1122 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1123 * to limit both the first and last received. 
1124 * 1125 * @param retainLast to limit and only keep the last n'th received {@link Exchange}s, use 1126 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1127 * @see #setRetainFirst(int) 1128 */ 1129 public void setRetainLast(int retainLast) { 1130 this.retainLast = retainLast; 1131 } 1132 1133 // Implementation methods 1134 // ------------------------------------------------------------------------- 1135 private void init() { 1136 expectedCount = -1; 1137 counter = 0; 1138 defaultProcessor = null; 1139 processors = new HashMap<Integer, Processor>(); 1140 receivedExchanges = new CopyOnWriteArrayList<Exchange>(); 1141 failures = new CopyOnWriteArrayList<Throwable>(); 1142 tests = new CopyOnWriteArrayList<Runnable>(); 1143 latch = null; 1144 sleepForEmptyTest = 0; 1145 resultWaitTime = 0; 1146 resultMinimumWaitTime = 0L; 1147 assertPeriod = 0L; 1148 expectedMinimumCount = -1; 1149 expectedBodyValues = null; 1150 actualBodyValues = new ArrayList<Object>(); 1151 expectedHeaderValues = null; 1152 actualHeaderValues = null; 1153 expectedPropertyValues = null; 1154 actualPropertyValues = null; 1155 retainFirst = -1; 1156 retainLast = -1; 1157 } 1158 1159 protected synchronized void onExchange(Exchange exchange) { 1160 try { 1161 if (reporter != null) { 1162 reporter.process(exchange); 1163 } 1164 Exchange copy = exchange; 1165 if (copyOnExchange) { 1166 // copy the exchange so the mock stores the copy and not the actual exchange 1167 copy = ExchangeHelper.createCopy(exchange, true); 1168 } 1169 performAssertions(exchange, copy); 1170 } catch (Throwable e) { 1171 // must catch java.lang.Throwable as AssertionError extends java.lang.Error 1172 failures.add(e); 1173 } finally { 1174 // make sure latch is counted down to avoid test hanging forever 1175 if (latch != null) { 1176 latch.countDown(); 1177 } 1178 } 1179 } 1180 1181 /** 1182 * Performs the assertions on the incoming exchange. 
1183 * 1184 * @param exchange the actual exchange 1185 * @param copy a copy of the exchange (only store this) 1186 * @throws Exception can be thrown if something went wrong 1187 */ 1188 protected void performAssertions(Exchange exchange, Exchange copy) throws Exception { 1189 Message in = copy.getIn(); 1190 Object actualBody = in.getBody(); 1191 1192 if (expectedHeaderValues != null) { 1193 if (actualHeaderValues == null) { 1194 actualHeaderValues = new CaseInsensitiveMap(); 1195 } 1196 if (in.hasHeaders()) { 1197 actualHeaderValues.putAll(in.getHeaders()); 1198 } 1199 } 1200 1201 if (expectedPropertyValues != null) { 1202 if (actualPropertyValues == null) { 1203 actualPropertyValues = new ConcurrentHashMap<String, Object>(); 1204 } 1205 actualPropertyValues.putAll(copy.getProperties()); 1206 } 1207 1208 if (expectedBodyValues != null) { 1209 int index = actualBodyValues.size(); 1210 if (expectedBodyValues.size() > index) { 1211 Object expectedBody = expectedBodyValues.get(index); 1212 if (expectedBody != null) { 1213 // prefer to convert body early, for example when using files 1214 // we need to read the content at this time 1215 Object body = in.getBody(expectedBody.getClass()); 1216 if (body != null) { 1217 actualBody = body; 1218 } 1219 } 1220 actualBodyValues.add(actualBody); 1221 } 1222 } 1223 1224 // let counter be 0 index-based in the logs 1225 if (LOG.isDebugEnabled()) { 1226 String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody; 1227 if (copy.getIn().hasHeaders()) { 1228 msg += " and headers:" + copy.getIn().getHeaders(); 1229 } 1230 LOG.debug(msg); 1231 } 1232 1233 // record timestamp when exchange was received 1234 copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date()); 1235 1236 // add a copy of the received exchange 1237 addReceivedExchange(copy); 1238 // and then increment counter after adding received exchange 1239 ++counter; 1240 1241 Processor processor = processors.get(getReceivedCounter()) != null 
1242 ? processors.get(getReceivedCounter()) : defaultProcessor; 1243 1244 if (processor != null) { 1245 try { 1246 // must process the incoming exchange and NOT the copy as the idea 1247 // is the end user can manipulate the exchange 1248 processor.process(exchange); 1249 } catch (Exception e) { 1250 // set exceptions on exchange so we can throw exceptions to simulate errors 1251 exchange.setException(e); 1252 } 1253 } 1254 } 1255 1256 /** 1257 * Adds the received exchange. 1258 * 1259 * @param copy a copy of the received exchange 1260 */ 1261 protected void addReceivedExchange(Exchange copy) { 1262 if (retainFirst == 0 && retainLast == 0) { 1263 // do not retain any messages at all 1264 } else if (retainFirst < 0 && retainLast < 0) { 1265 // no limitation so keep them all 1266 receivedExchanges.add(copy); 1267 } else { 1268 // okay there is some sort of limitations, so figure out what to retain 1269 if (retainFirst > 0 && counter < retainFirst) { 1270 // store a copy as its within the retain first limitation 1271 receivedExchanges.add(copy); 1272 } else if (retainLast > 0) { 1273 // remove the oldest from the last retained boundary, 1274 int index = receivedExchanges.size() - retainLast; 1275 if (index >= 0) { 1276 // but must be outside the first range as well 1277 // otherwise we should not remove the oldest 1278 if (retainFirst <= 0 || retainFirst <= index) { 1279 receivedExchanges.remove(index); 1280 } 1281 } 1282 // store a copy of the last n'th received 1283 receivedExchanges.add(copy); 1284 } 1285 } 1286 } 1287 1288 protected void waitForCompleteLatch() throws InterruptedException { 1289 if (latch == null) { 1290 fail("Should have a latch!"); 1291 } 1292 1293 StopWatch watch = new StopWatch(); 1294 waitForCompleteLatch(resultWaitTime); 1295 long delta = watch.stop(); 1296 LOG.debug("Took {} millis to complete latch", delta); 1297 1298 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) { 1299 fail("Expected minimum " + resultMinimumWaitTime 1300 
+ " millis waiting on the result, but was faster with " + delta + " millis."); 1301 } 1302 } 1303 1304 protected void waitForCompleteLatch(long timeout) throws InterruptedException { 1305 // Wait for a default 10 seconds if resultWaitTime is not set 1306 long waitTime = timeout == 0 ? 10000L : timeout; 1307 1308 // now let's wait for the results 1309 LOG.debug("Waiting on the latch for: " + timeout + " millis"); 1310 latch.await(waitTime, TimeUnit.MILLISECONDS); 1311 } 1312 1313 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 1314 if (!ObjectHelper.equal(expectedValue, actualValue)) { 1315 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 1316 } 1317 } 1318 1319 protected void assertTrue(String message, boolean predicate) { 1320 if (!predicate) { 1321 fail(message); 1322 } 1323 } 1324 1325 protected void fail(Object message) { 1326 if (LOG.isDebugEnabled()) { 1327 List<Exchange> list = getReceivedExchanges(); 1328 int index = 0; 1329 for (Exchange exchange : list) { 1330 LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange}); 1331 } 1332 } 1333 throw new AssertionError(getEndpointUri() + " " + message); 1334 } 1335 1336 public int getExpectedMinimumCount() { 1337 return expectedMinimumCount; 1338 } 1339 1340 public void await() throws InterruptedException { 1341 if (latch != null) { 1342 latch.await(); 1343 } 1344 } 1345 1346 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 1347 if (latch != null) { 1348 return latch.await(timeout, unit); 1349 } 1350 return true; 1351 } 1352 1353 public boolean isSingleton() { 1354 return true; 1355 } 1356 1357 public boolean isLenientProperties() { 1358 return true; 1359 } 1360 1361 private Exchange getReceivedExchange(int index) { 1362 if (index <= receivedExchanges.size() - 1) { 1363 return receivedExchanges.get(index); 1364 } else { 1365 return null; 1366 } 1367 } 1368 1369 }