001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.aggregate;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.camel.Exchange;
023    
024    /**
025     * Aggregate all exchanges into a {@link List} of values defined by the {@link #getValue(Exchange)} call.
026     * The combined Exchange will hold all the aggregated exchanges in a {@link java.util.List}
027     * as a exchange property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}.
028     * <p/>
029     * The method {@link #isStoreAsBodyOnCompletion()} determines if the aggregated {@link List} should
030     * be stored on the {@link org.apache.camel.Message#setBody(Object)} or be kept as a property
031     * on the exchange.
032     * <br/>
033     * The default behavior to store as message body, allows to more easily group together a list of values
034     * and have its result stored as a {@link List} on the completed {@link Exchange}.
035     *
036     * @since 2.11
037     */
038    public abstract class AbstractListAggregationStrategy<V> implements CompletionAwareAggregationStrategy {
039    
040        /**
041         * This method is implemented by the sub-class and is called to retrieve
042         * an instance of the value that will be aggregated and forwarded to the
043         * receiving end point.
044         * <p/>
045         * If <tt>null</tt> is returned, then the value is <b>not</b> added to the {@link List}.
046         *
047         * @param exchange  The exchange that is used to retrieve the value from
048         * @return An instance of V that is the associated value of the passed exchange
049         */
050        public abstract V getValue(Exchange exchange);
051    
052        /**
053         * Whether to store the completed aggregated {@link List} as message body, or to keep as property on the exchange.
054         * <p/>
055         * The default behavior is <tt>true</tt> to store as message body.
056         *
057         * @return <tt>true</tt> to store as message body, <tt>false</tt> to keep as property on the exchange.
058         */
059        public boolean isStoreAsBodyOnCompletion() {
060            return true;
061        }
062    
063        @SuppressWarnings("unchecked")
064        public void onCompletion(Exchange exchange) {
065            if (isStoreAsBodyOnCompletion()) {
066                List<V> list = (List<V>) exchange.removeProperty(Exchange.GROUPED_EXCHANGE);
067                if (list != null) {
068                    exchange.getIn().setBody(list);
069                }
070            }
071        }
072    
073        /**
074         * This method will aggregate the old and new exchange and return the result.
075         *
076         * @param oldExchange The oldest exchange, can be null
077         * @param newExchange The newest exchange, can be null
078         * @return a composite exchange of the old and/or new exchanges
079         */
080        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
081            List<V> list;
082    
083            if (oldExchange == null) {
084                list = getList(newExchange);
085            } else {
086                list = getList(oldExchange);
087            }
088    
089            if (newExchange != null) {
090                V value = getValue(newExchange);
091                if (value != null) {
092                    list.add(value);
093                }
094            }
095    
096            return oldExchange != null ? oldExchange : newExchange;
097        }
098    
099        @SuppressWarnings("unchecked")
100        private List<V> getList(Exchange exchange) {
101            List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
102            if (list == null) {
103                list = new GroupedExchangeList();
104                exchange.setProperty(Exchange.GROUPED_EXCHANGE, list);
105            }
106            return list;
107        }
108    
109        /**
110         * A list to contains grouped {@link Exchange}s.
111         */
112        private static final class GroupedExchangeList extends ArrayList {
113    
114            @Override
115            public String toString() {
116                // lets override toString so we don't write data for all the Exchanges by default
117                return "List<Exchange>(" + size() + " elements)";
118            }
119        }
120    
121    }