001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.Callable; 022 import java.util.concurrent.ExecutorService; 023 024 import org.apache.camel.AsyncCallback; 025 import org.apache.camel.AsyncProcessor; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.ExchangePattern; 029 import org.apache.camel.Expression; 030 import org.apache.camel.Processor; 031 import org.apache.camel.Traceable; 032 import org.apache.camel.impl.DefaultExchange; 033 import org.apache.camel.support.ServiceSupport; 034 import org.apache.camel.util.AsyncProcessorHelper; 035 import org.apache.camel.util.ExchangeHelper; 036 import org.apache.camel.util.ObjectHelper; 037 import org.apache.camel.util.ServiceHelper; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * Processor for wire tapping exchanges to an endpoint destination. 043 * 044 * @version 045 */ 046 public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable { 047 private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class); 048 private final Endpoint destination; 049 private final Processor processor; 050 private final ExchangePattern exchangePattern; 051 private final ExecutorService executorService; 052 private volatile boolean shutdownExecutorService; 053 054 // expression or processor used for populating a new exchange to send 055 // as opposed to traditional wiretap that sends a copy of the original exchange 056 private Expression newExchangeExpression; 057 private List<Processor> newExchangeProcessors; 058 private boolean copy; 059 private Processor onPrepare; 060 061 public WireTapProcessor(Endpoint destination, Processor processor, ExchangePattern exchangePattern, 062 ExecutorService executorService, boolean shutdownExecutorService) { 063 this.destination = destination; 064 this.processor = processor; 065 this.exchangePattern = exchangePattern; 066 ObjectHelper.notNull(executorService, "executorService"); 067 this.executorService = executorService; 068 this.shutdownExecutorService = shutdownExecutorService; 069 } 070 071 @Override 072 public String toString() { 073 return "WireTap[" + destination + "]"; 074 } 075 076 @Override 077 public String getTraceLabel() { 078 return "wireTap(" + destination + ")"; 079 } 080 081 public void process(Exchange exchange) throws Exception { 082 AsyncProcessorHelper.process(this, exchange); 083 } 084 085 public boolean process(Exchange exchange, final AsyncCallback callback) { 086 if (!isStarted()) { 087 throw new IllegalStateException("WireTapProcessor has not been started: " + this); 088 } 089 090 // must configure the wire tap beforehand 091 final Exchange wireTapExchange = configureExchange(exchange, exchangePattern); 092 093 // send the exchange to the destination using an executor service 094 executorService.submit(new Callable<Exchange>() { 095 public Exchange call() throws Exception { 096 try { 097 LOG.debug(">>>> (wiretap) {} {}", destination, wireTapExchange); 098 processor.process(wireTapExchange); 099 } catch (Throwable e) { 100 LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + destination + ". This exception will be ignored.", e); 101 } 102 return wireTapExchange; 103 }; 104 }); 105 106 // continue routing this synchronously 107 callback.done(true); 108 return true; 109 } 110 111 112 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) { 113 Exchange answer; 114 if (copy) { 115 // use a copy of the original exchange 116 answer = configureCopyExchange(exchange); 117 } else { 118 // use a new exchange 119 answer = configureNewExchange(exchange); 120 } 121 122 // set property which endpoint we send to 123 answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); 124 125 // prepare the exchange 126 if (newExchangeExpression != null) { 127 Object body = newExchangeExpression.evaluate(answer, Object.class); 128 if (body != null) { 129 answer.getIn().setBody(body); 130 } 131 } 132 133 if (newExchangeProcessors != null) { 134 for (Processor processor : newExchangeProcessors) { 135 try { 136 processor.process(answer); 137 } catch (Exception e) { 138 throw ObjectHelper.wrapRuntimeCamelException(e); 139 } 140 } 141 } 142 143 // invoke on prepare on the exchange if specified 144 if (onPrepare != null) { 145 try { 146 onPrepare.process(answer); 147 } catch (Exception e) { 148 throw ObjectHelper.wrapRuntimeCamelException(e); 149 } 150 } 151 152 return answer; 153 } 154 155 private Exchange configureCopyExchange(Exchange exchange) { 156 // must use a copy as we dont want it to cause side effects of the original exchange 157 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 158 // set MEP to InOnly as this wire tap is a fire and forget 159 copy.setPattern(ExchangePattern.InOnly); 160 return copy; 161 } 162 163 private Exchange configureNewExchange(Exchange exchange) { 164 return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); 165 } 166 167 public List<Processor> getNewExchangeProcessors() { 168 return newExchangeProcessors; 169 } 170 171 public void setNewExchangeProcessors(List<Processor> newExchangeProcessors) { 172 this.newExchangeProcessors = newExchangeProcessors; 173 } 174 175 public Expression getNewExchangeExpression() { 176 return newExchangeExpression; 177 } 178 179 public void setNewExchangeExpression(Expression newExchangeExpression) { 180 this.newExchangeExpression = newExchangeExpression; 181 } 182 183 public void addNewExchangeProcessor(Processor processor) { 184 if (newExchangeProcessors == null) { 185 newExchangeProcessors = new ArrayList<Processor>(); 186 } 187 newExchangeProcessors.add(processor); 188 } 189 190 public boolean isCopy() { 191 return copy; 192 } 193 194 public void setCopy(boolean copy) { 195 this.copy = copy; 196 } 197 198 public Processor getOnPrepare() { 199 return onPrepare; 200 } 201 202 public void setOnPrepare(Processor onPrepare) { 203 this.onPrepare = onPrepare; 204 } 205 206 @Override 207 protected void doStart() throws Exception { 208 ServiceHelper.startService(processor); 209 } 210 211 @Override 212 protected void doStop() throws Exception { 213 ServiceHelper.stopService(processor); 214 } 215 216 @Override 217 protected void doShutdown() throws Exception { 218 ServiceHelper.stopAndShutdownService(processor); 219 if (shutdownExecutorService) { 220 destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 221 } 222 } 223 }