001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.util.FilePathResolver;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
036    
037    /**
038     * Default implementation of {@link StreamCachingStrategy}
039     */
040    public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
041    
    /** Legacy CamelContext property key; when present, doStart() uses its value as the spool threshold. */
    @Deprecated
    public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
    /** Legacy CamelContext property key; when present, doStart() uses its value as the buffer size. */
    @Deprecated
    public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
    /** Legacy CamelContext property key; when present, doStart() uses its value as the spool directory. */
    @Deprecated
    public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
    /** Legacy CamelContext property key; when present, doStart() uses its value as the spool cipher. */
    @Deprecated
    public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";

    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class);

    // context used to read the legacy properties and to resolve the spool directory pattern
    private CamelContext camelContext;
    // master switch; when false shouldSpoolCache() always answers false and doStart() configures nothing
    private boolean enabled;
    // concrete spool directory; either set explicitly or resolved from spoolDirectoryName in doStart()
    private File spoolDirectory;
    // directory name pattern, resolved in doStart() only when no explicit spoolDirectory is set
    private transient String spoolDirectoryName = "${java.io.tmpdir}/camel/camel-tmp-#uuid#";
    // body length above which the fixed threshold rule spools; values <= 0 disable that rule
    private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
    // used heap percentage above which the heap rule spools; values <= 0 disable that rule, max allowed is 99
    private int spoolUsedHeapMemoryThreshold;
    // heap figure the used memory is compared against; doStart() defaults it to Max when the heap rule is active
    private SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit;
    // NOTE(review): "Chiper" is a misspelling of "cipher" that is part of the public accessor names;
    // presumably the cipher transformation applied to spooled data - confirm against the spool implementation
    private String spoolChiper;
    private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
    // whether doStop() deletes the spool directory
    private boolean removeSpoolDirectoryWhenStopping = true;
    private final UtilizationStatistics statistics = new UtilizationStatistics();
    // rules consulted by shouldSpoolCache(), kept in insertion order
    private final Set<SpoolRule> spoolRules = new LinkedHashSet<SpoolRule>();
    // true: one matching rule is enough to spool; false: every rule must match
    private boolean anySpoolRules;
066    
    /**
     * Returns the {@link CamelContext} assigned through {@link #setCamelContext(CamelContext)}.
     *
     * @return the context, or <tt>null</tt> if none has been assigned
     */
    public CamelContext getCamelContext() {
        return camelContext;
    }
070    
    /**
     * Assigns the {@link CamelContext} this strategy works with.
     * <p/>
     * The context is needed by {@link #doStart()} (when enabled) and by
     * {@link #resolveSpoolDirectory(String)}.
     *
     * @param camelContext the context
     */
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
074    
    /**
     * Whether stream caching is enabled.
     *
     * @return <tt>true</tt> if enabled
     */
    public boolean isEnabled() {
        return enabled;
    }
078    
    /**
     * Enables or disables stream caching.
     * <p/>
     * While disabled {@link #shouldSpoolCache(long)} always returns <tt>false</tt>
     * and {@link #doStart()} skips all configuration.
     *
     * @param enabled <tt>true</tt> to enable
     */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
082    
    /**
     * Sets the spool directory as a name pattern.
     * <p/>
     * The pattern is resolved by {@link #resolveSpoolDirectory(String)} during
     * {@link #doStart()}, but only when no directory has been set as a {@link File}.
     * The token <tt>#uuid#</tt> is replaced with a random UUID.
     *
     * @param path the directory name pattern
     */
    public void setSpoolDirectory(String path) {
        this.spoolDirectoryName = path;
    }
086    
    /**
     * Sets the spool directory explicitly; this takes precedence over a directory name pattern.
     *
     * @param path the directory to spool to
     */
    public void setSpoolDirectory(File path) {
        this.spoolDirectory = path;
    }
090    
    /**
     * Returns the spool directory.
     *
     * @return the directory, or <tt>null</tt> when none was set explicitly and the
     *         name pattern has not been resolved by {@link #doStart()} yet
     */
    public File getSpoolDirectory() {
        return spoolDirectory;
    }
