001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.CountDownLatch;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.ExchangePattern;
026    import org.apache.camel.Producer;
027    import org.apache.camel.util.ServiceHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Ensures a {@link Producer} is executed within an {@link org.apache.camel.spi.UnitOfWork}.
033     *
034     * @version 
035     */
036    public final class UnitOfWorkProducer implements Producer {
037    
038        private static final Logger LOG = LoggerFactory.getLogger(UnitOfWorkProducer.class);
039        private final Producer producer;
040        private final AsyncProcessor processor;
041    
042        /**
043         * The producer which should be executed within an {@link org.apache.camel.spi.UnitOfWork}.
044         *
045         * @param producer the producer
046         */
047        public UnitOfWorkProducer(Producer producer) {
048            this.producer = producer;
049            // wrap in unit of work
050            CamelInternalProcessor internal = new CamelInternalProcessor(producer);
051            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
052            this.processor = internal;
053        }
054    
055        public Endpoint getEndpoint() {
056            return producer.getEndpoint();
057        }
058    
059        public Exchange createExchange() {
060            return producer.createExchange();
061        }
062    
063        public Exchange createExchange(ExchangePattern pattern) {
064            return producer.createExchange(pattern);
065        }
066    
067        public Exchange createExchange(Exchange exchange) {
068            return producer.createExchange(exchange);
069        }
070    
071        public void process(final Exchange exchange) throws Exception {
072            final CountDownLatch latch = new CountDownLatch(1);
073            boolean sync = processor.process(exchange, new AsyncCallback() {
074                public void done(boolean doneSync) {
075                    if (!doneSync) {
076                        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
077                        latch.countDown();
078                    }
079                }
080    
081                @Override
082                public String toString() {
083                    return "Done " + processor;
084                }
085            });
086            if (!sync) {
087                LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
088                        exchange.getExchangeId(), exchange);
089                latch.await();
090                LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
091                        exchange.getExchangeId(), exchange);
092            }
093        }
094    
095        public void start() throws Exception {
096            ServiceHelper.startService(processor);
097        }
098    
099        public void stop() throws Exception {
100            ServiceHelper.stopService(processor);
101        }
102    
103        public boolean isSingleton() {
104            return producer.isSingleton();
105        }
106    
107        @Override
108        public String toString() {
109            return "UnitOfWork(" + producer + ")";
110        }
111    }