001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.ArrayList;
020    import java.util.HashSet;
021    import java.util.List;
022    import java.util.Set;
023    import java.util.concurrent.BlockingQueue;
024    import java.util.concurrent.CopyOnWriteArraySet;
025    import java.util.concurrent.ExecutorService;
026    
027    import org.apache.camel.Component;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Message;
031    import org.apache.camel.MultipleConsumersSupport;
032    import org.apache.camel.PollingConsumer;
033    import org.apache.camel.Processor;
034    import org.apache.camel.Producer;
035    import org.apache.camel.WaitForTaskToComplete;
036    import org.apache.camel.api.management.ManagedAttribute;
037    import org.apache.camel.api.management.ManagedOperation;
038    import org.apache.camel.api.management.ManagedResource;
039    import org.apache.camel.impl.DefaultEndpoint;
040    import org.apache.camel.processor.MulticastProcessor;
041    import org.apache.camel.spi.BrowsableEndpoint;
042    import org.apache.camel.spi.UriEndpoint;
043    import org.apache.camel.spi.UriParam;
044    import org.apache.camel.util.EndpointHelper;
045    import org.apache.camel.util.MessageHelper;
046    import org.apache.camel.util.ServiceHelper;
047    import org.apache.camel.util.URISupport;
048    import org.slf4j.Logger;
049    import org.slf4j.LoggerFactory;
050    
051    /**
052     * An implementation of the <a
053     * href="http://camel.apache.org/queue.html">Queue components</a> for
054     * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
055     */
056    @ManagedResource(description = "Managed SedaEndpoint")
057    @UriEndpoint(scheme = "seda", consumerClass = SedaConsumer.class)
058    public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
059        private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);
060        private volatile BlockingQueue<Exchange> queue;
061        private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
062        private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
063        private volatile MulticastProcessor consumerMulticastProcessor;
064        private volatile boolean multicastStarted;
065        private volatile ExecutorService multicastExecutor;
066        @UriParam
067        private int size = Integer.MAX_VALUE;
068        @UriParam
069        private int concurrentConsumers = 1;
070        @UriParam
071        private boolean multipleConsumers;
072        @UriParam
073        private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
074        @UriParam
075        private long timeout = 30000;
076        @UriParam
077        private boolean blockWhenFull;
078        @UriParam
079        private int pollTimeout = 1000;
080        @UriParam
081        private boolean purgeWhenStopping;
082    
083        @UriParam
084        private boolean failIfNoConsumers;
085    
086        private BlockingQueueFactory<Exchange> queueFactory;
087    
088        public SedaEndpoint() {
089            queueFactory = new LinkedBlockingQueueFactory<Exchange>();
090        }
091    
092        public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
093            this(endpointUri, component, queue, 1);
094        }
095    
096        public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
097            this(endpointUri, component, concurrentConsumers);
098            this.queue = queue;
099            if (queue != null) {
100                this.size = queue.remainingCapacity();
101            }
102            queueFactory = new LinkedBlockingQueueFactory<Exchange>();
103            getComponent().registerQueue(this, queue);
104        }
105    
106        public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
107            this(endpointUri, component, concurrentConsumers);
108            this.queueFactory = queueFactory;
109        }
110    
111        private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
112            super(endpointUri, component);
113            this.concurrentConsumers = concurrentConsumers;
114        }
115    
116        @Override
117        public SedaComponent getComponent() {
118            return (SedaComponent) super.getComponent();
119        }
120    
121        public Producer createProducer() throws Exception {
122            return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
123        }
124    
125        public Consumer createConsumer(Processor processor) throws Exception {
126            if (getComponent() != null) {
127                // all consumers must match having the same multipleConsumers options
128                String key = getComponent().getQueueKey(getEndpointUri());
129                QueueReference ref = getComponent().getQueueReference(key);
130                if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) {
131                    // there is already a multiple consumers, so make sure they matches
132                    throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers "
133                            + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers);
134                }
135            }
136    
137            Consumer answer = new SedaConsumer(this, processor);
138            configureConsumer(answer);
139            return answer;
140        }
141    
142        @Override
143        public PollingConsumer createPollingConsumer() throws Exception {
144            SedaPollingConsumer answer = new SedaPollingConsumer(this);
145            configureConsumer(answer);
146            return answer;
147        }
148    
149        public synchronized BlockingQueue<Exchange> getQueue() {
150            if (queue == null) {
151                // prefer to lookup queue from component, so if this endpoint is re-created or re-started
152                // then the existing queue from the component can be used, so new producers and consumers
153                // can use the already existing queue referenced from the component
154                if (getComponent() != null) {
155                    // use null to indicate default size (= use what the existing queue has been configured with)
156                    Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
157                    QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory);
158                    queue = ref.getQueue();
159                    String key = getComponent().getQueueKey(getEndpointUri());
160                    LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});
161                    // and set the size we are using
162                    if (ref.getSize() != null) {
163                        setSize(ref.getSize());
164                    }
165                } else {
166                    // fallback and create queue (as this endpoint has no component)
167                    queue = createQueue();
168                    LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()});
169                }
170            }
171            return queue;
172        }
173    
174        protected BlockingQueue<Exchange> createQueue() {
175            if (size > 0) {
176                return queueFactory.create(size);
177            } else {
178                return queueFactory.create();
179            }
180        }
181    
182        public synchronized QueueReference getQueueReference() {
183            String key = getComponent().getQueueKey(getEndpointUri());
184            QueueReference ref =  getComponent().getQueueReference(key);
185            if (ref == null) {
186                LOG.warn("There was no queue reference for the endpoint {0}", getEndpointUri());
187            }
188            return ref;
189        }
190    
191        protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
192            if (!multicastStarted && consumerMulticastProcessor != null) {
193                // only start it on-demand to avoid starting it during stopping
194                ServiceHelper.startService(consumerMulticastProcessor);
195                multicastStarted = true;
196            }
197            return consumerMulticastProcessor;
198        }
199    
200        protected synchronized void updateMulticastProcessor() throws Exception {
201            // only needed if we support multiple consumers
202            if (!isMultipleConsumersSupported()) {
203                return;
204            }
205    
206            // stop old before we create a new
207            if (consumerMulticastProcessor != null) {
208                ServiceHelper.stopService(consumerMulticastProcessor);
209                consumerMulticastProcessor = null;
210            }
211    
212            int size = getConsumers().size();
213            if (size >= 1) {
214                if (multicastExecutor == null) {
215                    // create multicast executor as we need it when we have more than 1 processor
216                    multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)");
217                }
218                // create list of consumers to multicast to
219                List<Processor> processors = new ArrayList<Processor>(size);
220                for (SedaConsumer consumer : getConsumers()) {
221                    processors.add(consumer.getProcessor());
222                }
223                // create multicast processor
224                multicastStarted = false;
225                consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false);
226            }
227        }
228    
229        public void setQueue(BlockingQueue<Exchange> queue) {
230            this.queue = queue;
231            this.size = queue.remainingCapacity();
232        }
233    
234        @ManagedAttribute(description = "Queue max capacity")
235        public int getSize() {
236            return size;
237        }
238    
239        public void setSize(int size) {
240            this.size = size;
241        }
242    
243        @ManagedAttribute(description = "Current queue size")
244        public int getCurrentQueueSize() {
245            return queue.size();
246        }
247    
248        public void setBlockWhenFull(boolean blockWhenFull) {
249            this.blockWhenFull = blockWhenFull;
250        }
251    
252        @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
253        public boolean isBlockWhenFull() {
254            return blockWhenFull;
255        }
256    
257        public void setConcurrentConsumers(int concurrentConsumers) {
258            this.concurrentConsumers = concurrentConsumers;
259        }
260    
261        @ManagedAttribute(description = "Number of concurrent consumers")
262        public int getConcurrentConsumers() {
263            return concurrentConsumers;
264        }
265    
266        public WaitForTaskToComplete getWaitForTaskToComplete() {
267            return waitForTaskToComplete;
268        }
269    
270        public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
271            this.waitForTaskToComplete = waitForTaskToComplete;
272        }
273    
274        @ManagedAttribute
275        public long getTimeout() {
276            return timeout;
277        }
278    
279        public void setTimeout(long timeout) {
280            this.timeout = timeout;
281        }
282    
283        @ManagedAttribute
284        public boolean isFailIfNoConsumers() {
285            return failIfNoConsumers;
286        }
287    
288        public void setFailIfNoConsumers(boolean failIfNoConsumers) {
289            this.failIfNoConsumers = failIfNoConsumers;
290        }
291    
292        @ManagedAttribute
293        public boolean isMultipleConsumers() {
294            return multipleConsumers;
295        }
296    
297        public void setMultipleConsumers(boolean multipleConsumers) {
298            this.multipleConsumers = multipleConsumers;
299        }
300    
301        @ManagedAttribute
302        public int getPollTimeout() {
303            return pollTimeout;
304        }
305    
306        public void setPollTimeout(int pollTimeout) {
307            this.pollTimeout = pollTimeout;
308        }
309    
310        @ManagedAttribute
311        public boolean isPurgeWhenStopping() {
312            return purgeWhenStopping;
313        }
314    
315        public void setPurgeWhenStopping(boolean purgeWhenStopping) {
316            this.purgeWhenStopping = purgeWhenStopping;
317        }
318    
319        @ManagedAttribute(description = "Singleton")
320        public boolean isSingleton() {
321            return true;
322        }
323    
324        /**
325         * Returns the current pending exchanges
326         */
327        public List<Exchange> getExchanges() {
328            return new ArrayList<Exchange>(getQueue());
329        }
330    
331        @ManagedAttribute
332        public boolean isMultipleConsumersSupported() {
333            return isMultipleConsumers();
334        }
335    
336        /**
337         * Purges the queue
338         */
339        @ManagedOperation(description = "Purges the seda queue")
340        public void purgeQueue() {
341            LOG.debug("Purging queue with {} exchanges", queue.size());
342            queue.clear();
343        }
344    
345        /**
346         * Returns the current active consumers on this endpoint
347         */
348        public Set<SedaConsumer> getConsumers() {
349            return new HashSet<SedaConsumer>(consumers);
350        }
351    
352        /**
353         * Returns the current active producers on this endpoint
354         */
355        public Set<SedaProducer> getProducers() {
356            return new HashSet<SedaProducer>(producers);
357        }
358    
359        @ManagedOperation(description = "Current number of Exchanges in Queue")
360        public long queueSize() {
361            return getExchanges().size();
362        }
363    
364        @ManagedOperation(description = "Get Exchange from queue by index")
365        public String browseExchange(Integer index) {
366            List<Exchange> exchanges = getExchanges();
367            if (index >= exchanges.size()) {
368                return null;
369            }
370            Exchange exchange = exchanges.get(index);
371            if (exchange == null) {
372                return null;
373            }
374            // must use java type with JMX such as java.lang.String
375            return exchange.toString();
376        }
377    
378        @ManagedOperation(description = "Get message body from queue by index")
379        public String browseMessageBody(Integer index) {
380            List<Exchange> exchanges = getExchanges();
381            if (index >= exchanges.size()) {
382                return null;
383            }
384            Exchange exchange = exchanges.get(index);
385            if (exchange == null) {
386                return null;
387            }
388    
389            // must use java type with JMX such as java.lang.String
390            String body;
391            if (exchange.hasOut()) {
392                body = exchange.getOut().getBody(String.class);
393            } else {
394                body = exchange.getIn().getBody(String.class);
395            }
396    
397            return body;
398        }
399    
400        @ManagedOperation(description = "Get message as XML from queue by index")
401        public String browseMessageAsXml(Integer index, Boolean includeBody) {
402            List<Exchange> exchanges = getExchanges();
403            if (index >= exchanges.size()) {
404                return null;
405            }
406            Exchange exchange = exchanges.get(index);
407            if (exchange == null) {
408                return null;
409            }
410    
411            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
412            String xml = MessageHelper.dumpAsXml(msg, includeBody);
413    
414            return xml;
415        }
416    
417        @ManagedOperation(description = "Gets all the messages as XML from the queue")
418        public String browseAllMessagesAsXml(Boolean includeBody) {
419            return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody);
420        }
421    
422        @ManagedOperation(description = "Gets the range of messages as XML from the queue")
423        public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
424            return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
425        }
426    
427        @ManagedAttribute(description = "Camel context ID")
428        public String getCamelId() {
429            return getCamelContext().getName();
430        }
431    
432        @ManagedAttribute(description = "Camel ManagementName")
433        public String getCamelManagementName() {
434            return getCamelContext().getManagementName();
435        }
436    
437        @ManagedAttribute(description = "Endpoint URI", mask = true)
438        public String getEndpointUri() {
439            return super.getEndpointUri();
440        }
441    
442        @ManagedAttribute(description = "Endpoint service state")
443        public String getState() {
444            return getStatus().name();
445        }
446    
447        void onStarted(SedaProducer producer) {
448            producers.add(producer);
449        }
450    
451        void onStopped(SedaProducer producer) {
452            producers.remove(producer);
453        }
454    
455        void onStarted(SedaConsumer consumer) throws Exception {
456            consumers.add(consumer);
457            if (isMultipleConsumers()) {
458                updateMulticastProcessor();
459            }
460        }
461    
462        void onStopped(SedaConsumer consumer) throws Exception {
463            consumers.remove(consumer);
464            if (isMultipleConsumers()) {
465                updateMulticastProcessor();
466            }
467        }
468    
469        public boolean hasConsumers() {
470            return this.consumers.size() > 0;
471        }
472    
473        @Override
474        protected void doStart() throws Exception {
475            super.doStart();
476    
477            // force creating queue when starting
478            if (queue == null) {
479                queue = getQueue();
480            }
481    
482            // special for unit testing where we can set a system property to make seda poll faster
483            // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
484            String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());
485            setPollTimeout(Integer.valueOf(override));
486        }
487    
488        @Override
489        public void stop() throws Exception {
490            if (getConsumers().isEmpty()) {
491                super.stop();
492            } else {
493                LOG.debug("There is still active consumers.");
494            }
495        }
496    
497        @Override
498        public void shutdown() throws Exception {
499            if (shutdown.get()) {
500                LOG.trace("Service already shut down");
501                return;
502            }
503    
504            // notify component we are shutting down this endpoint
505            if (getComponent() != null) {
506                getComponent().onShutdownEndpoint(this);
507            }
508    
509            if (getConsumers().isEmpty()) {
510                super.shutdown();
511            } else {
512                LOG.debug("There is still active consumers.");
513            }
514        }
515    
516        @Override
517        protected void doShutdown() throws Exception {
518            // shutdown thread pool if it was in use
519            if (multicastExecutor != null) {
520                getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
521                multicastExecutor = null;
522            }
523    
524            // clear queue, as we are shutdown, so if re-created then the queue must be updated
525            queue = null;
526        }
527    
528    }