001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.directvm;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.ConcurrentMap;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.camel.Endpoint;
027    import org.apache.camel.impl.UriEndpointComponent;
028    
029    /**
030     * Represents the component that manages {@link DirectVmEndpoint}. It holds the
031     * list of named direct-vm endpoints.
032     */
033    public class DirectVmComponent extends UriEndpointComponent {
034    
035        private static final AtomicInteger START_COUNTER = new AtomicInteger();
036    
037        // must keep a map of consumers on the component to ensure endpoints can lookup old consumers
038        // later in case the DirectVmEndpoint was re-created due the old was evicted from the endpoints LRUCache
039        // on DefaultCamelContext
040        private static final ConcurrentMap<String, DirectVmConsumer> CONSUMERS = new ConcurrentHashMap<String, DirectVmConsumer>();
041        private boolean block;
042        private long timeout = 30000L;
043    
044        public DirectVmComponent() {
045            super(DirectVmEndpoint.class);
046        }
047    
048        /**
049         * Gets all the consumer endpoints.
050         *
051         * @return consumer endpoints
052         */
053        public static Collection<Endpoint> getConsumerEndpoints() {
054            Collection<Endpoint> endpoints = new ArrayList<Endpoint>(CONSUMERS.size());
055            for (DirectVmConsumer consumer : CONSUMERS.values()) {
056                endpoints.add(consumer.getEndpoint());
057            }
058            return endpoints;
059        }
060    
061        @Override
062        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
063            DirectVmEndpoint answer = new DirectVmEndpoint(uri, this);
064            answer.setBlock(block);
065            answer.setTimeout(timeout);
066            answer.configureProperties(parameters);
067            return answer;
068        }
069    
070        public DirectVmConsumer getConsumer(DirectVmEndpoint endpoint) {
071            String key = getConsumerKey(endpoint.getEndpointUri());
072            return CONSUMERS.get(key);
073        }
074    
075        public void addConsumer(DirectVmEndpoint endpoint, DirectVmConsumer consumer) {
076            String key = getConsumerKey(endpoint.getEndpointUri());
077            DirectVmConsumer existing = CONSUMERS.putIfAbsent(key, consumer);
078            if (existing != null) {
079                String contextId = existing.getEndpoint().getCamelContext().getName();
080                throw new IllegalStateException("A consumer " + existing + " already exists from CamelContext: " + contextId + ". Multiple consumers not supported");
081            }
082        }
083    
084        public void removeConsumer(DirectVmEndpoint endpoint, DirectVmConsumer consumer) {
085            String key = getConsumerKey(endpoint.getEndpointUri());
086            CONSUMERS.remove(key);
087        }
088    
089        private static String getConsumerKey(String uri) {
090            if (uri.contains("?")) {
091                // strip parameters
092                uri = uri.substring(0, uri.indexOf('?'));
093            }
094            return uri;
095        }
096    
097        @Override
098        protected void doStart() throws Exception {
099            super.doStart();
100            START_COUNTER.incrementAndGet();
101        }
102    
103        @Override
104        protected void doStop() throws Exception {
105            if (START_COUNTER.decrementAndGet() <= 0) {
106                // clear queues when no more direct-vm components in use
107                CONSUMERS.clear();
108            }
109            super.doStop();
110        }
111    
112        public boolean isBlock() {
113            return block;
114        }
115    
116        public void setBlock(boolean block) {
117            this.block = block;
118        }
119    
120        public long getTimeout() {
121            return timeout;
122        }
123    
124        public void setTimeout(long timeout) {
125            this.timeout = timeout;
126        }
127    }