001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.builder;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.EventObject;
022    import java.util.List;
023    import java.util.concurrent.ConcurrentHashMap;
024    import java.util.concurrent.ConcurrentMap;
025    import java.util.concurrent.CountDownLatch;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicInteger;
029    
030    import org.apache.camel.CamelContext;
031    import org.apache.camel.Endpoint;
032    import org.apache.camel.Exchange;
033    import org.apache.camel.Expression;
034    import org.apache.camel.Predicate;
035    import org.apache.camel.Producer;
036    import org.apache.camel.component.direct.DirectEndpoint;
037    import org.apache.camel.component.mock.MockEndpoint;
038    import org.apache.camel.management.event.ExchangeCompletedEvent;
039    import org.apache.camel.management.event.ExchangeCreatedEvent;
040    import org.apache.camel.management.event.ExchangeFailedEvent;
041    import org.apache.camel.management.event.ExchangeSentEvent;
042    import org.apache.camel.spi.EventNotifier;
043    import org.apache.camel.support.EventNotifierSupport;
044    import org.apache.camel.util.EndpointHelper;
045    import org.apache.camel.util.ObjectHelper;
046    import org.apache.camel.util.ServiceHelper;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * A builder to build an expression based on {@link org.apache.camel.spi.EventNotifier} notifications
052     * about {@link Exchange} being routed.
053     * <p/>
054     * This builder can be used for testing purposes where you want to know when a test is supposed to be done.
 * The idea is that you can build an expression that explains when the test is done, for example when Camel
 * has finished routing 5 messages. In your test you can then wait for this condition to occur.
