001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.dataset;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import org.apache.camel.Component;
022    import org.apache.camel.Consumer;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Message;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Service;
027    import org.apache.camel.component.mock.MockEndpoint;
028    import org.apache.camel.processor.ThroughputLogger;
029    import org.apache.camel.spi.UriEndpoint;
030    import org.apache.camel.spi.UriParam;
031    import org.apache.camel.util.CamelLogger;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.ObjectHelper;
034    import org.apache.camel.util.URISupport;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Endpoint for DataSet.
040     *
041     * @version 
042     */
043    @UriEndpoint(scheme = "dataset", consumerClass = DataSetConsumer.class)
044    public class DataSetEndpoint extends MockEndpoint implements Service {
045        private final transient Logger log;
046        private volatile DataSet dataSet;
047        private final AtomicInteger receivedCounter = new AtomicInteger();
048        @UriParam
049        private int minRate;
050        @UriParam
051        private long produceDelay = 3;
052        @UriParam
053        private long consumeDelay;
054        @UriParam
055        private long preloadSize;
056        @UriParam
057        private long initialDelay = 1000;
058    
059        @Deprecated
060        public DataSetEndpoint() {
061            this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
062            // optimize as we dont need to copy the exchange
063            copyOnExchange = false;
064        }
065    
066        public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
067            super(endpointUri, component);
068            this.dataSet = dataSet;
069            this.log = LoggerFactory.getLogger(endpointUri);
070            // optimize as we dont need to copy the exchange
071            copyOnExchange = false;
072        }
073    
074        public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
075            if (!ObjectHelper.equal(expected, actual)) {
076                throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders());
077            }
078        }
079    
080        @Override
081        public Consumer createConsumer(Processor processor) throws Exception {
082            Consumer answer = new DataSetConsumer(this, processor);
083            configureConsumer(answer);
084            return answer;
085        }
086    
087        @Override
088        public void reset() {
089            super.reset();
090            receivedCounter.set(0);
091        }
092    
093        @Override
094        public int getReceivedCounter() {
095            return receivedCounter.get();
096        }
097    
098        /**
099         * Creates a message exchange for the given index in the {@link DataSet}
100         */
101        public Exchange createExchange(long messageIndex) throws Exception {
102            Exchange exchange = createExchange();
103            getDataSet().populateMessage(exchange, messageIndex);
104    
105            Message in = exchange.getIn();
106            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
107    
108            return exchange;
109        }
110    
111        public int getMinRate() {
112            return minRate;
113        }
114    
115        public void setMinRate(int minRate) {
116            this.minRate = minRate;
117        }
118    
119        @Override
120        protected void waitForCompleteLatch(long timeout) throws InterruptedException {
121            super.waitForCompleteLatch(timeout);
122    
123            if (minRate > 0) {
124                int count = getReceivedCounter();
125                do {
126                    // wait as long as we get a decent message rate
127                    super.waitForCompleteLatch(1000L);
128                    count = getReceivedCounter() - count;
129                } while (count >= minRate);
130            }
131        }
132    
133        // Properties
134        //-------------------------------------------------------------------------
135    
136        public DataSet getDataSet() {
137            return dataSet;
138        }
139    
140        public void setDataSet(DataSet dataSet) {
141            this.dataSet = dataSet;
142        }
143    
144        public long getPreloadSize() {
145            return preloadSize;
146        }
147    
148        /**
149         * Sets how many messages should be preloaded (sent) before the route completes its initialization
150         */
151        public void setPreloadSize(long preloadSize) {
152            this.preloadSize = preloadSize;
153        }
154    
155        public long getConsumeDelay() {
156            return consumeDelay;
157        }
158    
159        /**
160         * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers
161         */
162        public void setConsumeDelay(long consumeDelay) {
163            this.consumeDelay = consumeDelay;
164        }
165    
166        public long getProduceDelay() {
167            return produceDelay;
168        }
169    
170        /**
171         * Allows a delay to be specified which causes producers to pause - to simulate slow producers
172         */
173        public void setProduceDelay(long produceDelay) {
174            this.produceDelay = produceDelay;
175        }
176    
177        public long getInitialDelay() {
178            return initialDelay;
179        }
180    
181        public void setInitialDelay(long initialDelay) {
182            this.initialDelay = initialDelay;
183        }
184    
185        // Implementation methods
186        //-------------------------------------------------------------------------
187    
188        @Override
189        protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
190            int receivedCount = receivedCounter.incrementAndGet();
191            long index = receivedCount - 1;
192            Exchange expected = createExchange(index);
193    
194            // now let's assert that they are the same
195            if (log.isDebugEnabled()) {
196                log.debug("Received message: {} (DataSet index={}) = {}",
197                        new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy});
198            }
199    
200            assertMessageExpected(index, expected, copy);
201    
202            if (consumeDelay > 0) {
203                Thread.sleep(consumeDelay);
204            }
205        }
206    
207        protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
208            long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
209            assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
210    
211            getDataSet().assertMessageExpected(this, expected, actual, index);
212        }
213    
214        protected ThroughputLogger createReporter() {
215            // must sanitize uri to avoid logging sensitive information
216            String uri = URISupport.sanitizeUri(getEndpointUri());
217            CamelLogger logger = new CamelLogger(uri);
218            ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount());
219            answer.setAction("Received");
220            return answer;
221        }
222    
223        @Override
224        protected void doStart() throws Exception {
225            super.doStart();
226    
227            long size = getDataSet().getSize();
228            expectedMessageCount((int) size);
229            if (reporter == null) {
230                reporter = createReporter();
231            }
232            log.info(this + " expecting " + size + " messages");
233        }
234    
235    }