001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.concurrent.BlockingQueue; 020 import java.util.concurrent.CountDownLatch; 021 import java.util.concurrent.TimeUnit; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.ExchangeTimedOutException; 026 import org.apache.camel.WaitForTaskToComplete; 027 import org.apache.camel.impl.DefaultAsyncProducer; 028 import org.apache.camel.support.SynchronizationAdapter; 029 import org.apache.camel.util.ExchangeHelper; 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 /** 034 * @version 035 */ 036 public class SedaProducer extends DefaultAsyncProducer { 037 private static final transient Logger LOG = LoggerFactory.getLogger(SedaProducer.class); 038 039 /** 040 * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used. 041 */ 042 @Deprecated 043 protected final BlockingQueue<Exchange> queue; 044 private final SedaEndpoint endpoint; 045 private final WaitForTaskToComplete waitForTaskToComplete; 046 private final long timeout; 047 private final boolean blockWhenFull; 048 049 /** 050 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 051 */ 052 @Deprecated 053 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) { 054 this(endpoint, waitForTaskToComplete, timeout, false); 055 } 056 057 /** 058 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 059 */ 060 @Deprecated 061 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) { 062 this(endpoint, waitForTaskToComplete, timeout, blockWhenFull); 063 } 064 065 public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) { 066 super(endpoint); 067 this.queue = endpoint.getQueue(); 068 this.endpoint = endpoint; 069 this.waitForTaskToComplete = waitForTaskToComplete; 070 this.timeout = timeout; 071 this.blockWhenFull = blockWhenFull; 072 } 073 074 @Override 075 public boolean process(final Exchange exchange, final AsyncCallback callback) { 076 WaitForTaskToComplete wait = waitForTaskToComplete; 077 if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) { 078 wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); 079 } 080 081 if (wait == WaitForTaskToComplete.Always 082 || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) { 083 084 // do not handover the completion as we wait for the copy to complete, and copy its result back when it done 085 Exchange copy = prepareCopy(exchange, false); 086 087 // latch that waits until we are complete 088 final CountDownLatch latch = new CountDownLatch(1); 089 090 // we should wait for the reply so install a on completion so we know when its complete 091 copy.addOnCompletion(new SynchronizationAdapter() { 092 @Override 093 public void onDone(Exchange response) { 094 // check for timeout, which then already would have invoked the latch 095 if (latch.getCount() == 0) { 096 if (log.isTraceEnabled()) { 097 log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 098 } 099 return; 100 } else { 101 if (log.isTraceEnabled()) { 102 log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 103 } 104 try { 105 ExchangeHelper.copyResults(exchange, response); 106 } finally { 107 // always ensure latch is triggered 108 latch.countDown(); 109 } 110 } 111 } 112 113 @Override 114 public boolean allowHandover() { 115 // do not allow handover as we want to seda producer to have its completion triggered 116 // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored) 117 return false; 118 } 119 120 @Override 121 public String toString() { 122 return "onDone at endpoint: " + endpoint; 123 } 124 }); 125 126 log.trace("Adding Exchange to queue: {}", copy); 127 try { 128 addToQueue(copy); 129 } catch (SedaConsumerNotAvailableException e) { 130 exchange.setException(e); 131 callback.done(true); 132 return true; 133 } 134 135 if (timeout > 0) { 136 if (log.isTraceEnabled()) { 137 log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri()); 138 } 139 // lets see if we can get the task done before the timeout 140 boolean done = false; 141 try { 142 done = latch.await(timeout, TimeUnit.MILLISECONDS); 143 } catch (InterruptedException e) { 144 // ignore 145 } 146 if (!done) { 147 exchange.setException(new ExchangeTimedOutException(exchange, timeout)); 148 // remove timed out Exchange from queue 149 endpoint.getQueue().remove(copy); 150 // count down to indicate timeout 151 latch.countDown(); 152 } 153 } else { 154 if (log.isTraceEnabled()) { 155 log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri()); 156 } 157 // no timeout then wait until its done 158 try { 159 latch.await(); 160 } catch (InterruptedException e) { 161 // ignore 162 } 163 } 164 } else { 165 // no wait, eg its a InOnly then just add to queue and return 166 // handover the completion so its the copy which performs that, as we do not wait 167 Exchange copy = prepareCopy(exchange, true); 168 log.trace("Adding Exchange to queue: {}", copy); 169 try { 170 addToQueue(copy); 171 } catch (SedaConsumerNotAvailableException e) { 172 exchange.setException(e); 173 callback.done(true); 174 return true; 175 } 176 } 177 178 // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done 179 // so we should just signal the callback we are done synchronously 180 callback.done(true); 181 return true; 182 } 183 184 protected Exchange prepareCopy(Exchange exchange, boolean handover) { 185 // use a new copy of the exchange to route async 186 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover); 187 // set a new from endpoint to be the seda queue 188 copy.setFromEndpoint(endpoint); 189 return copy; 190 } 191 192 @Override 193 protected void doStart() throws Exception { 194 super.doStart(); 195 endpoint.onStarted(this); 196 } 197 198 @Override 199 protected void doStop() throws Exception { 200 endpoint.onStopped(this); 201 super.doStop(); 202 } 203 204 /** 205 * Strategy method for adding the exchange to the queue. 206 * <p> 207 * Will perform a blocking "put" if blockWhenFull is true, otherwise it will 208 * simply add which will throw exception if the queue is full 209 * 210 * @param exchange the exchange to add to the queue 211 */ 212 protected void addToQueue(Exchange exchange) throws SedaConsumerNotAvailableException { 213 QueueReference queueReference = endpoint.getQueueReference(); 214 BlockingQueue<Exchange> queue = queueReference.getQueue(); 215 216 if (endpoint.isFailIfNoConsumers() && !queueReference.hasConsumers()) { 217 throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); 218 } 219 if (blockWhenFull) { 220 try { 221 queue.put(exchange); 222 } catch (InterruptedException e) { 223 // ignore 224 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); 225 } 226 } else { 227 queue.add(exchange); 228 } 229 } 230 231 }