001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.dataset;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Message;
024    import org.apache.camel.Processor;
025    
026    /**
027     * Base class for DataSet
028     *
029     * @version 
030     */
031    public abstract class DataSetSupport implements DataSet {
032        private Map<String, Object> defaultHeaders;
033        private Processor outputTransformer;
034        private long size = 10;
035        private long reportCount = -1;
036    
037        public DataSetSupport() {
038        }
039    
040        public DataSetSupport(int size) {
041            setSize(size);
042        }
043    
044        public void populateMessage(Exchange exchange, long messageIndex) throws Exception {
045            Message in = exchange.getIn();
046            in.setBody(createMessageBody(messageIndex));
047            in.setHeaders(getDefaultHeaders());
048            applyHeaders(exchange, messageIndex);
049    
050            if (outputTransformer != null) {
051                outputTransformer.process(exchange);
052            }
053        }
054    
055        public void assertMessageExpected(DataSetEndpoint dataSetEndpoint, Exchange expected, Exchange actual, long index) throws Exception {
056            Object expectedBody = expected.getIn().getBody();
057            Object actualBody = actual.getIn().getBody();
058            if (expectedBody != null) {
059                // let's coerce to the correct type
060                actualBody = actual.getIn().getMandatoryBody(expectedBody.getClass());
061            }
062            DataSetEndpoint.assertEquals("message body", expectedBody, actualBody, actual);
063        }
064    
065        // Properties
066        //-------------------------------------------------------------------------
067    
068        public long getSize() {
069            return size;
070        }
071    
072        public void setSize(long size) {
073            this.size = size;
074        }
075    
076        public long getReportCount() {
077            if (reportCount <= 0) {
078                reportCount = getSize() / 5;
079            }
080            // report cannot be 0 then default to the size
081            if (reportCount == 0) {
082                reportCount = getSize();
083            }
084            return reportCount;
085        }
086    
087        /**
088         * Sets the number of messages in a group on which we will report that messages have been received.
089         */
090        public void setReportCount(long reportCount) {
091            this.reportCount = reportCount;
092        }
093    
094        public Map<String, Object> getDefaultHeaders() {
095            if (defaultHeaders == null) {
096                defaultHeaders = new HashMap<String, Object>();
097                populateDefaultHeaders(defaultHeaders);
098            }
099            return defaultHeaders;
100        }
101    
102        public void setDefaultHeaders(Map<String, Object> defaultHeaders) {
103            this.defaultHeaders = defaultHeaders;
104        }
105    
106        public Processor getOutputTransformer() {
107            return outputTransformer;
108        }
109    
110        public void setOutputTransformer(Processor outputTransformer) {
111            this.outputTransformer = outputTransformer;
112        }
113    
114        // Implementation methods
115        //-------------------------------------------------------------------------
116    
117        protected abstract Object createMessageBody(long messageIndex);
118    
119        /**
120         * Allows derived classes to add some custom headers for a given message
121         */
122        protected void applyHeaders(Exchange exchange, long messageIndex) {
123        }
124    
125        /**
126         * Allows derived classes to customize a default set of properties
127         */
128        protected void populateDefaultHeaders(Map<String, Object> map) {
129        }
130    }