001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mock;
018    
019    import java.io.File;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Collection;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CopyOnWriteArrayList;
030    import java.util.concurrent.CopyOnWriteArraySet;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    
034    import org.apache.camel.AsyncCallback;
035    import org.apache.camel.CamelContext;
036    import org.apache.camel.Component;
037    import org.apache.camel.Consumer;
038    import org.apache.camel.Endpoint;
039    import org.apache.camel.Exchange;
040    import org.apache.camel.ExchangePattern;
041    import org.apache.camel.Expression;
042    import org.apache.camel.Handler;
043    import org.apache.camel.Message;
044    import org.apache.camel.Predicate;
045    import org.apache.camel.Processor;
046    import org.apache.camel.Producer;
047    import org.apache.camel.builder.ProcessorBuilder;
048    import org.apache.camel.impl.DefaultAsyncProducer;
049    import org.apache.camel.impl.DefaultEndpoint;
050    import org.apache.camel.impl.InterceptSendToEndpoint;
051    import org.apache.camel.spi.BrowsableEndpoint;
052    import org.apache.camel.spi.UriEndpoint;
053    import org.apache.camel.spi.UriParam;
054    import org.apache.camel.util.CamelContextHelper;
055    import org.apache.camel.util.CaseInsensitiveMap;
056    import org.apache.camel.util.ExchangeHelper;
057    import org.apache.camel.util.ExpressionComparator;
058    import org.apache.camel.util.FileUtil;
059    import org.apache.camel.util.ObjectHelper;
060    import org.apache.camel.util.StopWatch;
061    import org.slf4j.Logger;
062    import org.slf4j.LoggerFactory;
063    
064    /**
065     * A Mock endpoint which provides a literate, fluent API for testing routes
066     * using a <a href="http://jmock.org/">JMock style</a> API.
067     * <p/>
068     * The mock endpoint have two set of methods
069     * <ul>
070     *   <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li>
071     *   <li>assertXXX - To assert assertions, after the test has been executed</li>
072     * </ul>
073     * Its <b>important</b> to know the difference between the two set. The former is used to
074     * set expectations before the test is being started (eg before the mock receives messages).
075     * The latter is used after the test has been executed, to verify the expectations; or
076     * other assertions which you can perform after the test has been completed.
077     *
078     * @version 
079     */
080    @UriEndpoint(scheme = "mock")
081    public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
082        private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
083        // must be volatile so changes is visible between the thread which performs the assertions
084        // and the threads which process the exchanges when routing messages in Camel
085        protected volatile Processor reporter;
086        protected boolean copyOnExchange = true;
087        @UriParam
088        private volatile int expectedCount;
089        private volatile int counter;
090        private volatile Processor defaultProcessor;
091        private volatile Map<Integer, Processor> processors;
092        private volatile List<Exchange> receivedExchanges;
093        private volatile List<Throwable> failures;
094        private volatile List<Runnable> tests;
095        private volatile CountDownLatch latch;
096        @UriParam
097        private volatile long sleepForEmptyTest;
098        @UriParam
099        private volatile long resultWaitTime;
100        @UriParam
101        private volatile long resultMinimumWaitTime;
102        @UriParam
103        private volatile long assertPeriod;
104        @UriParam
105        private volatile int expectedMinimumCount;
106        private volatile List<?> expectedBodyValues;
107        private volatile List<Object> actualBodyValues;
108        private volatile Map<String, Object> expectedHeaderValues;
109        private volatile Map<String, Object> actualHeaderValues;
110        private volatile Map<String, Object> expectedPropertyValues;
111        private volatile Map<String, Object> actualPropertyValues;
112        @UriParam
113        private volatile int retainFirst;
114        @UriParam
115        private volatile int retainLast;
116    
117        public MockEndpoint(String endpointUri, Component component) {
118            super(endpointUri, component);
119            init();
120        }
121    
122        @Deprecated
123        public MockEndpoint(String endpointUri) {
124            super(endpointUri);
125            init();
126        }
127    
128        public MockEndpoint() {
129            this(null);
130        }
131    
132        /**
133         * A helper method to resolve the mock endpoint of the given URI on the given context
134         *
135         * @param context the camel context to try resolve the mock endpoint from
136         * @param uri the uri of the endpoint to resolve
137         * @return the endpoint
138         */
139        public static MockEndpoint resolve(CamelContext context, String uri) {
140            return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
141        }
142    
143        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
144            long start = System.currentTimeMillis();
145            long left = unit.toMillis(timeout);
146            long end = start + left;
147            for (MockEndpoint endpoint : endpoints) {
148                if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
149                    throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
150                }
151                left = end - System.currentTimeMillis();
152                if (left <= 0) {
153                    left = 0;
154                }
155            }
156        }
157    
158        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
159            assertWait(timeout, unit, endpoints);
160            for (MockEndpoint endpoint : endpoints) {
161                endpoint.assertIsSatisfied();
162            }
163        }
164    
165        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
166            for (MockEndpoint endpoint : endpoints) {
167                endpoint.assertIsSatisfied();
168            }
169        }
170    
171    
172        /**
173         * Asserts that all the expectations on any {@link MockEndpoint} instances registered
174         * in the given context are valid
175         *
176         * @param context the camel context used to find all the available endpoints to be asserted
177         */
178        public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
179            ObjectHelper.notNull(context, "camelContext");
180            Collection<Endpoint> endpoints = context.getEndpoints();
181            for (Endpoint endpoint : endpoints) {
182                // if the endpoint was intercepted we should get the delegate
183                if (endpoint instanceof InterceptSendToEndpoint) {
184                    endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
185                }
186                if (endpoint instanceof MockEndpoint) {
187                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
188                    mockEndpoint.assertIsSatisfied();
189                }
190            }
191        }
192    
193        /**
194         * Asserts that all the expectations on any {@link MockEndpoint} instances registered
195         * in the given context are valid
196         *
197         * @param context the camel context used to find all the available endpoints to be asserted
198         * @param timeout timeout
199         * @param unit    time unit
200         */
201        public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException {
202            ObjectHelper.notNull(context, "camelContext");
203            ObjectHelper.notNull(unit, "unit");
204            Collection<Endpoint> endpoints = context.getEndpoints();
205            long millis = unit.toMillis(timeout);
206            for (Endpoint endpoint : endpoints) {
207                // if the endpoint was intercepted we should get the delegate
208                if (endpoint instanceof InterceptSendToEndpoint) {
209                    endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
210                }
211                if (endpoint instanceof MockEndpoint) {
212                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
213                    mockEndpoint.setResultWaitTime(millis);
214                    mockEndpoint.assertIsSatisfied();
215                }
216            }
217        }
218    
219        /**
220         * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered
221         * in the given context.
222         *
223         * @param context the camel context used to find all the available endpoints
224         * @param period the period in millis
225         */
226        public static void setAssertPeriod(CamelContext context, long period) {
227            ObjectHelper.notNull(context, "camelContext");
228            Collection<Endpoint> endpoints = context.getEndpoints();
229            for (Endpoint endpoint : endpoints) {
230                // if the endpoint was intercepted we should get the delegate
231                if (endpoint instanceof InterceptSendToEndpoint) {
232                    endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
233                }
234                if (endpoint instanceof MockEndpoint) {
235                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
236                    mockEndpoint.setAssertPeriod(period);
237                }
238            }
239        }
240    
241        /**
242         * Reset all mock endpoints
243         *
244         * @param context the camel context used to find all the available endpoints to reset
245         */
246        public static void resetMocks(CamelContext context) {
247            ObjectHelper.notNull(context, "camelContext");
248            Collection<Endpoint> endpoints = context.getEndpoints();
249            for (Endpoint endpoint : endpoints) {
250                // if the endpoint was intercepted we should get the delegate
251                if (endpoint instanceof InterceptSendToEndpoint) {
252                    endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
253                }
254                if (endpoint instanceof MockEndpoint) {
255                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
256                    mockEndpoint.reset();
257                }
258            }
259        }
260    
261        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
262            for (MockEndpoint endpoint : endpoints) {
263                endpoint.setExpectedMessageCount(count);
264            }
265        }
266    
267        public List<Exchange> getExchanges() {
268            return getReceivedExchanges();
269        }
270    
271        public Consumer createConsumer(Processor processor) throws Exception {
272            throw new UnsupportedOperationException("You cannot consume from this endpoint");
273        }
274    
275        public Producer createProducer() throws Exception {
276            return new DefaultAsyncProducer(this) {
277                public boolean process(Exchange exchange, AsyncCallback callback) {
278                    onExchange(exchange);
279                    callback.done(true);
280                    return true;
281                }
282            };
283        }
284    
285        public void reset() {
286            init();
287        }
288    
289    
290        // Testing API
291        // -------------------------------------------------------------------------
292    
293        /**
294         * Handles the incoming exchange.
295         * <p/>
296         * This method turns this mock endpoint into a bean which you can use
297         * in the Camel routes, which allows you to inject MockEndpoint as beans
298         * in your routes and use the features of the mock to control the bean.
299         *
300         * @param exchange  the exchange
301         * @throws Exception can be thrown
302         */
303        @Handler
304        public void handle(Exchange exchange) throws Exception {
305            onExchange(exchange);
306        }
307    
308        /**
309         * Set the processor that will be invoked when the index
310         * message is received.
311         */
312        public void whenExchangeReceived(int index, Processor processor) {
313            this.processors.put(index, processor);
314        }
315    
316        /**
317         * Set the processor that will be invoked when the some message
318         * is received.
319         *
320         * This processor could be overwritten by
321         * {@link #whenExchangeReceived(int, Processor)} method.
322         */
323        public void whenAnyExchangeReceived(Processor processor) {
324            this.defaultProcessor = processor;
325        }
326        
327        /**
328         * Set the expression which value will be set to the message body
329         * @param expression which is use to set the message body 
330         */
331        public void returnReplyBody(Expression expression) {
332            this.defaultProcessor = ProcessorBuilder.setBody(expression);
333        }
334        
335        /**
336         * Set the expression which value will be set to the message header
337         * @param headerName that will be set value
338         * @param expression which is use to set the message header 
339         */
340        public void returnReplyHeader(String headerName, Expression expression) {
341            this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression);
342        }
343        
344    
345        /**
346         * Validates that all the available expectations on this endpoint are
347         * satisfied; or throw an exception
348         */
349        public void assertIsSatisfied() throws InterruptedException {
350            assertIsSatisfied(sleepForEmptyTest);
351        }
352    
353        /**
354         * Validates that all the available expectations on this endpoint are
355         * satisfied; or throw an exception
356         *
357         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
358         *                should wait for the test to be true
359         */
360        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
361            LOG.info("Asserting: " + this + " is satisfied");
362            doAssertIsSatisfied(timeoutForEmptyEndpoints);
363            if (assertPeriod > 0) {
364                // if an assert period was set then re-assert again to ensure the assertion is still valid
365                Thread.sleep(assertPeriod);
366                LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis");
367                // do not use timeout when we re-assert
368                doAssertIsSatisfied(0);
369            }
370        }
371    
372        protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
373            if (expectedCount == 0) {
374                if (timeoutForEmptyEndpoints > 0) {
375                    LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
376                    Thread.sleep(timeoutForEmptyEndpoints);
377                }
378                assertEquals("Received message count", expectedCount, getReceivedCounter());
379            } else if (expectedCount > 0) {
380                if (expectedCount != getReceivedCounter()) {
381                    waitForCompleteLatch();
382                }
383                assertEquals("Received message count", expectedCount, getReceivedCounter());
384            } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
385                waitForCompleteLatch();
386            }
387    
388            if (expectedMinimumCount >= 0) {
389                int receivedCounter = getReceivedCounter();
390                assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
391            }
392    
393            for (Runnable test : tests) {
394                test.run();
395            }
396    
397            for (Throwable failure : failures) {
398                if (failure != null) {
399                    LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
400                    fail("Failed due to caught exception: " + failure);
401                }
402            }
403        }
404    
405        /**
406         * Validates that the assertions fail on this endpoint
407         */
408        public void assertIsNotSatisfied() throws InterruptedException {
409            boolean failed = false;
410            try {
411                assertIsSatisfied();
412                // did not throw expected error... fail!
413                failed = true;
414            } catch (AssertionError e) {
415                LOG.info("Caught expected failure: " + e);
416            }
417            if (failed) {
418                // fail() throws the AssertionError to indicate the test failed. 
419                fail("Expected assertion failure but test succeeded!");
420            }
421        }
422    
423        /**
424         * Validates that the assertions fail on this endpoint
425    
426         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
427         *        should wait for the test to be true
428         */
429        public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
430            boolean failed = false;
431            try {
432                assertIsSatisfied(timeoutForEmptyEndpoints);
433                // did not throw expected error... fail!
434                failed = true;
435            } catch (AssertionError e) {
436                LOG.info("Caught expected failure: " + e);
437            }
438            if (failed) { 
439                // fail() throws the AssertionError to indicate the test failed. 
440                fail("Expected assertion failure but test succeeded!");
441            }
442        }    
443        
444        /**
445         * Specifies the expected number of message exchanges that should be
446         * received by this endpoint
447         *
448         * If you want to assert that <b>exactly</b> n messages arrives to this mock
449         * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
450         *
451         * @param expectedCount the number of message exchanges that should be
452         *                expected by this endpoint
453         * @see #setAssertPeriod(long)
454         */
455        public void expectedMessageCount(int expectedCount) {
456            setExpectedMessageCount(expectedCount);
457        }
458    
459        /**
460         * Sets a grace period after which the mock endpoint will re-assert
461         * to ensure the preliminary assertion is still valid.
462         * <p/>
463         * This is used for example to assert that <b>exactly</b> a number of messages 
464         * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then
465         * the assertion is satisfied when 5 or more message arrives. To ensure that
466         * exactly 5 messages arrives, then you would need to wait a little period
467         * to ensure no further message arrives. This is what you can use this
468         * {@link #setAssertPeriod(long)} method for.
469         * <p/>
470         * By default this period is disabled.
471         *
472         * @param period grace period in millis
473         */
474        public void setAssertPeriod(long period) {
475            this.assertPeriod = period;
476        }
477    
478        /**
479         * Specifies the minimum number of expected message exchanges that should be
480         * received by this endpoint
481         *
482         * @param expectedCount the number of message exchanges that should be
483         *                expected by this endpoint
484         */
485        public void expectedMinimumMessageCount(int expectedCount) {
486            setMinimumExpectedMessageCount(expectedCount);
487        }
488    
489        /**
490         * Sets an expectation that the given header name & value are received by this endpoint
491         * <p/>
492         * You can set multiple expectations for different header names.
493         * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt>
494         */
495        public void expectedHeaderReceived(final String name, final Object value) {
496            if (expectedHeaderValues == null) {
497                expectedHeaderValues = new CaseInsensitiveMap();
498                // we just wants to expects to be called once
499                expects(new Runnable() {
500                    public void run() {
501                        for (int i = 0; i < getReceivedExchanges().size(); i++) {
502                            Exchange exchange = getReceivedExchange(i);
503                            for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) {
504                                String key = entry.getKey();
505                                Object expectedValue = entry.getValue();
506    
507                                // we accept that an expectedValue of null also means that the header may be absent
508                                if (expectedValue != null) {
509                                    assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders());
510                                    boolean hasKey = exchange.getIn().getHeaders().containsKey(key);
511                                    assertTrue("No header with name " + key + " found for message: " + i, hasKey);
512                                }
513    
514                                Object actualValue = exchange.getIn().getHeader(key);
515                                actualValue = extractActualValue(exchange, actualValue, expectedValue);
516    
517                                assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue);
518                            }
519                        }
520                    }
521                });
522            }
523            expectedHeaderValues.put(name, value);
524        }
525    
526        /**
527         * Adds an expectation that the given header values are received by this
528         * endpoint in any order
529         */
530        public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) {
531            expectedMessageCount(values.size());
532    
533            expects(new Runnable() {
534                public void run() {
535                    // these are the expected values to find
536                    final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<Object>(values);
537    
538                    for (int i = 0; i < getReceivedExchanges().size(); i++) {
539                        Exchange exchange = getReceivedExchange(i);
540    
541                        Object actualValue = exchange.getIn().getHeader(name);
542                        for (Object expectedValue : actualHeaderValues) {
543                            actualValue = extractActualValue(exchange, actualValue, expectedValue);
544                            // remove any found values
545                            actualHeaderValues.remove(actualValue);
546                        }
547                    }
548    
549                    // should be empty, as we should find all the values
550                    assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size())
551                            + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty());
552                }
553            });
554        }
555    
556        /**
557         * Adds an expectation that the given header values are received by this
558         * endpoint in any order
559         */
560        public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) {
561            List<Object> valueList = new ArrayList<Object>();
562            valueList.addAll(Arrays.asList(values));
563            expectedHeaderValuesReceivedInAnyOrder(name, valueList);
564        }
565    
566        /**
567         * Sets an expectation that the given property name & value are received by this endpoint
568         * <p/>
569         * You can set multiple expectations for different property names.
570         * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt>
571         */
572        public void expectedPropertyReceived(final String name, final Object value) {
573            if (expectedPropertyValues == null) {
574                expectedPropertyValues = new ConcurrentHashMap<String, Object>();
575            }
576            if (value != null) {
577                // ConcurrentHashMap cannot store null values
578                expectedPropertyValues.put(name, value);
579            }
580    
581            expects(new Runnable() {
582                public void run() {
583                    for (int i = 0; i < getReceivedExchanges().size(); i++) {
584                        Exchange exchange = getReceivedExchange(i);
585                        for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) {
586                            String key = entry.getKey();
587                            Object expectedValue = entry.getValue();
588    
589                            // we accept that an expectedValue of null also means that the header may be absent
590                            if (expectedValue != null) {
591                                assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty());
592                                boolean hasKey = exchange.getProperties().containsKey(key);
593                                assertTrue("No property with name " + key + " found for message: " + i, hasKey);
594                            }
595    
596                            Object actualValue = exchange.getProperty(key);
597                            actualValue = extractActualValue(exchange, actualValue, expectedValue);
598    
599                            assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue);
600                        }
601                    }
602                }
603            });
604        }
605    
606        /**
607         * Adds an expectation that the given body values are received by this
608         * endpoint in the specified order
609         */
610        public void expectedBodiesReceived(final List<?> bodies) {
611            expectedMessageCount(bodies.size());
612            this.expectedBodyValues = bodies;
613            this.actualBodyValues = new ArrayList<Object>();
614    
615            expects(new Runnable() {
616                public void run() {
617                    for (int i = 0; i < expectedBodyValues.size(); i++) {
618                        Exchange exchange = getReceivedExchange(i);
619                        assertTrue("No exchange received for counter: " + i, exchange != null);
620    
621                        Object expectedBody = expectedBodyValues.get(i);
622                        Object actualBody = null;
623                        if (i < actualBodyValues.size()) {
624                            actualBody = actualBodyValues.get(i);
625                        }
626                        actualBody = extractActualValue(exchange, actualBody, expectedBody);
627    
628                        assertEquals("Body of message: " + i, expectedBody, actualBody);
629                    }
630                }
631            });
632        }
633    
634        private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) {
635            if (actualValue == null) {
636                return null;
637            }
638    
639            if (actualValue instanceof Expression) {
640                actualValue = ((Expression)actualValue).evaluate(exchange, expectedValue != null ? expectedValue.getClass() : Object.class);
641            } else if (actualValue instanceof Predicate) {
642                actualValue = ((Predicate)actualValue).matches(exchange);
643            } else if (expectedValue != null) {
644                String from = actualValue.getClass().getName();
645                String to = expectedValue.getClass().getName();
646                actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue);
647                assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null);
648            }
649            return actualValue;
650        }
651    
652        /**
653         * Sets an expectation that the given predicates matches the received messages by this endpoint
654         */
655        public void expectedMessagesMatches(Predicate... predicates) {
656            for (int i = 0; i < predicates.length; i++) {
657                final int messageIndex = i;
658                final Predicate predicate = predicates[i];
659                final AssertionClause clause = new AssertionClause(this) {
660                    public void run() {
661                        addPredicate(predicate);
662                        applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
663                    }
664                };
665                expects(clause);
666            }
667        }
668    
669        /**
670         * Sets an expectation that the given body values are received by this endpoint
671         */
672        public void expectedBodiesReceived(Object... bodies) {
673            List<Object> bodyList = new ArrayList<Object>();
674            bodyList.addAll(Arrays.asList(bodies));
675            expectedBodiesReceived(bodyList);
676        }
677    
678        /**
679         * Adds an expectation that the given body value are received by this endpoint
680         */
681        public AssertionClause expectedBodyReceived() {
682            expectedMessageCount(1);
683            final AssertionClause clause = new AssertionClause(this) {
684                public void run() {
685                    Exchange exchange = getReceivedExchange(0);
686                    assertTrue("No exchange received for counter: " + 0, exchange != null);
687    
688                    Object actualBody = exchange.getIn().getBody();
689                    Expression exp = createExpression(getCamelContext());
690                    Object expectedBody = exp.evaluate(exchange, Object.class);
691    
692                    assertEquals("Body of message: " + 0, expectedBody, actualBody);
693                }
694            };
695            expects(clause);
696            return clause;
697        }
698    
699        /**
700         * Adds an expectation that the given body values are received by this
701         * endpoint in any order
702         */
703        public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) {
704            expectedMessageCount(bodies.size());
705            this.expectedBodyValues = bodies;
706            this.actualBodyValues = new ArrayList<Object>();
707    
708            expects(new Runnable() {
709                public void run() {
710                    List<Object> actualBodyValuesSet = new ArrayList<Object>(actualBodyValues);
711                    for (int i = 0; i < expectedBodyValues.size(); i++) {
712                        Exchange exchange = getReceivedExchange(i);
713                        assertTrue("No exchange received for counter: " + i, exchange != null);
714    
715                        Object expectedBody = expectedBodyValues.get(i);
716                        assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody));
717                    }
718                }
719            });
720        }
721    
722        /**
723         * Adds an expectation that the given body values are received by this
724         * endpoint in any order
725         */
726        public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
727            List<Object> bodyList = new ArrayList<Object>();
728            bodyList.addAll(Arrays.asList(bodies));
729            expectedBodiesReceivedInAnyOrder(bodyList);
730        }
731    
732        /**
733         * Adds an expectation that a file exists with the given name
734         *
735         * @param name name of file, will cater for / and \ on different OS platforms
736         */
737        public void expectedFileExists(final String name) {
738            expectedFileExists(name, null);
739        }
740    
741        /**
742         * Adds an expectation that a file exists with the given name
743         * <p/>
744         * Will wait at most 5 seconds while checking for the existence of the file.
745         *
746         * @param name name of file, will cater for / and \ on different OS platforms
747         * @param content content of file to compare, can be <tt>null</tt> to not compare content
748         */
749        public void expectedFileExists(final String name, final String content) {
750            final File file = new File(FileUtil.normalizePath(name));
751    
752            expects(new Runnable() {
753                public void run() {
754                    // wait at most 5 seconds for the file to exists
755                    final long timeout = System.currentTimeMillis() + 5000;
756    
757                    boolean stop = false;
758                    while (!stop && !file.exists()) {
759                        try {
760                            Thread.sleep(50);
761                        } catch (InterruptedException e) {
762                            // ignore
763                        }
764                        stop = System.currentTimeMillis() > timeout;
765                    }
766    
767                    assertTrue("The file should exists: " + name, file.exists());
768    
769                    if (content != null) {
770                        String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
771                        assertEquals("Content of file: " + name, content, body);
772                    }
773                }
774            });
775        }
776    
777        /**
778         * Adds an expectation that messages received should have the given exchange pattern
779         */
780        public void expectedExchangePattern(final ExchangePattern exchangePattern) {
781            expectedMessagesMatches(new Predicate() {
782                public boolean matches(Exchange exchange) {
783                    return exchange.getPattern().equals(exchangePattern);
784                }
785            });
786        }
787    
788        /**
789         * Adds an expectation that messages received should have ascending values
790         * of the given expression such as a user generated counter value
791         */
792        public void expectsAscending(final Expression expression) {
793            expects(new Runnable() {
794                public void run() {
795                    assertMessagesAscending(expression);
796                }
797            });
798        }
799    
800        /**
801         * Adds an expectation that messages received should have ascending values
802         * of the given expression such as a user generated counter value
803         */
804        public AssertionClause expectsAscending() {
805            final AssertionClause clause = new AssertionClause(this) {
806                public void run() {
807                    assertMessagesAscending(createExpression(getCamelContext()));
808                }
809            };
810            expects(clause);
811            return clause;
812        }
813    
814        /**
815         * Adds an expectation that messages received should have descending values
816         * of the given expression such as a user generated counter value
817         */
818        public void expectsDescending(final Expression expression) {
819            expects(new Runnable() {
820                public void run() {
821                    assertMessagesDescending(expression);
822                }
823            });
824        }
825    
826        /**
827         * Adds an expectation that messages received should have descending values
828         * of the given expression such as a user generated counter value
829         */
830        public AssertionClause expectsDescending() {
831            final AssertionClause clause = new AssertionClause(this) {
832                public void run() {
833                    assertMessagesDescending(createExpression(getCamelContext()));
834                }
835            };
836            expects(clause);
837            return clause;
838        }
839    
840        /**
841         * Adds an expectation that no duplicate messages should be received using
842         * the expression to determine the message ID
843         *
844         * @param expression the expression used to create a unique message ID for
845         *                message comparison (which could just be the message
846         *                payload if the payload can be tested for uniqueness using
847         *                {@link Object#equals(Object)} and
848         *                {@link Object#hashCode()}
849         */
850        public void expectsNoDuplicates(final Expression expression) {
851            expects(new Runnable() {
852                public void run() {
853                    assertNoDuplicates(expression);
854                }
855            });
856        }
857    
858        /**
859         * Adds an expectation that no duplicate messages should be received using
860         * the expression to determine the message ID
861         */
862        public AssertionClause expectsNoDuplicates() {
863            final AssertionClause clause = new AssertionClause(this) {
864                public void run() {
865                    assertNoDuplicates(createExpression(getCamelContext()));
866                }
867            };
868            expects(clause);
869            return clause;
870        }
871    
872        /**
873         * Asserts that the messages have ascending values of the given expression
874         */
875        public void assertMessagesAscending(Expression expression) {
876            assertMessagesSorted(expression, true);
877        }
878    
879        /**
880         * Asserts that the messages have descending values of the given expression
881         */
882        public void assertMessagesDescending(Expression expression) {
883            assertMessagesSorted(expression, false);
884        }
885    
886        protected void assertMessagesSorted(Expression expression, boolean ascending) {
887            String type = ascending ? "ascending" : "descending";
888            ExpressionComparator comparator = new ExpressionComparator(expression);
889            List<Exchange> list = getReceivedExchanges();
890            for (int i = 1; i < list.size(); i++) {
891                int j = i - 1;
892                Exchange e1 = list.get(j);
893                Exchange e2 = list.get(i);
894                int result = comparator.compare(e1, e2);
895                if (result == 0) {
896                    fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
897                        + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
898                } else {
899                    if (!ascending) {
900                        result = result * -1;
901                    }
902                    if (result > 0) {
903                        fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
904                            + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
905                            + expression + ". Exchanges: " + e1 + " and " + e2);
906                    }
907                }
908            }
909        }
910    
911        public void assertNoDuplicates(Expression expression) {
912            Map<Object, Exchange> map = new HashMap<Object, Exchange>();
913            List<Exchange> list = getReceivedExchanges();
914            for (int i = 0; i < list.size(); i++) {
915                Exchange e2 = list.get(i);
916                Object key = expression.evaluate(e2, Object.class);
917                Exchange e1 = map.get(key);
918                if (e1 != null) {
919                    fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
920                } else {
921                    map.put(key, e2);
922                }
923            }
924        }
925    
926        /**
927         * Adds the expectation which will be invoked when enough messages are received
928         */
929        public void expects(Runnable runnable) {
930            tests.add(runnable);
931        }
932    
933        /**
934         * Adds an assertion to the given message index
935         *
936         * @param messageIndex the number of the message
937         * @return the assertion clause
938         */
939        public AssertionClause message(final int messageIndex) {
940            final AssertionClause clause = new AssertionClause(this) {
941                public void run() {
942                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
943                }
944            };
945            expects(clause);
946            return clause;
947        }
948    
949        /**
950         * Adds an assertion to all the received messages
951         *
952         * @return the assertion clause
953         */
954        public AssertionClause allMessages() {
955            final AssertionClause clause = new AssertionClause(this) {
956                public void run() {
957                    List<Exchange> list = getReceivedExchanges();
958                    int index = 0;
959                    for (Exchange exchange : list) {
960                        applyAssertionOn(MockEndpoint.this, index++, exchange);
961                    }
962                }
963            };
964            expects(clause);
965            return clause;
966        }
967    
968        /**
969         * Asserts that the given index of message is received (starting at zero)
970         */
971        public Exchange assertExchangeReceived(int index) {
972            int count = getReceivedCounter();
973            assertTrue("Not enough messages received. Was: " + count, count > index);
974            return getReceivedExchange(index);
975        }
976    
977        // Properties
978        // -------------------------------------------------------------------------
979        public List<Throwable> getFailures() {
980            return failures;
981        }
982    
983        public int getReceivedCounter() {
984            return counter;
985        }
986    
987        public List<Exchange> getReceivedExchanges() {
988            return receivedExchanges;
989        }
990    
991        public int getExpectedCount() {
992            return expectedCount;
993        }
994    
995        public long getSleepForEmptyTest() {
996            return sleepForEmptyTest;
997        }
998    
999        /**
1000         * Allows a sleep to be specified to wait to check that this endpoint really
1001         * is empty when {@link #expectedMessageCount(int)} is called with zero
1002         *
1003         * @param sleepForEmptyTest the milliseconds to sleep for to determine that
1004         *                this endpoint really is empty
1005         */
1006        public void setSleepForEmptyTest(long sleepForEmptyTest) {
1007            this.sleepForEmptyTest = sleepForEmptyTest;
1008        }
1009    
1010        public long getResultWaitTime() {
1011            return resultWaitTime;
1012        }
1013    
1014        /**
1015         * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
1016         * wait on a latch until it is satisfied
1017         */
1018        public void setResultWaitTime(long resultWaitTime) {
1019            this.resultWaitTime = resultWaitTime;
1020        }
1021    
1022        /**
1023         * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
1024         * wait on a latch until it is satisfied
1025         */
1026        public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
1027            this.resultMinimumWaitTime = resultMinimumWaitTime;
1028        }
1029    
1030        /**
1031         * Specifies the expected number of message exchanges that should be
1032         * received by this endpoint.
1033         * <p/>
1034         * If you want to assert that <b>exactly</b> n'th message arrives to this mock
1035         * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
1036         *
1037         * @param expectedCount the number of message exchanges that should be
1038         *                expected by this endpoint
1039         * @see #setAssertPeriod(long)                      
1040         */
1041        public void setExpectedMessageCount(int expectedCount) {
1042            this.expectedCount = expectedCount;
1043            if (expectedCount <= 0) {
1044                latch = null;
1045            } else {
1046                latch = new CountDownLatch(expectedCount);
1047            }
1048        }
1049    
1050        /**
1051         * Specifies the minimum number of expected message exchanges that should be
1052         * received by this endpoint
1053         *
1054         * @param expectedCount the number of message exchanges that should be
1055         *                expected by this endpoint
1056         */
1057        public void setMinimumExpectedMessageCount(int expectedCount) {
1058            this.expectedMinimumCount = expectedCount;
1059            if (expectedCount <= 0) {
1060                latch = null;
1061            } else {
1062                latch = new CountDownLatch(expectedMinimumCount);
1063            }
1064        }
1065    
1066        public Processor getReporter() {
1067            return reporter;
1068        }
1069    
1070        /**
1071         * Allows a processor to added to the endpoint to report on progress of the test
1072         */
1073        public void setReporter(Processor reporter) {
1074            this.reporter = reporter;
1075        }
1076    
1077        /**
1078         * Specifies to only retain the first n'th number of received {@link Exchange}s.
1079         * <p/>
1080         * This is used when testing with big data, to reduce memory consumption by not storing
1081         * copies of every {@link Exchange} this mock endpoint receives.
1082         * <p/>
1083         * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1084         * will still return the actual number of received {@link Exchange}s. For example
1085         * if we have received 5000 {@link Exchange}s, and have configured to only retain the first
1086         * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1087         * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and
1088         * {@link #getReceivedExchanges()} methods.
1089         * <p/>
1090         * When using this method, then some of the other expectation methods is not supported,
1091         * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first
1092         * number of bodies received.
1093         * <p/>
1094         * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1095         * to limit both the first and last received.
1096         * 
1097         * @param retainFirst  to limit and only keep the first n'th received {@link Exchange}s, use
1098         *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1099         * @see #setRetainLast(int)
1100         */
1101        public void setRetainFirst(int retainFirst) {
1102            this.retainFirst = retainFirst;
1103        }
1104    
1105        /**
1106         * Specifies to only retain the last n'th number of received {@link Exchange}s.
1107         * <p/>
1108         * This is used when testing with big data, to reduce memory consumption by not storing
1109         * copies of every {@link Exchange} this mock endpoint receives.
1110         * <p/>
1111         * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1112         * will still return the actual number of received {@link Exchange}s. For example
1113         * if we have received 5000 {@link Exchange}s, and have configured to only retain the last
1114         * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1115         * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and
1116         * {@link #getReceivedExchanges()} methods.
1117         * <p/>
1118         * When using this method, then some of the other expectation methods is not supported,
1119         * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first
1120         * number of bodies received.
1121         * <p/>
1122         * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1123         * to limit both the first and last received.
1124         *
1125         * @param retainLast  to limit and only keep the last n'th received {@link Exchange}s, use
1126         *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1127         * @see #setRetainFirst(int)
1128         */
1129        public void setRetainLast(int retainLast) {
1130            this.retainLast = retainLast;
1131        }
1132    
1133        // Implementation methods
1134        // -------------------------------------------------------------------------
1135        private void init() {
1136            expectedCount = -1;
1137            counter = 0;
1138            defaultProcessor = null;
1139            processors = new HashMap<Integer, Processor>();
1140            receivedExchanges = new CopyOnWriteArrayList<Exchange>();
1141            failures = new CopyOnWriteArrayList<Throwable>();
1142            tests = new CopyOnWriteArrayList<Runnable>();
1143            latch = null;
1144            sleepForEmptyTest = 0;
1145            resultWaitTime = 0;
1146            resultMinimumWaitTime = 0L;
1147            assertPeriod = 0L;
1148            expectedMinimumCount = -1;
1149            expectedBodyValues = null;
1150            actualBodyValues = new ArrayList<Object>();
1151            expectedHeaderValues = null;
1152            actualHeaderValues = null;
1153            expectedPropertyValues = null;
1154            actualPropertyValues = null;
1155            retainFirst = -1;
1156            retainLast = -1;
1157        }
1158    
1159        protected synchronized void onExchange(Exchange exchange) {
1160            try {
1161                if (reporter != null) {
1162                    reporter.process(exchange);
1163                }
1164                Exchange copy = exchange;
1165                if (copyOnExchange) {
1166                    // copy the exchange so the mock stores the copy and not the actual exchange
1167                    copy = ExchangeHelper.createCopy(exchange, true);
1168                }
1169                performAssertions(exchange, copy);
1170            } catch (Throwable e) {
1171                // must catch java.lang.Throwable as AssertionError extends java.lang.Error
1172                failures.add(e);
1173            } finally {
1174                // make sure latch is counted down to avoid test hanging forever
1175                if (latch != null) {
1176                    latch.countDown();
1177                }
1178            }
1179        }
1180    
1181        /**
1182         * Performs the assertions on the incoming exchange.
1183         *
1184         * @param exchange   the actual exchange
1185         * @param copy       a copy of the exchange (only store this)
1186         * @throws Exception can be thrown if something went wrong
1187         */
1188        protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
1189            Message in = copy.getIn();
1190            Object actualBody = in.getBody();
1191    
1192            if (expectedHeaderValues != null) {
1193                if (actualHeaderValues == null) {
1194                    actualHeaderValues = new CaseInsensitiveMap();
1195                }
1196                if (in.hasHeaders()) {
1197                    actualHeaderValues.putAll(in.getHeaders());
1198                }
1199            }
1200    
1201            if (expectedPropertyValues != null) {
1202                if (actualPropertyValues == null) {
1203                    actualPropertyValues = new ConcurrentHashMap<String, Object>();
1204                }
1205                actualPropertyValues.putAll(copy.getProperties());
1206            }
1207    
1208            if (expectedBodyValues != null) {
1209                int index = actualBodyValues.size();
1210                if (expectedBodyValues.size() > index) {
1211                    Object expectedBody = expectedBodyValues.get(index);
1212                    if (expectedBody != null) {
1213                        // prefer to convert body early, for example when using files
1214                        // we need to read the content at this time
1215                        Object body = in.getBody(expectedBody.getClass());
1216                        if (body != null) {
1217                            actualBody = body;
1218                        }
1219                    }
1220                    actualBodyValues.add(actualBody);
1221                }
1222            }
1223    
1224            // let counter be 0 index-based in the logs
1225            if (LOG.isDebugEnabled()) {
1226                String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
1227                if (copy.getIn().hasHeaders()) {
1228                    msg += " and headers:" + copy.getIn().getHeaders();
1229                }
1230                LOG.debug(msg);
1231            }
1232    
1233            // record timestamp when exchange was received
1234            copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());
1235    
1236            // add a copy of the received exchange
1237            addReceivedExchange(copy);
1238            // and then increment counter after adding received exchange
1239            ++counter;
1240    
1241            Processor processor = processors.get(getReceivedCounter()) != null
1242                    ? processors.get(getReceivedCounter()) : defaultProcessor;
1243    
1244            if (processor != null) {
1245                try {
1246                    // must process the incoming exchange and NOT the copy as the idea
1247                    // is the end user can manipulate the exchange
1248                    processor.process(exchange);
1249                } catch (Exception e) {
1250                    // set exceptions on exchange so we can throw exceptions to simulate errors
1251                    exchange.setException(e);
1252                }
1253            }
1254        }
1255    
1256        /**
1257         * Adds the received exchange.
1258         * 
1259         * @param copy  a copy of the received exchange
1260         */
1261        protected void addReceivedExchange(Exchange copy) {
1262            if (retainFirst == 0 && retainLast == 0) {
1263                // do not retain any messages at all
1264            } else if (retainFirst < 0 && retainLast < 0) {
1265                // no limitation so keep them all
1266                receivedExchanges.add(copy);
1267            } else {
1268                // okay there is some sort of limitations, so figure out what to retain
1269                if (retainFirst > 0 && counter < retainFirst) {
1270                    // store a copy as its within the retain first limitation
1271                    receivedExchanges.add(copy);
1272                } else if (retainLast > 0) {
1273                    // remove the oldest from the last retained boundary,
1274                    int index = receivedExchanges.size() - retainLast;
1275                    if (index >= 0) {
1276                        // but must be outside the first range as well
1277                        // otherwise we should not remove the oldest
1278                        if (retainFirst <= 0 || retainFirst <= index) {
1279                            receivedExchanges.remove(index);
1280                        }
1281                    }
1282                    // store a copy of the last n'th received
1283                    receivedExchanges.add(copy);
1284                }
1285            }
1286        }
1287    
1288        protected void waitForCompleteLatch() throws InterruptedException {
1289            if (latch == null) {
1290                fail("Should have a latch!");
1291            }
1292    
1293            StopWatch watch = new StopWatch();
1294            waitForCompleteLatch(resultWaitTime);
1295            long delta = watch.stop();
1296            LOG.debug("Took {} millis to complete latch", delta);
1297    
1298            if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
1299                fail("Expected minimum " + resultMinimumWaitTime
1300                    + " millis waiting on the result, but was faster with " + delta + " millis.");
1301            }
1302        }
1303    
1304        protected void waitForCompleteLatch(long timeout) throws InterruptedException {
1305            // Wait for a default 10 seconds if resultWaitTime is not set
1306            long waitTime = timeout == 0 ? 10000L : timeout;
1307    
1308            // now let's wait for the results
1309            LOG.debug("Waiting on the latch for: " + timeout + " millis");
1310            latch.await(waitTime, TimeUnit.MILLISECONDS);
1311        }
1312    
1313        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
1314            if (!ObjectHelper.equal(expectedValue, actualValue)) {
1315                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
1316            }
1317        }
1318    
1319        protected void assertTrue(String message, boolean predicate) {
1320            if (!predicate) {
1321                fail(message);
1322            }
1323        }
1324    
1325        protected void fail(Object message) {
1326            if (LOG.isDebugEnabled()) {
1327                List<Exchange> list = getReceivedExchanges();
1328                int index = 0;
1329                for (Exchange exchange : list) {
1330                    LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange});
1331                }
1332            }
1333            throw new AssertionError(getEndpointUri() + " " + message);
1334        }
1335    
1336        public int getExpectedMinimumCount() {
1337            return expectedMinimumCount;
1338        }
1339    
1340        public void await() throws InterruptedException {
1341            if (latch != null) {
1342                latch.await();
1343            }
1344        }
1345    
1346        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
1347            if (latch != null) {
1348                return latch.await(timeout, unit);
1349            }
1350            return true;
1351        }
1352    
1353        public boolean isSingleton() {
1354            return true;
1355        }
1356    
1357        public boolean isLenientProperties() {
1358            return true;
1359        }
1360    
1361        private Exchange getReceivedExchange(int index) {
1362            if (index <= receivedExchanges.size() - 1) {
1363                return receivedExchanges.get(index);
1364            } else {
1365                return null;
1366            }
1367        }
1368    
1369    }