094    
    /**
     * Returns the body length above which the fixed threshold rule decides to spool.
     *
     * @return the threshold; a value of zero or less means the rule is not used
     */
    public long getSpoolThreshold() {
        return spoolThreshold;
    }
098    
    /**
     * Returns the used heap memory percentage above which the heap memory rule decides to spool.
     *
     * @return the percentage; a value of zero or less means the rule is not used
     */
    public int getSpoolUsedHeapMemoryThreshold() {
        return spoolUsedHeapMemoryThreshold;
    }
102    
    /**
     * Sets the used heap memory percentage above which the heap memory rule decides to spool.
     * <p/>
     * The value is validated in {@link #doStart()}, which rejects anything higher than 99.
     *
     * @param spoolHeapMemoryWatermarkThreshold the percentage; zero or less disables the rule
     */
    public void setSpoolUsedHeapMemoryThreshold(int spoolHeapMemoryWatermarkThreshold) {
        this.spoolUsedHeapMemoryThreshold = spoolHeapMemoryWatermarkThreshold;
    }
106    
    /**
     * Returns which heap figure the used heap memory is compared against.
     *
     * @return the limit, or <tt>null</tt> if not set ({@link #doStart()} then defaults to
     *         <tt>Max</tt> when the heap memory rule is active)
     */
    public SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit() {
        return spoolUsedHeapMemoryLimit;
    }
110    
    /**
     * Sets which heap figure (committed or max) the used heap memory is compared against.
     *
     * @param spoolUsedHeapMemoryLimit the limit to use
     */
    public void setSpoolUsedHeapMemoryLimit(SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit) {
        this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit;
    }
114    
    /**
     * Sets the body length above which the fixed threshold rule decides to spool.
     *
     * @param spoolThreshold the threshold; zero or less disables the rule
     */
    public void setSpoolThreshold(long spoolThreshold) {
        this.spoolThreshold = spoolThreshold;
    }
118    
    /**
     * Returns the configured spool cipher (the method name carries a historic misspelling of "cipher").
     *
     * @return the value, or <tt>null</tt> if none is configured
     */
    public String getSpoolChiper() {
        return spoolChiper;
    }
122    
    /**
     * Sets the spool cipher (the method name carries a historic misspelling of "cipher").
     *
     * @param spoolChiper the value to use
     */
    public void setSpoolChiper(String spoolChiper) {
        this.spoolChiper = spoolChiper;
    }
126    
    /**
     * Returns the configured buffer size, which defaults to {@link IOHelper#DEFAULT_BUFFER_SIZE}.
     *
     * @return the buffer size
     */
    public int getBufferSize() {
        return bufferSize;
    }
130    
    /**
     * Sets the buffer size.
     *
     * @param bufferSize the buffer size
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }
134    
    /**
     * Whether the spool directory is removed when this strategy stops; enabled by default.
     *
     * @return <tt>true</tt> if the directory is removed on stop
     */
    public boolean isRemoveSpoolDirectoryWhenStopping() {
        return removeSpoolDirectoryWhenStopping;
    }
138    
    /**
     * Sets whether the spool directory is removed when this strategy stops.
     *
     * @param removeSpoolDirectoryWhenStopping <tt>true</tt> to remove the directory on stop
     */
    public void setRemoveSpoolDirectoryWhenStopping(boolean removeSpoolDirectoryWhenStopping) {
        this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping;
    }
142    
    /**
     * Whether a single matching spool rule is enough to spool.
     *
     * @return <tt>true</tt> if any rule suffices, <tt>false</tt> if all rules must match
     */
    public boolean isAnySpoolRules() {
        return anySpoolRules;
    }
146    
    /**
     * Sets how the spool rules are combined in {@link #shouldSpoolCache(long)}.
     *
     * @param anySpoolTasks <tt>true</tt> if one matching rule is enough,
     *                      <tt>false</tt> if every rule must match
     */
    public void setAnySpoolRules(boolean anySpoolTasks) {
        this.anySpoolRules = anySpoolTasks;
    }
