/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Ordered;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Processor that registers an on-completion callback on every {@link Exchange} it processes.
 * <p/>
 * When the registered callback is notified ({@code onComplete} or {@code onFailure}), a copy of
 * the exchange is prepared by {@link #prepareExchange(Exchange)} and submitted to the configured
 * {@link ExecutorService}, where it is handed to the wrapped {@link Processor}. The flags
 * {@code onCompleteOnly} / {@code onFailureOnly} and the optional {@code onWhen} predicate decide
 * whether a given notification is routed at all.
 *
 * @version 
 */
public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable {

    private static final Logger LOG = LoggerFactory.getLogger(OnCompletionProcessor.class);
    private final CamelContext camelContext;
    private final Processor processor;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private final boolean onCompleteOnly;
    private final boolean onFailureOnly;
    private final Predicate onWhen;
    private final boolean useOriginalBody;

    /**
     * Creates the processor.
     *
     * @param camelContext            the camel context, must not be <tt>null</tt>
     * @param processor               the processor that routes the on-completion exchange, must not be <tt>null</tt>
     * @param executorService         the executor the on-completion tasks are submitted to; it is not
     *                                validated here but is dereferenced when a completion is routed
     * @param shutdownExecutorService whether the executor is shut down when this processor is shut down
     * @param onCompleteOnly          <tt>true</tt> to skip routing on failure
     * @param onFailureOnly           <tt>true</tt> to skip routing on successful completion
     * @param onWhen                  optional predicate; when set and not matching, nothing is routed
     * @param useOriginalBody         whether the original IN message of the unit of work replaces the current IN message
     */
    public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
                                 boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
        notNull(camelContext, "camelContext");
        notNull(processor, "processor");
        this.camelContext = camelContext;
        this.processor = processor;
        this.executorService = executorService;
        this.shutdownExecutorService = shutdownExecutorService;
        this.onCompleteOnly = onCompleteOnly;
        this.onFailureOnly = onFailureOnly;
        this.onWhen = onWhen;
        this.useOriginalBody = useOriginalBody;
    }

    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startService(processor);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(processor);
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(processor);
        // only shut down the executor when this processor was told it is responsible for it
        if (shutdownExecutorService) {
            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
        }
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Registers the on-completion callback on the exchange and completes synchronously.
     *
     * @param exchange the exchange to register the callback on
     * @param callback notified with <tt>true</tt> (done synchronously) before this method returns
     * @return always <tt>true</tt>, as no asynchronous work is started here
     */
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // register the callback through the exchange instead of dereferencing
        // exchange.getUnitOfWork() directly, which fails with a NullPointerException
        // when no unit of work is attached to the exchange
        // (the processor itself is guaranteed non-null by the constructor)
        exchange.addOnCompletion(new OnCompletionSynchronization());

        callback.done(true);
        return true;
    }

    /**
     * Processes the exchange by the processor.
     * <p/>
     * Any exception thrown by the processor is not propagated; it is stored on the exchange instead.
     *
     * @param processor the processor
     * @param exchange  the exchange
     */
    protected static void doProcess(Processor processor, Exchange exchange) {
        try {
            processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }
    }

    /**
     * Prepares the {@link Exchange} to send as onCompletion.
     *
     * @param exchange the current exchange
     * @return the exchange to be routed in onComplete
     */
    protected Exchange prepareExchange(Exchange exchange) {
        Exchange answer;

        // for asynchronous routing we must use a copy as we dont want it
        // to cause side effects of the original exchange
        // (the original thread will run in parallel)
        answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
        if (answer.hasOut()) {
            // move OUT to IN (pipes and filters)
            answer.setIn(answer.getOut());
            answer.setOut(null);
        }
        // set MEP to InOnly as the on completion routing is fire and forget
        answer.setPattern(ExchangePattern.InOnly);

        if (useOriginalBody) {
            LOG.trace("Using the original IN message instead of current");

            Message original = exchange.getUnitOfWork().getOriginalInMessage();
            answer.setIn(original);
        }

        // add a property flag to indicate its a on completion exchange
        answer.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE);

        return answer;
    }

    /**
     * Callback registered on the exchange; it routes a prepared copy of the exchange
     * on the executor service when notified of completion or failure.
     */
    private final class OnCompletionSynchronization extends SynchronizationAdapter implements Ordered {

        public int getOrder() {
            // we want to be last
            return Ordered.LOWEST;
        }

        @Override
        public void onComplete(final Exchange exchange) {
            if (onFailureOnly) {
                return;
            }

            if (onWhen != null && !onWhen.matches(exchange)) {
                // predicate did not match so do not route the onComplete
                return;
            }

            // must use a copy as we dont want it to cause side effects of the original exchange
            final Exchange copy = prepareExchange(exchange);

            executorService.submit(new Callable<Exchange>() {
                public Exchange call() throws Exception {
                    LOG.debug("Processing onComplete: {}", copy);
                    doProcess(processor, copy);
                    return copy;
                }
            });
        }

        @Override
        public void onFailure(final Exchange exchange) {
            if (onCompleteOnly) {
                return;
            }

            if (onWhen != null && !onWhen.matches(exchange)) {
                // predicate did not match so do not route the onFailure
                return;
            }

            // must use a copy as we dont want it to cause side effects of the original exchange
            final Exchange copy = prepareExchange(exchange);
            // must remove exception otherwise onFailure routing will fail as well
            // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
            copy.setException(null);

            executorService.submit(new Callable<Exchange>() {
                public Exchange call() throws Exception {
                    LOG.debug("Processing onFailure: {}", copy);
                    doProcess(processor, copy);
                    // return the routed copy, consistent with the onComplete task
                    return copy;
                }
            });
        }

        @Override
        public String toString() {
            if (!onCompleteOnly && !onFailureOnly) {
                return "onCompleteOrFailure";
            } else if (onCompleteOnly) {
                return "onCompleteOnly";
            } else {
                return "onFailureOnly";
            }
        }
    }

    @Override
    public String toString() {
        return "OnCompletionProcessor[" + processor + "]";
    }

    public String getTraceLabel() {
        return "onCompletion";
    }
}