057     *
058     * @version 
059     */
060    public class NotifyBuilder {
061    
    private static final Logger LOG = LoggerFactory.getLogger(NotifyBuilder.class);

    // the Camel context given to the constructor
    private final CamelContext context;

    // notifier to hook into Camel to listen for events
    private final EventNotifier eventNotifier;

    // the predicates built with this builder
    private final List<EventPredicateHolder> predicates = new ArrayList<EventPredicateHolder>();

    // latch to be used to signal that the predicates match
    private CountDownLatch latch = new CountDownLatch(1);

    // the current state while building an event predicate where we use a stack and the operation
    private final List<EventPredicate> stack = new ArrayList<EventPredicate>();
    private EventOperation operation;
    private boolean created;
    // keep state of how many wereSentTo we have added; also used as the index in the stack
    // where the next wereSentTo predicate is inserted
    private int wereSentToIndex;

    // computed value whether all the predicates matched
    private volatile boolean matches;
084    
085        /**
086         * Creates a new builder.
087         *
088         * @param context the Camel context
089         */
090        public NotifyBuilder(CamelContext context) {
091            this.context = context;
092            eventNotifier = new ExchangeNotifier();
093            try {
094                ServiceHelper.startService(eventNotifier);
095            } catch (Exception e) {
096                throw ObjectHelper.wrapRuntimeCamelException(e);
097            }
098            context.getManagementStrategy().addEventNotifier(eventNotifier);
099        }
100    
101        /**
102         * Optionally a <tt>from</tt> endpoint which means that this expression should only be based
103         * on {@link Exchange} which is originated from the particular endpoint(s).
104         *
105         * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
106         * @return the builder
107         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
108         */
109        public NotifyBuilder from(final String endpointUri) {
110            stack.add(new EventPredicateSupport() {
111    
112                @Override
113                public boolean isAbstract() {
114                    // is abstract as its a filter
115                    return true;
116                }
117    
118                @Override
119                public boolean onExchange(Exchange exchange) {
120                    // filter non matching exchanges
121                    return EndpointHelper.matchEndpoint(context, exchange.getFromEndpoint().getEndpointUri(), endpointUri);
122                }
123    
124                public boolean matches() {
125                    // should be true as we use the onExchange to filter
126                    return true;
127                }
128    
129                @Override
130                public String toString() {
131                    return "from(" + endpointUri + ")";
132                }
133            });
134            return this;
135        }
136    
137        /**
138         * Optionally a <tt>from</tt> route which means that this expression should only be based
139         * on {@link Exchange} which is originated from the particular route(s).
140         *
141         * @param routeId id of route or pattern (see the EndpointHelper javadoc)
142         * @return the builder
143         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
144         */
145        public NotifyBuilder fromRoute(final String routeId) {
146            stack.add(new EventPredicateSupport() {
147    
148                @Override
149                public boolean isAbstract() {
150                    // is abstract as its a filter
151                    return true;
152                }
153    
154                @Override
155                public boolean onExchange(Exchange exchange) {
156                    String id = EndpointHelper.getRouteIdFromEndpoint(exchange.getFromEndpoint());
157                    // filter non matching exchanges
158                    return EndpointHelper.matchPattern(id, routeId);
159                }
160    
161                public boolean matches() {
162                    // should be true as we use the onExchange to filter
163                    return true;
164                }
165    
166                @Override
167                public String toString() {
168                    return "fromRoute(" + routeId + ")";
169                }
170            });
171            return this;
172        }
173    
174        private NotifyBuilder fromRoutesOnly() {
175            // internal and should always be in top of stack
176            stack.add(0, new EventPredicateSupport() {
177    
178                @Override
179                public boolean isAbstract() {
180                    // is abstract as its a filter
181                    return true;
182                }
183    
184                @Override
185                public boolean onExchange(Exchange exchange) {
186                    // always accept direct endpoints as they are a special case as it will create the UoW beforehand
187                    // and just continue to route that on the consumer side, which causes the EventNotifier not to
188                    // emit events when the consumer received the exchange, as its already done. For example by
189                    // ProducerTemplate which creates the UoW before producing messages.
190                    if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint() instanceof DirectEndpoint) {
191                        return true;
192                    }
193                    return EndpointHelper.matchPattern(exchange.getFromRouteId(), "*");
194                }
195    
196                public boolean matches() {
197                    // should be true as we use the onExchange to filter
198                    return true;
199                }
200    
201                @Override
202                public String toString() {
203                    // we dont want any to string output as this is an internal predicate to match only from routes
204                    return "";
205                }
206            });
207            return this;
208        }
209    
210        /**
211         * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
212         *
213         * @param predicate the predicate to use for the filter
214         * @return the builder
215         */
216        public NotifyBuilder filter(final Predicate predicate) {
217            stack.add(new EventPredicateSupport() {
218    
219                @Override
220                public boolean isAbstract() {
221                    // is abstract as its a filter
222                    return true;
223                }
224    
225                @Override
226                public boolean onExchange(Exchange exchange) {
227                    // filter non matching exchanges
228                    return predicate.matches(exchange);
229                }
230    
231                public boolean matches() {
232                    // should be true as we use the onExchange to filter
233                    return true;
234                }
235    
236                @Override
237                public String toString() {
238                    return "filter(" + predicate + ")";
239                }
240            });
241            return this;
242        }
243    
244        /**
245         * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
246         *
247         * @return the builder
248         */
249        public ExpressionClauseSupport<NotifyBuilder> filter() {
250            final ExpressionClauseSupport<NotifyBuilder> clause = new ExpressionClauseSupport<NotifyBuilder>(this);
251            stack.add(new EventPredicateSupport() {
252    
253                @Override
254                public boolean isAbstract() {
255                    // is abstract as its a filter
256                    return true;
257                }
258    
259                @Override
260                public boolean onExchange(Exchange exchange) {
261                    // filter non matching exchanges
262                    Expression exp = clause.createExpression(exchange.getContext());
263                    return exp.evaluate(exchange, Boolean.class);
264                }
265    
266                public boolean matches() {
267                    // should be true as we use the onExchange to filter
268                    return true;
269                }
270    
271                @Override
272                public String toString() {
273                    return "filter(" + clause + ")";
274                }
275            });
276            return clause;
277        }
278    
279        /**
280         * Optionally a <tt>sent to</tt> endpoint which means that this expression should only be based
281         * on {@link Exchange} which has been sent to the given endpoint uri.
282         * <p/>
283         * Notice the {@link Exchange} may have been sent to other endpoints as well. This condition will match
284         * if the {@link Exchange} has been sent at least once to the given endpoint.
285         *
286         * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
287         * @return the builder
288         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
289         */
290        public NotifyBuilder wereSentTo(final String endpointUri) {
291            // insert in start of stack but after the previous wereSentTo
292            stack.add(wereSentToIndex++, new EventPredicateSupport() {
293                private ConcurrentMap<String, String> sentTo = new ConcurrentHashMap<String, String>();
294    
295                @Override
296                public boolean isAbstract() {
297                    // is abstract as its a filter
298                    return true;
299                }
300    
301                @Override
302                public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
303                    if (EndpointHelper.matchEndpoint(context, endpoint.getEndpointUri(), endpointUri)) {
304                        sentTo.put(exchange.getExchangeId(), exchange.getExchangeId());
305                    }
306                    return onExchange(exchange);
307                }
308    
309                @Override
310                public boolean onExchange(Exchange exchange) {
311                    // filter only when sentTo
312                    String sent = sentTo.get(exchange.getExchangeId());
313                    return sent != null;
314                }
315    
316                public boolean matches() {
317                    // should be true as we use the onExchange to filter
318                    return true;
319                }
320    
321                @Override
322                public void reset() {
323                    sentTo.clear();
324                }
325    
326                @Override
327                public String toString() {
328                    return "wereSentTo(" + endpointUri + ")";
329                }
330            });
331            return this;
332        }
333    
334        /**
335         * Sets a condition when <tt>number</tt> of {@link Exchange} has been received.
336         * <p/>
337         * The number matching is <i>at least</i> based which means that if more messages received
338         * it will match also.
339         *
340         * @param number at least number of messages
341         * @return the builder
342         */
343        public NotifyBuilder whenReceived(final int number) {
344            stack.add(new EventPredicateSupport() {
345                private AtomicInteger current = new AtomicInteger();
346    
347                @Override
348                public boolean onExchangeCreated(Exchange exchange) {
349                    current.incrementAndGet();
350                    return true;
351                }
352    
353                public boolean matches() {
354                    return current.get() >= number;
355                }
356    
357                @Override
358                public void reset() {
359                    current.set(0);
360                }
361    
362                @Override
363                public String toString() {
364                    return "whenReceived(" + number + ")";
365                }
366            });
367            return this;
368        }
369    
370        /**
371         * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
372         * <p/>
373         * The number matching is <i>at least</i> based which means that if more messages received
374         * it will match also.
375         * <p/>
376         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
377         * messages, where as completed is only successful processed messages.
378         *
379         * @param number at least number of messages
380         * @return the builder
381         */
382        public NotifyBuilder whenDone(final int number) {
383            stack.add(new EventPredicateSupport() {
384                private final AtomicInteger current = new AtomicInteger();
385    
386                @Override
387                public boolean onExchangeCompleted(Exchange exchange) {
388                    current.incrementAndGet();
389                    return true;
390                }
391    
392                @Override
393                public boolean onExchangeFailed(Exchange exchange) {
394                    current.incrementAndGet();
395                    return true;
396                }
397    
398                public boolean matches() {
399                    return current.get() >= number;
400                }
401    
402                @Override
403                public void reset() {
404                    current.set(0);
405                }
406    
407                @Override
408                public String toString() {
409                    return "whenDone(" + number + ")";
410                }
411            });
412            return this;
413        }
414    
415        /**
416         * Sets a condition when tne <tt>n'th</tt> (by index) {@link Exchange} is done being processed.
417         * <p/>
418         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
419         * messages, where as completed is only successful processed messages.
420         *
421         * @param index the message by index to be done
422         * @return the builder
423         */
424        public NotifyBuilder whenDoneByIndex(final int index) {
425            stack.add(new EventPredicateSupport() {
426                private AtomicInteger current = new AtomicInteger();
427                private String id;
428                private AtomicBoolean done = new AtomicBoolean();
429    
430                @Override
431                public boolean onExchangeCreated(Exchange exchange) {
432                    if (current.get() == index) {
433                        id = exchange.getExchangeId();
434                    }
435                    current.incrementAndGet();
436                    return true;
437                }
438    
439                @Override
440                public boolean onExchangeCompleted(Exchange exchange) {
441                    if (exchange.getExchangeId().equals(id)) {
442                        done.set(true);
443                    }
444                    return true;
445                }
446    
447                @Override
448                public boolean onExchangeFailed(Exchange exchange) {
449                    if (exchange.getExchangeId().equals(id)) {
450                        done.set(true);
451                    }
452                    return true;
453                }
454    
455                public boolean matches() {
456                    return done.get();
457                }
458    
459                @Override
460                public void reset() {
461                    current.set(0);
462                    id = null;
463                    done.set(false);
464                }
465    
466                @Override
467                public String toString() {
468                    return "whenDoneByIndex(" + index + ")";
469                }
470            });
471            return this;
472        }
473    
474        /**
475         * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
476         * <p/>
477         * The number matching is <i>at least</i> based which means that if more messages received
478         * it will match also.
479         * <p/>
480         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
481         * messages, where as completed is only successful processed messages.
482         *
483         * @param number at least number of messages
484         * @return the builder
485         */
486        public NotifyBuilder whenCompleted(final int number) {
487            stack.add(new EventPredicateSupport() {
488                private AtomicInteger current = new AtomicInteger();
489    
490                @Override
491                public boolean onExchangeCompleted(Exchange exchange) {
492                    current.incrementAndGet();
493                    return true;
494                }
495    
496                public boolean matches() {
497                    return current.get() >= number;
498                }
499    
500                @Override
501                public void reset() {
502                    current.set(0);
503                }
504    
505                @Override
506                public String toString() {
507                    return "whenCompleted(" + number + ")";
508                }
509            });
510            return this;
511        }
512    
513        /**
514         * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
515         * <p/>
516         * The number matching is <i>at least</i> based which means that if more messages received
517         * it will match also.
518         *
519         * @param number at least number of messages
520         * @return the builder
521         */
522        public NotifyBuilder whenFailed(final int number) {
523            stack.add(new EventPredicateSupport() {
524                private AtomicInteger current = new AtomicInteger();
525    
526                @Override
527                public boolean onExchangeFailed(Exchange exchange) {
528                    current.incrementAndGet();
529                    return true;
530                }
531    
532                public boolean matches() {
533                    return current.get() >= number;
534                }
535    
536                @Override
537                public void reset() {
538                    current.set(0);
539                }
540    
541                @Override
542                public String toString() {
543                    return "whenFailed(" + number + ")";
544                }
545            });
546            return this;
547        }
548    
549        /**
550         * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
551         * <p/>
552         * messages, where as completed is only successful processed messages.
553         *
554         * @param number exactly number of messages
555         * @return the builder
556         */
557        public NotifyBuilder whenExactlyDone(final int number) {
558            stack.add(new EventPredicateSupport() {
559                private AtomicInteger current = new AtomicInteger();
560    
561                @Override
562                public boolean onExchangeCompleted(Exchange exchange) {
563                    current.incrementAndGet();
564                    return true;
565                }
566    
567                @Override
568                public boolean onExchangeFailed(Exchange exchange) {
569                    current.incrementAndGet();
570                    return true;
571                }
572    
573                public boolean matches() {
574                    return current.get() == number;
575                }
576    
577                @Override
578                public void reset() {
579                    current.set(0);
580                }
581    
582                @Override
583                public String toString() {
584                    return "whenExactlyDone(" + number + ")";
585                }
586            });
587            return this;
588        }
589    
590        /**
591         * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
592         * <p/>
593         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
594         * messages, where as completed is only successful processed messages.
595         *
596         * @param number exactly number of messages
597         * @return the builder
598         */
599        public NotifyBuilder whenExactlyCompleted(final int number) {
600            stack.add(new EventPredicateSupport() {
601                private AtomicInteger current = new AtomicInteger();
602    
603                @Override
604                public boolean onExchangeCompleted(Exchange exchange) {
605                    current.incrementAndGet();
606                    return true;
607                }
608    
609                public boolean matches() {
610                    return current.get() == number;
611                }
612    
613                @Override
614                public void reset() {
615                    current.set(0);
616                }
617    
618                @Override
619                public String toString() {
620                    return "whenExactlyCompleted(" + number + ")";
621                }
622            });
623            return this;
624        }
625    
626        /**
627         * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
628         *
629         * @param number exactly number of messages
630         * @return the builder
631         */
632        public NotifyBuilder whenExactlyFailed(final int number) {
633            stack.add(new EventPredicateSupport() {
634                private AtomicInteger current = new AtomicInteger();
635    
636                @Override
637                public boolean onExchangeFailed(Exchange exchange) {
638                    current.incrementAndGet();
639                    return true;
640                }
641    
642                public boolean matches() {
643                    return current.get() == number;
644                }
645    
646                @Override
647                public void reset() {
648                    current.set(0);
649                }
650    
651                @Override
652                public String toString() {
653                    return "whenExactlyFailed(" + number + ")";
654                }
655            });
656            return this;
657        }
658    
659        /**
660         * Sets a condition that <b>any received</b> {@link Exchange} should match the {@link Predicate}
661         *
662         * @param predicate the predicate
663         * @return the builder
664         */
665        public NotifyBuilder whenAnyReceivedMatches(final Predicate predicate) {
666            return doWhenAnyMatches(predicate, true);
667        }
668    
669        /**
670         * Sets a condition that <b>any done</b> {@link Exchange} should match the {@link Predicate}
671         *
672         * @param predicate the predicate
673         * @return the builder
674         */
675        public NotifyBuilder whenAnyDoneMatches(final Predicate predicate) {
676            return doWhenAnyMatches(predicate, false);
677        }
678    
679        private NotifyBuilder doWhenAnyMatches(final Predicate predicate, final boolean received) {
680            stack.add(new EventPredicateSupport() {
681                private final AtomicBoolean matches = new AtomicBoolean();
682    
683                @Override
684                public boolean onExchangeCompleted(Exchange exchange) {
685                    if (!received && !matches.get()) {
686                        matches.set(predicate.matches(exchange));
687                    }
688                    return true;
689                }
690    
691                @Override
692                public boolean onExchangeFailed(Exchange exchange) {
693                    if (!received && !matches.get()) {
694                        matches.set(predicate.matches(exchange));
695                    }
696                    return true;
697                }
698    
699                @Override
700                public boolean onExchangeCreated(Exchange exchange) {
701                    if (received && !matches.get()) {
702                        matches.set(predicate.matches(exchange));
703                    }
704                    return true;
705                }
706    
707                public boolean matches() {
708                    return matches.get();
709                }
710    
711                @Override
712                public void reset() {
713                    matches.set(false);
714                }
715    
716                @Override
717                public String toString() {
718                    if (received) {
719                        return "whenAnyReceivedMatches(" + predicate + ")";
720                    } else {
721                        return "whenAnyDoneMatches(" + predicate + ")";
722                    }
723                }
724            });
725            return this;
726        }
727    
728        /**
729         * Sets a condition that <b>all received</b> {@link Exchange} should match the {@link Predicate}
730         *
731         * @param predicate the predicate
732         * @return the builder
733         */
734        public NotifyBuilder whenAllReceivedMatches(final Predicate predicate) {
735            return doWhenAllMatches(predicate, true);
736        }
737    
738        /**
739         * Sets a condition that <b>all done</b> {@link Exchange} should match the {@link Predicate}
740         *
741         * @param predicate the predicate
742         * @return the builder
743         */
744        public NotifyBuilder whenAllDoneMatches(final Predicate predicate) {
745            return doWhenAllMatches(predicate, false);
746        }
747    
748        private NotifyBuilder doWhenAllMatches(final Predicate predicate, final boolean received) {
749            stack.add(new EventPredicateSupport() {
750                private final AtomicBoolean matches = new AtomicBoolean(true);
751    
752                @Override
753                public boolean onExchangeCompleted(Exchange exchange) {
754                    if (!received && matches.get()) {
755                        matches.set(predicate.matches(exchange));
756                    }
757                    return true;
758                }
759    
760                @Override
761                public boolean onExchangeFailed(Exchange exchange) {
762                    if (!received && matches.get()) {
763                        matches.set(predicate.matches(exchange));
764                    }
765                    return true;
766                }
767    
768                @Override
769                public boolean onExchangeCreated(Exchange exchange) {
770                    if (received && matches.get()) {
771                        matches.set(predicate.matches(exchange));
772                    }
773                    return true;
774                }
775    
776                public boolean matches() {
777                    return matches.get();
778                }
779    
780                @Override
781                public void reset() {
782                    matches.set(true);
783                }
784    
785                @Override
786                public String toString() {
787                    if (received) {
788                        return "whenAllReceivedMatches(" + predicate + ")";
789                    } else {
790                        return "whenAllDoneMatches(" + predicate + ")";
791                    }
792                }
793            });
794            return this;
795        }
796    
797        /**
798         * Sets a condition when the provided mock is satisfied based on {@link Exchange}
799         * being sent to it when they are <b>done</b>.
800         * <p/>
801         * The idea is that you can use Mock for setting fine grained expectations
802         * and then use that together with this builder. The mock provided does <b>NOT</b>
803         * have to already exist in the route. You can just create a new pseudo mock
804         * and this builder will send the done {@link Exchange} to it. So its like
805         * adding the mock to the end of your route(s).
806         *
807         * @param mock the mock
808         * @return the builder
809         */
810        public NotifyBuilder whenDoneSatisfied(final MockEndpoint mock) {
811            return doWhenSatisfied(mock, false);
812        }
813    
814        /**
815         * Sets a condition when the provided mock is satisfied based on {@link Exchange}
816         * being sent to it when they are <b>received</b>.
817         * <p/>
818         * The idea is that you can use Mock for setting fine grained expectations
819         * and then use that together with this builder. The mock provided does <b>NOT</b>
820         * have to already exist in the route. You can just create a new pseudo mock
821         * and this builder will send the done {@link Exchange} to it. So its like
822         * adding the mock to the end of your route(s).
823         *
824         * @param mock the mock
825         * @return the builder
826         */
827        public NotifyBuilder whenReceivedSatisfied(final MockEndpoint mock) {
828            return doWhenSatisfied(mock, true);
829        }
830    
831        private NotifyBuilder doWhenSatisfied(final MockEndpoint mock, final boolean received) {
832            stack.add(new EventPredicateSupport() {
833                private Producer producer;
834    
835                @Override
836                public boolean onExchangeCreated(Exchange exchange) {
837                    if (received) {
838                        sendToMock(exchange);
839                    }
840                    return true;
841                }
842    
843                @Override
844                public boolean onExchangeFailed(Exchange exchange) {
845                    if (!received) {
846                        sendToMock(exchange);
847                    }
848                    return true;
849                }
850    
851                @Override
852                public boolean onExchangeCompleted(Exchange exchange) {
853                    if (!received) {
854                        sendToMock(exchange);
855                    }
856                    return true;
857                }
858    
859                private void sendToMock(Exchange exchange) {
860                    // send the exchange when its completed to the mock
861                    try {
862                        if (producer == null) {
863                            producer = mock.createProducer();
864                        }
865                        producer.process(exchange);
866                    } catch (Exception e) {
867                        throw ObjectHelper.wrapRuntimeCamelException(e);
868                    }
869                }
870    
871                public boolean matches() {
872                    try {
873                        return mock.await(0, TimeUnit.SECONDS);
874                    } catch (InterruptedException e) {
875                        throw ObjectHelper.wrapRuntimeCamelException(e);
876                    }
877                }
878    
879                @Override
880                public void reset() {
881                    mock.reset();
882                }
883    
884                @Override
885                public String toString() {
886                    if (received) {
887                        return "whenReceivedSatisfied(" + mock + ")";
888                    } else {
889                        return "whenDoneSatisfied(" + mock + ")";
890                    }
891                }
892            });
893            return this;
894        }
895    
896        /**
897         * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
898         * being sent to it when they are <b>received</b>.
899         * <p/>
900         * The idea is that you can use Mock for setting fine grained expectations
901         * and then use that together with this builder. The mock provided does <b>NOT</b>
902         * have to already exist in the route. You can just create a new pseudo mock
903         * and this builder will send the done {@link Exchange} to it. So its like
904         * adding the mock to the end of your route(s).
905         *
906         * @param mock the mock
907         * @return the builder
908         */
909        public NotifyBuilder whenReceivedNotSatisfied(final MockEndpoint mock) {
910            return doWhenNotSatisfied(mock, true);
911        }
912    
913        /**
914         * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
915         * being sent to it when they are <b>done</b>.
916         * <p/>
917         * The idea is that you can use Mock for setting fine grained expectations
918         * and then use that together with this builder. The mock provided does <b>NOT</b>
919         * have to already exist in the route. You can just create a new pseudo mock
920         * and this builder will send the done {@link Exchange} to it. So its like
921         * adding the mock to the end of your route(s).
922         *
923         * @param mock the mock
924         * @return the builder
925         */
926        public NotifyBuilder whenDoneNotSatisfied(final MockEndpoint mock) {
927            return doWhenNotSatisfied(mock, false);
928        }
929    
930        private NotifyBuilder doWhenNotSatisfied(final MockEndpoint mock, final boolean received) {
931            stack.add(new EventPredicateSupport() {
932                private Producer producer;
933    
934                @Override
935                public boolean onExchangeCreated(Exchange exchange) {
936                    if (received) {
937                        sendToMock(exchange);
938                    }
939                    return true;
940                }
941    
942                @Override
943                public boolean onExchangeFailed(Exchange exchange) {
944                    if (!received) {
945                        sendToMock(exchange);
946                    }
947                    return true;
948                }
949    
950                @Override
951                public boolean onExchangeCompleted(Exchange exchange) {
952                    if (!received) {
953                        sendToMock(exchange);
954                    }
955                    return true;
956                }
957    
958                private void sendToMock(Exchange exchange) {
959                    // send the exchange when its completed to the mock
960                    try {
961                        if (producer == null) {
962                            producer = mock.createProducer();
963                        }
964                        producer.process(exchange);
965                    } catch (Exception e) {
966                        throw ObjectHelper.wrapRuntimeCamelException(e);
967                    }
968                }
969    
970                public boolean matches() {
971                    try {
972                        return !mock.await(0, TimeUnit.SECONDS);
973                    } catch (InterruptedException e) {
974                        throw ObjectHelper.wrapRuntimeCamelException(e);
975                    }
976                }
977    
978                @Override
979                public void reset() {
980                    mock.reset();
981                }
982    
983                @Override
984                public String toString() {
985                    if (received) {
986                        return "whenReceivedNotSatisfied(" + mock + ")";
987                    } else {
988                        return "whenDoneNotSatisfied(" + mock + ")";
989                    }
990                }
991            });
992            return this;
993        }
994    
995        /**
996         * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
997         * <p/>
998         * This condition will discard any additional messages. If you need a more strict condition
999         * then use {@link #whenExactBodiesReceived(Object...)}
1000         *
1001         * @param bodies the expected bodies
1002         * @return the builder
1003         * @see #whenExactBodiesReceived(Object...)
1004         */
1005        public NotifyBuilder whenBodiesReceived(Object... bodies) {
1006            List<Object> bodyList = new ArrayList<Object>();
1007            bodyList.addAll(Arrays.asList(bodies));
1008            return doWhenBodies(bodyList, true, false);
1009        }
1010    
1011        /**
1012         * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1013         * <p/>
1014         * This condition will discard any additional messages. If you need a more strict condition
1015         * then use {@link #whenExactBodiesDone(Object...)}
1016         *
1017         * @param bodies the expected bodies
1018         * @return the builder
1019         * @see #whenExactBodiesDone(Object...)
1020         */
1021        public NotifyBuilder whenBodiesDone(Object... bodies) {
1022            List<Object> bodyList = new ArrayList<Object>();
1023            bodyList.addAll(Arrays.asList(bodies));
1024            return doWhenBodies(bodyList, false, false);
1025        }
1026    
1027        /**
1028         * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
1029         * <p/>
1030         * This condition is strict which means that it only expect that exact number of bodies
1031         *
1032         * @param bodies the expected bodies
1033         * @return the builder
1034         * @see #whenBodiesReceived(Object...)
1035         */
1036        public NotifyBuilder whenExactBodiesReceived(Object... bodies) {
1037            List<Object> bodyList = new ArrayList<Object>();
1038            bodyList.addAll(Arrays.asList(bodies));
1039            return doWhenBodies(bodyList, true, true);
1040        }
1041    
1042        /**
1043         * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1044         * <p/>
1045         * This condition is strict which means that it only expect that exact number of bodies
1046         *
1047         * @param bodies the expected bodies
1048         * @return the builder
1049         * @see #whenExactBodiesDone(Object...)
1050         */
1051        public NotifyBuilder whenExactBodiesDone(Object... bodies) {
1052            List<Object> bodyList = new ArrayList<Object>();
1053            bodyList.addAll(Arrays.asList(bodies));
1054            return doWhenBodies(bodyList, false, true);
1055        }
1056    
1057        private NotifyBuilder doWhenBodies(final List<?> bodies, final boolean received, final boolean exact) {
1058            stack.add(new EventPredicateSupport() {
1059                private volatile boolean matches;
1060                private final AtomicInteger current = new AtomicInteger();
1061    
1062                @Override
1063                public boolean onExchangeCreated(Exchange exchange) {
1064                    if (received) {
1065                        matchBody(exchange);
1066                    }
1067                    return true;
1068                }
1069    
1070                @Override
1071                public boolean onExchangeFailed(Exchange exchange) {
1072                    if (!received) {
1073                        matchBody(exchange);
1074                    }
1075                    return true;
1076                }
1077    
1078                @Override
1079                public boolean onExchangeCompleted(Exchange exchange) {
1080                    if (!received) {
1081                        matchBody(exchange);
1082                    }
1083                    return true;
1084                }
1085    
1086                private void matchBody(Exchange exchange) {
1087                    if (current.incrementAndGet() > bodies.size()) {
1088                        // out of bounds
1089                        return;
1090                    }
1091    
1092                    Object actual = exchange.getIn().getBody();
1093                    Object expected = bodies.get(current.get() - 1);
1094                    matches = ObjectHelper.equal(expected, actual);
1095                }
1096    
1097                public boolean matches() {
1098                    if (exact) {
1099                        return matches && current.get() == bodies.size();
1100                    } else {
1101                        return matches && current.get() >= bodies.size();
1102                    }
1103                }
1104    
1105                @Override
1106                public void reset() {
1107                    matches = false;
1108                    current.set(0);
1109                }
1110    
1111                @Override
1112                public String toString() {
1113                    if (received) {
1114                        return "" + (exact ? "whenExactBodiesReceived(" : "whenBodiesReceived(") + bodies + ")";
1115                    } else {
1116                        return "" + (exact ? "whenExactBodiesDone(" : "whenBodiesDone(") + bodies + ")";
1117                    }
1118                }
1119            });
1120            return this;
1121        }
1122    
1123        /**
1124         * Prepares to append an additional expression using the <i>and</i> operator.
1125         *
1126         * @return the builder
1127         */
1128        public NotifyBuilder and() {
1129            doCreate(EventOperation.and);
1130            return this;
1131        }
1132    
1133        /**
1134         * Prepares to append an additional expression using the <i>or</i> operator.
1135         *
1136         * @return the builder
1137         */
1138        public NotifyBuilder or() {
1139            doCreate(EventOperation.or);
1140            return this;
1141        }
1142    
1143        /**
1144         * Prepares to append an additional expression using the <i>not</i> operator.
1145         *
1146         * @return the builder
1147         */
1148        public NotifyBuilder not() {
1149            doCreate(EventOperation.not);
1150            return this;
1151        }
1152    
1153        /**
1154         * Creates the expression this builder should use for matching.
1155         * <p/>
1156         * You must call this method when you are finished building the expressions.
1157         *
1158         * @return the created builder ready for matching
1159         */
1160        public NotifyBuilder create() {
1161            doCreate(EventOperation.and);
1162            created = true;
1163            return this;
1164        }
1165    
1166        /**
1167         * Does all the expression match?
1168         * <p/>
1169         * This operation will return immediately which means it can be used for testing at this very moment.
1170         *
1171         * @return <tt>true</tt> if matching, <tt>false</tt> otherwise
1172         */
1173        public boolean matches() {
1174            if (!created) {
1175                throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1176            }
1177            return matches;
1178        }
1179    
1180        /**
1181         * Does all the expression match?
1182         * <p/>
1183         * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
1184         * which means <tt>false</tt> will be returned.
1185         *
1186         * @param timeout  the timeout value
1187         * @param timeUnit the time unit
1188         * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
1189         */
1190        public boolean matches(long timeout, TimeUnit timeUnit) {
1191            if (!created) {
1192                throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1193            }
1194            try {
1195                latch.await(timeout, timeUnit);
1196            } catch (InterruptedException e) {
1197                throw ObjectHelper.wrapRuntimeCamelException(e);
1198            }
1199            return matches();
1200        }
1201    
1202        /**
1203         * Does all the expressions match?
1204         * <p/>
1205         * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
1206         * which means <tt>false</tt> will be returned.
1207         * <p/>
1208         * The timeout value is by default 10 seconds. But it will use the highest <i>maximum result wait time</i>
1209         * from the configured mocks, if such a value has been configured.
1210         * <p/>
1211         * This method is convenient to use in unit tests to have it adhere and wait
1212         * as long as the mock endpoints.
1213         *
1214         * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
1215         */
1216        public boolean matchesMockWaitTime() {
1217            if (!created) {
1218                throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1219            }
1220            long timeout = 0;
1221            for (Endpoint endpoint : context.getEndpoints()) {
1222                if (endpoint instanceof MockEndpoint) {
1223                    long waitTime = ((MockEndpoint) endpoint).getResultWaitTime();
1224                    if (waitTime > 0) {
1225                        timeout = Math.max(timeout, waitTime);
1226                    }
1227                }
1228            }
1229    
1230            // use 10 sec as default
1231            if (timeout == 0) {
1232                timeout = 10000;
1233            }
1234    
1235            return matches(timeout, TimeUnit.MILLISECONDS);
1236        }
1237    
1238        /**
1239         * Resets the notifier.
1240         */
1241        public void reset() {
1242            for (EventPredicateHolder predicate : predicates) {
1243                predicate.reset();
1244            }
1245            latch = new CountDownLatch(1);
1246            matches = false;
1247        }
1248    
1249        @Override
1250        public String toString() {
1251            StringBuilder sb = new StringBuilder();
1252            for (EventPredicateHolder eventPredicateHolder : predicates) {
1253                if (sb.length() > 0) {
1254                    sb.append(".");
1255                }
1256                sb.append(eventPredicateHolder.toString());
1257            }
1258            // a crude way of skipping the first invisible operation
1259            return ObjectHelper.after(sb.toString(), "().");
1260        }
1261    
1262        private void doCreate(EventOperation newOperation) {
1263            // init operation depending on the newOperation
1264            if (operation == null) {
1265                // if the first new operation is an or then this operation must be an or as well
1266                // otherwise it should be and based
1267                operation = newOperation == EventOperation.or ? EventOperation.or : EventOperation.and;
1268            }
1269    
1270            // we have some predicates
1271            if (!stack.isEmpty()) {
1272                // we only want to match from routes, so skip for example events
1273                // which is triggered by producer templates etc.
1274                fromRoutesOnly();
1275    
1276                // the stack must have at least one non abstract
1277                boolean found = false;
1278                for (EventPredicate predicate : stack) {
1279                    if (!predicate.isAbstract()) {
1280                        found = true;
1281                        break;
1282                    }
1283                }
1284                if (!found) {
1285                    throw new IllegalArgumentException("NotifyBuilder must contain at least one non-abstract predicate (such as whenDone)");
1286                }
1287    
1288                CompoundEventPredicate compound = new CompoundEventPredicate(stack);
1289                stack.clear();
1290                predicates.add(new EventPredicateHolder(operation, compound));
1291            }
1292    
1293            operation = newOperation;
1294            // reset wereSentTo index position as this its a new group
1295            wereSentToIndex = 0;
1296        }
1297    
1298        /**
1299         * Notifier which hooks into Camel to listen for {@link Exchange} relevant events for this builder
1300         */
1301        private final class ExchangeNotifier extends EventNotifierSupport {
1302    
1303            public void notify(EventObject event) throws Exception {
1304                if (event instanceof ExchangeCreatedEvent) {
1305                    onExchangeCreated((ExchangeCreatedEvent) event);
1306                } else if (event instanceof ExchangeCompletedEvent) {
1307                    onExchangeCompleted((ExchangeCompletedEvent) event);
1308                } else if (event instanceof ExchangeFailedEvent) {
1309                    onExchangeFailed((ExchangeFailedEvent) event);
1310                } else if (event instanceof ExchangeSentEvent) {
1311                    onExchangeSent((ExchangeSentEvent) event);
1312                }
1313    
1314                // now compute whether we matched
1315                computeMatches();
1316            }
1317    
1318            public boolean isEnabled(EventObject event) {
1319                return true;
1320            }
1321    
1322            private void onExchangeCreated(ExchangeCreatedEvent event) {
1323                for (EventPredicateHolder predicate : predicates) {
1324                    predicate.getPredicate().onExchangeCreated(event.getExchange());
1325                }
1326            }
1327    
1328            private void onExchangeCompleted(ExchangeCompletedEvent event) {
1329                for (EventPredicateHolder predicate : predicates) {
1330                    predicate.getPredicate().onExchangeCompleted(event.getExchange());
1331                }
1332            }
1333    
1334            private void onExchangeFailed(ExchangeFailedEvent event) {
1335                for (EventPredicateHolder predicate : predicates) {
1336                    predicate.getPredicate().onExchangeFailed(event.getExchange());
1337                }
1338            }
1339    
1340            private void onExchangeSent(ExchangeSentEvent event) {
1341                for (EventPredicateHolder predicate : predicates) {
1342                    predicate.getPredicate().onExchangeSent(event.getExchange(), event.getEndpoint(), event.getTimeTaken());
1343                }
1344            }
1345    
1346            private synchronized void computeMatches() {
1347                // use a temporary answer until we have computed the value to assign
1348                Boolean answer = null;
1349    
1350                for (EventPredicateHolder holder : predicates) {
1351                    EventOperation operation = holder.getOperation();
1352                    if (EventOperation.and == operation) {
1353                        if (holder.getPredicate().matches()) {
1354                            answer = true;
1355                        } else {
1356                            answer = false;
1357                            // and break out since its an AND so it must match
1358                            break;
1359                        }
1360                    } else if (EventOperation.or == operation) {
1361                        if (holder.getPredicate().matches()) {
1362                            answer = true;
1363                        }
1364                    } else if (EventOperation.not == operation) {
1365                        if (holder.getPredicate().matches()) {
1366                            answer = false;
1367                            // and break out since its a NOT so it must not match
1368                            break;
1369                        } else {
1370                            answer = true;
1371                        }
1372                    }
1373                }
1374    
1375                // if we did compute a value then assign that
1376                if (answer != null) {
1377                    matches = answer;
1378                    if (matches) {
1379                        // signal completion
1380                        latch.countDown();
1381                    }
1382                }
1383            }
1384    
1385            @Override
1386            protected void doStart() throws Exception {
1387                // we only care about Exchange events
1388                setIgnoreCamelContextEvents(true);
1389                setIgnoreRouteEvents(true);
1390                setIgnoreServiceEvents(true);
1391            }
1392    
1393            @Override
1394            protected void doStop() throws Exception {
1395            }
1396        }
1397    
1398        private enum EventOperation {
1399            and, or, not;
1400        }
1401    
    /**
     * A condition of the builder which is fed with {@link Exchange} events
     * and can be asked whether it currently matches.
     */
    private interface EventPredicate {

        /**
         * Evaluates whether the predicate matched or not.
         *
         * @return <tt>true</tt> if matched, <tt>false</tt> otherwise
         */
        boolean matches();

        /**
         * Resets the predicate to its initial state.
         */
        void reset();

        /**
         * Whether the predicate is abstract.
         * <p/>
         * A group of predicates must contain at least one non-abstract predicate,
         * otherwise the builder rejects the group.
         *
         * @return <tt>true</tt> if abstract, <tt>false</tt> otherwise
         */
        boolean isAbstract();

        /**
         * Callback invoked when an {@link Exchange} has been created.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeCreated(Exchange exchange);

        /**
         * Callback invoked when an {@link Exchange} has been completed.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeCompleted(Exchange exchange);

        /**
         * Callback invoked when an {@link Exchange} has failed.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeFailed(Exchange exchange);

        /**
         * Callback invoked when an {@link Exchange} has been sent to an endpoint.
         *
         * @param exchange the exchange
         * @param endpoint the endpoint sent to
         * @param timeTaken time taken in millis to send the to endpoint
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken);
    }
1455    
1456        private abstract class EventPredicateSupport implements EventPredicate {
1457    
1458            public boolean isAbstract() {
1459                return false;
1460            }
1461    
1462            public void reset() {
1463                // noop
1464            }
1465    
1466            public boolean onExchangeCreated(Exchange exchange) {
1467                return onExchange(exchange);
1468            }
1469    
1470            public boolean onExchangeCompleted(Exchange exchange) {
1471                return onExchange(exchange);
1472            }
1473    
1474            public boolean onExchangeFailed(Exchange exchange) {
1475                return onExchange(exchange);
1476            }
1477    
1478            public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1479                // no need to invoke onExchange as this is a special case when the Exchange
1480                // was sent to a specific endpoint
1481                return true;
1482            }
1483    
1484            public boolean onExchange(Exchange exchange) {
1485                return true;
1486            }
1487        }
1488    
1489        /**
1490         * To hold an operation and predicate
1491         */
1492        private final class EventPredicateHolder {
1493            private final EventOperation operation;
1494            private final EventPredicate predicate;
1495    
1496            private EventPredicateHolder(EventOperation operation, EventPredicate predicate) {
1497                this.operation = operation;
1498                this.predicate = predicate;
1499            }
1500    
1501            public EventOperation getOperation() {
1502                return operation;
1503            }
1504    
1505            public EventPredicate getPredicate() {
1506                return predicate;
1507            }
1508    
1509            public void reset() {
1510                predicate.reset();
1511            }
1512    
1513            @Override
1514            public String toString() {
1515                return operation.name() + "()." + predicate;
1516            }
1517        }
1518    
1519        /**
1520         * To hold multiple predicates which are part of same expression
1521         */
1522        private final class CompoundEventPredicate implements EventPredicate {
1523    
1524            private List<EventPredicate> predicates = new ArrayList<EventPredicate>();
1525    
1526            private CompoundEventPredicate(List<EventPredicate> predicates) {
1527                this.predicates.addAll(predicates);
1528            }
1529    
1530            public boolean isAbstract() {
1531                return false;
1532            }
1533    
1534            public boolean matches() {
1535                for (EventPredicate predicate : predicates) {
1536                    boolean answer = predicate.matches();
1537                    LOG.trace("matches() {} -> {}", predicate, answer);
1538                    if (!answer) {
1539                        // break at first false
1540                        return false;
1541                    }
1542                }
1543                return true;
1544            }
1545    
1546            public void reset() {
1547                for (EventPredicate predicate : predicates) {
1548                    LOG.trace("reset() {}", predicate);
1549                    predicate.reset();
1550                }
1551            }
1552    
1553            public boolean onExchangeCreated(Exchange exchange) {
1554                for (EventPredicate predicate : predicates) {
1555                    boolean answer = predicate.onExchangeCreated(exchange);
1556                    LOG.trace("onExchangeCreated() {} -> {}", predicate, answer);
1557                    if (!answer) {
1558                        // break at first false
1559                        return false;
1560                    }
1561                }
1562                return true;
1563            }
1564    
1565            public boolean onExchangeCompleted(Exchange exchange) {
1566                for (EventPredicate predicate : predicates) {
1567                    boolean answer = predicate.onExchangeCompleted(exchange);
1568                    LOG.trace("onExchangeCompleted() {} -> {}", predicate, answer);
1569                    if (!answer) {
1570                        // break at first false
1571                        return false;
1572                    }
1573                }
1574                return true;
1575            }
1576    
1577            public boolean onExchangeFailed(Exchange exchange) {
1578                for (EventPredicate predicate : predicates) {
1579                    boolean answer = predicate.onExchangeFailed(exchange);
1580                    LOG.trace("onExchangeFailed() {} -> {}", predicate, answer);
1581                    if (!answer) {
1582                        // break at first false
1583                        return false;
1584                    }
1585                }
1586                return true;
1587            }
1588    
1589            @Override
1590            public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1591                for (EventPredicate predicate : predicates) {
1592                    boolean answer = predicate.onExchangeSent(exchange, endpoint, timeTaken);
1593                    LOG.trace("onExchangeSent() {} {} -> {}", new Object[]{endpoint, predicate, answer});
1594                    if (!answer) {
1595                        // break at first false
1596                        return false;
1597                    }
1598                }
1599                return true;
1600            }
1601    
1602            @Override
1603            public String toString() {
1604                StringBuilder sb = new StringBuilder();
1605                for (EventPredicate eventPredicate : predicates) {
1606                    if (sb.length() > 0) {
1607                        sb.append(".");
1608                    }
1609                    sb.append(eventPredicate.toString());
1610                }
1611                return sb.toString();
1612            }
1613        }
1614    
1615    }