001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.direct;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.Component;
023    import org.apache.camel.Consumer;
024    import org.apache.camel.Processor;
025    import org.apache.camel.Producer;
026    import org.apache.camel.impl.DefaultEndpoint;
027    import org.apache.camel.spi.UriEndpoint;
028    import org.apache.camel.spi.UriParam;
029    import org.apache.camel.util.ObjectHelper;
030    
031    /**
032     * Represents a direct endpoint that synchronously invokes the consumer of the
033     * endpoint when a producer sends a message to it.
034     *
035     * @version 
036     */
037    @UriEndpoint(scheme = "direct", consumerClass = DirectConsumer.class)
038    public class DirectEndpoint extends DefaultEndpoint {
039    
040        private volatile Map<String, DirectConsumer> consumers;
041        @UriParam
042        private boolean block;
043        @UriParam
044        private long timeout = 30000L;
045    
046        public DirectEndpoint() {
047            this.consumers = new HashMap<String, DirectConsumer>();
048        }
049    
050        public DirectEndpoint(String endpointUri, Component component) {
051            this(endpointUri, component, new HashMap<String, DirectConsumer>());
052        }
053    
054        public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
055            super(uri, component);
056            this.consumers = consumers;
057        }
058    
059        public Producer createProducer() throws Exception {
060            if (block) {
061                return new DirectBlockingProducer(this);
062            } else {
063                return new DirectProducer(this);
064            }
065        }
066    
067        public Consumer createConsumer(Processor processor) throws Exception {
068            Consumer answer = new DirectConsumer(this, processor);
069            configureConsumer(answer);
070            return answer;
071        }
072    
073        public boolean isSingleton() {
074            return true;
075        }
076    
077        public void addConsumer(DirectConsumer consumer) {
078            String key = consumer.getEndpoint().getKey();
079            consumers.put(key, consumer);
080        }
081    
082        public void removeConsumer(DirectConsumer consumer) {
083            String key = consumer.getEndpoint().getKey();
084            consumers.remove(key);
085        }
086    
087        public boolean hasConsumer(DirectConsumer consumer) {
088            String key = consumer.getEndpoint().getKey();
089            return consumers.containsKey(key);
090        }
091    
092        public DirectConsumer getConsumer() {
093            String key = getKey();
094            return consumers.get(key);
095        }
096    
097        public boolean isBlock() {
098            return block;
099        }
100    
101        public void setBlock(boolean block) {
102            this.block = block;
103        }
104    
105        public long getTimeout() {
106            return timeout;
107        }
108    
109        public void setTimeout(long timeout) {
110            this.timeout = timeout;
111        }
112    
113        protected String getKey() {
114            String uri = getEndpointUri();
115            if (uri.indexOf('?') != -1) {
116                return ObjectHelper.before(uri, "?");
117            } else {
118                return uri;
119            }
120        }
121    }