150    
    /**
     * Returns the utilization statistics of this strategy; the same instance is returned on every call.
     *
     * @return the statistics
     */
    public Statistics getStatistics() {
        return statistics;
    }
154    
155        public boolean shouldSpoolCache(long length) {
156            if (!enabled || spoolRules.isEmpty()) {
157                return false;
158            }
159    
160            boolean all = true;
161            boolean any = false;
162            for (SpoolRule rule : spoolRules) {
163                boolean result = rule.shouldSpoolCache(length);
164                if (!result) {
165                    all = false;
166                    if (!anySpoolRules) {
167                        // no need to check anymore
168                        break;
169                    }
170                } else {
171                    any = true;
172                    if (anySpoolRules) {
173                        // no need to check anymore
174                        break;
175                    }
176                }
177            }
178    
179            boolean answer = anySpoolRules ? any : all;
180            LOG.debug("Should spool cache {} -> {}", length, answer);
181            return answer;
182        }
183    
    /**
     * Registers an additional rule to be consulted by {@link #shouldSpoolCache(long)}.
     * <p/>
     * Rules are kept in a set, so adding a rule that equals an already registered one has no effect.
     *
     * @param rule the rule to add
     */
    public void addSpoolRule(SpoolRule rule) {
        spoolRules.add(rule);
    }
