001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.Closeable;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.util.Iterator;
024    import java.util.Scanner;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.NoTypeConversionAvailableException;
028    
029    /**
030     * Group based {@link Iterator} which groups the given {@link Iterator} a number of times
031     * and then return a combined response as a String.
032     * <p/>
033     * This implementation uses as internal byte array buffer, to combine the response.
034     * The token is inserted between the individual parts.
035     * <p/>
036     * For example if you group by new line, then a new line token is inserted between the lines.
037     */
038    public final class GroupIterator implements Iterator<Object>, Closeable {
039    
040        private final CamelContext camelContext;
041        private final Iterator<?> it;
042        private final String token;
043        private final int group;
044        private boolean closed;
045        private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
046    
047        /**
048         * Creates a new group iterator
049         *
050         * @param camelContext  the camel context
051         * @param it            the iterator to group
052         * @param token         then token used to separate between the parts, use <tt>null</tt> to not add the token
053         * @param group         number of parts to group together
054         * @throws IllegalArgumentException is thrown if group is not a positive number
055         */
056        public GroupIterator(CamelContext camelContext, Iterator<?> it, String token, int group) {
057            this.camelContext = camelContext;
058            this.it = it;
059            this.token = token;
060            this.group = group;
061            if (group <= 0) {
062                throw new IllegalArgumentException("Group must be a positive number, was: " + group);
063            }
064        }
065    
066        @Override
067        public void close() throws IOException {
068            try {
069                if (it instanceof Scanner) {
070                    // special for Scanner which implement the Closeable since JDK7 
071                    Scanner scanner = (Scanner) it;
072                    scanner.close();
073                    IOException ioException = scanner.ioException();
074                    if (ioException != null) {
075                        throw ioException;
076                    }
077                } else if (it instanceof Closeable) {
078                    IOHelper.closeWithException((Closeable) it);
079                }
080            } finally {
081                // close the buffer as well
082                bos.close();
083                // we are now closed
084                closed = true;
085            }
086        }
087    
088        @Override
089        public boolean hasNext() {
090            if (closed) {
091                return false;
092            }
093    
094            boolean answer = it.hasNext();
095            if (!answer) {
096                // auto close
097                try {
098                    close();
099                } catch (IOException e) {
100                    // ignore
101                }
102            }
103            return answer;
104        }
105    
106        @Override
107        public Object next() {
108            try {
109                return doNext();
110            } catch (Exception e) {
111                throw ObjectHelper.wrapRuntimeCamelException(e);
112            }
113        }
114    
115        private Object doNext() throws IOException, NoTypeConversionAvailableException {
116            int count = 0;
117            Object data = "";
118            while (count < group && it.hasNext()) {
119                data = it.next();
120    
121                // include token in between
122                if (data != null && count > 0 && token != null) {
123                    bos.write(token.getBytes());
124                }
125                if (data instanceof InputStream) {
126                    InputStream is = (InputStream) data;
127                    IOHelper.copy(is, bos);
128                } else if (data instanceof byte[]) {
129                    byte[] bytes = (byte[]) data;
130                    bos.write(bytes);
131                } else if (data != null) {
132                    // convert to input stream
133                    InputStream is = camelContext.getTypeConverter().mandatoryConvertTo(InputStream.class, data);
134                    IOHelper.copy(is, bos);
135                }
136    
137                count++;
138            }
139    
140            // prepare and return answer as String
141            String answer = bos.toString();
142            bos.reset();
143            return answer;
144        }
145    
146        @Override
147        public void remove() {
148            it.remove();
149        }
150    }