187    
188        public StreamCache cache(Exchange exchange) {
189            StreamCache cache = exchange.getIn().getBody(StreamCache.class);
190            if (cache != null) {
191                if (LOG.isTraceEnabled()) {
192                    LOG.trace("Cached stream to {} -> {}", cache.inMemory() ? "memory" : "spool", cache);
193                }
194                if (statistics.isStatisticsEnabled()) {
195                    try {
196                        if (cache.inMemory()) {
197                            statistics.updateMemory(cache.length());
198                        } else {
199                            statistics.updateSpool(cache.length());
200                        }
201                    } catch (Exception e) {
202                        LOG.debug("Error updating cache statistics. This exception is ignored.", e);
203                    }
204                }
205            }
206            return cache;
207        }
208    
209        protected String resolveSpoolDirectory(String path) {
210            String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
211            if (name != null) {
212                name = customResolveManagementName(name);
213            }
214            // and then check again with invalid check to ensure all ## is resolved
215            if (name != null) {
216                name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
217            }
218            return name;
219        }
220    
221        protected String customResolveManagementName(String pattern) {
222            if (pattern.contains("#uuid#")) {
223                String uuid = UUID.randomUUID().toString();
224                pattern = pattern.replaceFirst("#uuid#", uuid);
225            }
226            return FilePathResolver.resolvePath(pattern);
227        }
228    
229        @Override
230        protected void doStart() throws Exception {
231            if (!enabled) {
232                LOG.debug("StreamCaching is not enabled");
233                return;
234            }
235    
236            String bufferSize = camelContext.getProperty(BUFFER_SIZE);
237            String hold = camelContext.getProperty(THRESHOLD);
238            String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION);
239            String dir = camelContext.getProperty(TEMP_DIR);
240    
241            boolean warn = false;
242            if (bufferSize != null) {
243                warn = true;
244                this.bufferSize = camelContext.getTypeConverter().convertTo(Integer.class, bufferSize);
245            }
246            if (hold != null) {
247                warn = true;
248                this.spoolThreshold = camelContext.getTypeConverter().convertTo(Long.class, hold);
249            }
250            if (chiper != null) {
251                warn = true;
252                this.spoolChiper = chiper;
253            }
254            if (dir != null) {
255                warn = true;
256                this.spoolDirectory = camelContext.getTypeConverter().convertTo(File.class, dir);
257            }
258            if (warn) {
259                LOG.warn("Configuring of StreamCaching using CamelContext properties is deprecated - use StreamCachingStrategy instead.");
260            }
261    
262            if (spoolUsedHeapMemoryThreshold > 99) {
263                throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must not be higher than 99, was: " + spoolUsedHeapMemoryThreshold);
264            }
265    
266            // if we can overflow to disk then make sure directory exists / is created
267            if (spoolThreshold > 0 || spoolUsedHeapMemoryThreshold > 0) {
268    
269                if (spoolDirectory == null && spoolDirectoryName == null) {
270                    throw new IllegalArgumentException("SpoolDirectory must be configured when using SpoolThreshold > 0");
271                }
272    
273                if (spoolDirectory == null) {
274                    String name = resolveSpoolDirectory(spoolDirectoryName);
275                    if (name != null) {
276                        spoolDirectory = new File(name);
277                        spoolDirectoryName = null;
278                    } else {
279                        throw new IllegalStateException("Cannot resolve spool directory from pattern: " + spoolDirectoryName);
280                    }
281                }
282    
283                if (spoolDirectory.exists()) {
284                    if (spoolDirectory.isDirectory()) {
285                        LOG.debug("Using spool directory: {}", spoolDirectory);
286                    } else {
287                        LOG.warn("Spool directory: {} is not a directory. This may cause problems spooling to disk for the stream caching!", spoolDirectory);
288                    }
289                } else {
290                    boolean created = spoolDirectory.mkdirs();
291                    if (!created) {
292                        LOG.warn("Cannot create spool directory: {}. This may cause problems spooling to disk for the stream caching!", spoolDirectory);
293                    } else {
294                        LOG.debug("Created spool directory: {}", spoolDirectory);
295                    }
296    
297                }
298    
299                if (spoolThreshold > 0) {
300                    spoolRules.add(new FixedThresholdSpoolRule());
301                }
302                if (spoolUsedHeapMemoryThreshold > 0) {
303                    if (spoolUsedHeapMemoryLimit == null) {
304                        // use max by default
305                        spoolUsedHeapMemoryLimit = SpoolUsedHeapMemoryLimit.Max;
306                    }
307                    spoolRules.add(new UsedHeapMemorySpoolRule(spoolUsedHeapMemoryLimit));
308                }
309            }
310    
311            LOG.debug("StreamCaching configuration {}", this.toString());
312    
313            if (spoolDirectory != null) {
314                LOG.info("StreamCaching in use with spool directory: {} and rules: {}", spoolDirectory.getPath(), spoolRules.toString());
315            } else {
316                LOG.info("StreamCaching in use with rules: {}", spoolRules.toString());
317            }
318        }
319    
320        @Override
321        protected void doStop() throws Exception {
322            if (spoolThreshold > 0 & spoolDirectory != null  && isRemoveSpoolDirectoryWhenStopping()) {
323                LOG.debug("Removing spool directory: {}", spoolDirectory);
324                FileUtil.removeDir(spoolDirectory);
325            }
326    
327            if (LOG.isDebugEnabled() && statistics.isStatisticsEnabled()) {
328                LOG.debug("Stopping StreamCachingStrategy with statistics: {}", statistics.toString());
329            }
330    
331            statistics.reset();
332        }
333    
334        @Override
335        public String toString() {
336            return "DefaultStreamCachingStrategy["
337                + "spoolDirectory=" + spoolDirectory
338                + ", spoolChiper=" + spoolChiper
339                + ", spoolThreshold=" + spoolThreshold
340                + ", spoolUsedHeapMemoryThreshold=" + spoolUsedHeapMemoryThreshold
341                + ", bufferSize=" + bufferSize
342                + ", anySpoolRules=" + anySpoolRules + "]";
343        }
344    
345        private final class FixedThresholdSpoolRule implements SpoolRule {
346    
347            public boolean shouldSpoolCache(long length) {
348                if (spoolThreshold > 0 && length > spoolThreshold) {
349                    LOG.trace("Should spool cache fixed threshold {} > {} -> true", length, spoolThreshold);
350                    return true;
351                }
352                return false;
353            }
354    
355            public String toString() {
356                if (spoolThreshold < 1024) {
357                    return "Spool > " + spoolThreshold + " bytes body size";
358                } else {
359                    return "Spool > " + (spoolThreshold >> 10) + "K body size";
360                }
361            }
362        }
363    
364        private final class UsedHeapMemorySpoolRule implements SpoolRule {
365    
366            private final MemoryMXBean heapUsage;
367            private final SpoolUsedHeapMemoryLimit limit;
368    
369            private UsedHeapMemorySpoolRule(SpoolUsedHeapMemoryLimit limit) {
370                this.limit = limit;
371                this.heapUsage = ManagementFactory.getMemoryMXBean();
372            }
373    
374            public boolean shouldSpoolCache(long length) {
375                if (spoolUsedHeapMemoryThreshold > 0) {
376                    // must use double to calculate with decimals for the percentage
377                    double used = heapUsage.getHeapMemoryUsage().getUsed();
378                    double upper = limit == SpoolUsedHeapMemoryLimit.Committed
379                        ? heapUsage.getHeapMemoryUsage().getCommitted() : heapUsage.getHeapMemoryUsage().getMax();
380                    double calc = (used / upper) * 100;
381                    int percentage = (int) calc;
382    
383                    if (LOG.isTraceEnabled()) {
384                        long u = heapUsage.getHeapMemoryUsage().getUsed();
385                        long c = heapUsage.getHeapMemoryUsage().getCommitted();
386                        long m = heapUsage.getHeapMemoryUsage().getMax();
387                        LOG.trace("Heap memory: [used={}M ({}%), committed={}M, max={}M]", new Object[]{u >> 20, percentage, c >> 20, m >> 20});
388                    }
389    
390                    if (percentage > spoolUsedHeapMemoryThreshold) {
391                        LOG.trace("Should spool cache heap memory threshold {} > {} -> true", percentage, spoolUsedHeapMemoryThreshold);
392                        return true;
393                    }
394                }
395                return false;
396            }
397    
398            public String toString() {
399                return "Spool > " + spoolUsedHeapMemoryThreshold + "% used of " + limit + " heap memory";
400            }
401        }
402    
403        /**
404         * Represents utilization statistics.
405         */
406        private static final class UtilizationStatistics implements Statistics {
407    
408            private boolean statisticsEnabled;
409            private volatile long memoryCounter;
410            private volatile long memorySize;
411            private volatile long memoryAverageSize;
412            private volatile long spoolCounter;
413            private volatile long spoolSize;
414            private volatile long spoolAverageSize;
415    
416            synchronized void updateMemory(long size) {
417                memoryCounter++;
418                memorySize += size;
419                memoryAverageSize = memorySize / memoryCounter;
420            }
421    
422            synchronized void updateSpool(long size) {
423                spoolCounter++;
424                spoolSize += size;
425                spoolAverageSize = spoolSize / spoolCounter;
426            }
427    
428            public long getCacheMemoryCounter() {
429                return memoryCounter;
430            }
431    
432            public long getCacheMemorySize() {
433                return memorySize;
434            }
435    
436            public long getCacheMemoryAverageSize() {
437                return memoryAverageSize;
438            }
439    
440            public long getCacheSpoolCounter() {
441                return spoolCounter;
442            }
443    
444            public long getCacheSpoolSize() {
445                return spoolSize;
446            }
447    
448            public long getCacheSpoolAverageSize() {
449                return spoolAverageSize;
450            }
451    
452            public synchronized void reset() {
453                memoryCounter = 0;
454                memorySize = 0;
455                memoryAverageSize = 0;
456                spoolCounter = 0;
457                spoolSize = 0;
458                spoolAverageSize = 0;
459            }
460    
461            public boolean isStatisticsEnabled() {
462                return statisticsEnabled;
463            }
464    
465            public void setStatisticsEnabled(boolean statisticsEnabled) {
466                this.statisticsEnabled = statisticsEnabled;
467            }
468    
469            public String toString() {
470                return String.format("[memoryCounter=%s, memorySize=%s, memoryAverageSize=%s, spoolCounter=%s, spoolSize=%s, spoolAverageSize=%s]",
471                        memoryCounter, memorySize, memoryAverageSize, spoolCounter, spoolSize, spoolAverageSize);
472            }
473        }
474    
475    }