View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.chemistry.opencmis.client.runtime.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.io.SequenceInputStream;
25  
26  import org.apache.chemistry.opencmis.client.api.Document;
27  import org.apache.chemistry.opencmis.client.api.OperationContext;
28  import org.apache.chemistry.opencmis.client.api.Session;
29  import org.apache.chemistry.opencmis.client.util.ContentStreamUtils;
30  import org.apache.chemistry.opencmis.client.util.OperationContextUtils;
31  import org.apache.chemistry.opencmis.commons.data.ContentStream;
32  import org.apache.chemistry.opencmis.commons.spi.Holder;
33  
34  /**
35   * This OutputStream allows overwriting and appending to the content of a
36   * document.
37   * 
38   * This class is a convenience layer on top of the CMIS setContentStream() and
39   * appendContentStream() operations. It provides the glue to other (legacy)
40   * interfaces that know how to work with standard streams. However, calling
41   * setContentStream() and appendContentStream() directly provides more control
42   * and may be more efficient than using this class.
43   * 
44   * Data written to this stream is buffered. The default buffer size is 64 KiB.
45   * The data is send to the repository if the buffer is full, or {@link #flush()}
46   * is called, or {@link #close()} is called. Because sending data to the
47   * repository requires a HTTP call, this should be triggered only when
48   * necessary. Depending on the use case, the buffer size should be increased.
49   * 
50   * If the overwrite mode is enabled, the first call is a setContentStream()
51   * call, which overwrites the existing content. All following calls are
52   * appendContentStream() calls. If the overwrite mode is not enabled, all calls
53   * are appendContentStream() calls.
54   * 
55   * If the document is versioned, it's the responsibility of the caller to check
56   * it out and check it in. If the document is auto-versioned, each chunk of
57   * bytes may create a new version.
58   * 
59   * If the repository supports change tokens and the provided document has a
60   * non-empty change token property ({@code cmis:changeToken}), change tokens are
61   * respected.
62   * 
63   * This class is not thread safe.
64   */
65  public class AppendOutputStream extends OutputStream {
66  
67      public final static OperationContext DOCUMENT_OPERATION_CONTEXT = OperationContextUtils
68              .createMinimumOperationContext("cmis:contentStreamFileName", "cmis:contentStreamMimeType",
69                      "cmis:changeToken");
70  
71      private final static int DEFAULT_BUFFER_SIZE = 64 * 1024;
72  
73      private Session session;
74      private String repId;
75      private String documentId;
76      private String changeToken;
77      private boolean overwrite;
78      private String filename;
79      private String mimeType;
80      private byte[] buffer;
81      private int pos;
82      private boolean isClosed;
83  
84      public AppendOutputStream(Session session, Document doc, boolean overwrite, String filename, String mimeType) {
85          this(session, doc, overwrite, filename, mimeType, DEFAULT_BUFFER_SIZE);
86      }
87  
88      /**
89       * Creates an OutputStream that appends content to a document.
90       * 
91       * @param session
92       *            the session object, must not be {@code null}
93       * @param doc
94       *            the document, must not be {@code null}
95       * @param overwrite
96       *            if {@code true} the first call to repository sets a new
97       *            content, if {@code false} the all calls append to the current
98       *            content
99       * @param filename
100      *            the file name, may be {@code null}
101      * @param mimeType
102      *            the MIME type, may be {@code null}
103      * @param bufferSize
104      *            buffer size
105      */
106     public AppendOutputStream(Session session, Document doc, boolean overwrite, String filename, String mimeType,
107             int bufferSize) {
108         if (session == null) {
109             throw new IllegalArgumentException("Session must be set!");
110         }
111         if (doc == null) {
112             throw new IllegalArgumentException("Document must be set!");
113         }
114         if (bufferSize < 0) {
115             throw new IllegalArgumentException("Buffer size must be positive!");
116         }
117 
118         if (filename == null) {
119             this.filename = doc.getContentStreamFileName();
120         } else {
121             this.filename = filename;
122         }
123 
124         if (mimeType == null) {
125             this.mimeType = doc.getContentStreamMimeType();
126         } else {
127             this.mimeType = mimeType;
128         }
129 
130         this.session = session;
131         this.repId = session.getRepositoryInfo().getId();
132         this.documentId = doc.getId();
133         this.changeToken = doc.getChangeToken();
134         this.overwrite = overwrite;
135         this.buffer = new byte[bufferSize];
136         this.pos = 0;
137         this.isClosed = false;
138     }
139 
140     @Override
141     public void write(int b) throws IOException {
142         if (isClosed) {
143             throw new IOException("Stream is already closed!");
144         }
145 
146         if (pos + 1 > buffer.length) {
147             // not enough space in the buffer for the additional byte
148             // -> write buffer and the byte
149             send(new byte[] { (byte) (b & 0xFF) }, 0, 1, false);
150         } else {
151             buffer[pos++] = (byte) (b & 0xFF);
152         }
153     }
154 
155     @Override
156     public void write(byte[] b) throws IOException {
157         write(b, 0, b.length);
158     }
159 
160     @Override
161     public void write(byte[] b, int off, int len) throws IOException {
162         if (isClosed) {
163             throw new IOException("Stream is already closed!");
164         }
165 
166         if (b == null) {
167             throw new IllegalArgumentException("Data must not be null!");
168         } else if (off < 0 || off > b.length) {
169             throw new IndexOutOfBoundsException("Invalid offset!");
170         } else if (len < 0 || (off + len) > b.length || (off + len) < 0) {
171             throw new IndexOutOfBoundsException("Invalid length!");
172         } else if (len == 0) {
173             return;
174         }
175 
176         if (pos + len > buffer.length) {
177             // not enough space in the buffer for the additional bytes
178             // -> write buffer and bytes
179             send(b, off, len, false);
180         } else {
181             System.arraycopy(b, off, buffer, pos, len);
182             pos += len;
183         }
184     }
185 
186     @Override
187     public void flush() throws IOException {
188         flush(false);
189     }
190 
191     /**
192      * Appends (or sets) the current buffer to the document.
193      * 
194      * @param isLastChunk
195      *            indicates if this is the last chunk of the content
196      * @throws IOException
197      *             if an error occurs
198      */
199     public void flush(boolean isLastChunk) throws IOException {
200         if (isClosed) {
201             throw new IOException("Stream is already closed!");
202         }
203 
204         send(isLastChunk);
205     }
206 
207     /**
208      * Updates the document content with the provided content stream.
209      */
210     protected void send(boolean isLastChunk) throws IOException {
211         send(null, 0, 0, isLastChunk);
212     }
213 
214     /**
215      * Updates the document content with the provided content stream.
216      */
217     protected void send(byte[] extraBytes, int extraOff, int extraLen, boolean isLastChunk) throws IOException {
218 
219         ContentStream contentStream = null;
220         if (extraBytes == null) {
221             if (pos == 0) {
222                 // buffer is empty and no extra bytes -> nothing to do
223                 return;
224             } else {
225                 // buffer is not empty and no extra bytes -> send buffer
226                 contentStream = ContentStreamUtils.createByteArrayContentStream(filename, buffer, 0, pos, mimeType);
227             }
228         } else {
229             if (pos == 0) {
230                 // buffer is empty but we have extra bytes
231                 // -> only send extra bytes
232                 contentStream = ContentStreamUtils.createByteArrayContentStream(filename, extraBytes, extraOff,
233                         extraLen, mimeType);
234             } else {
235                 // buffer is not empty and we have extra bytes
236                 // -> send buffer and extra bytes
237                 contentStream = ContentStreamUtils.createContentStream(filename, pos + extraLen, mimeType,
238                         new ContentStreamUtils.AutoCloseInputStream(new SequenceInputStream(new ByteArrayInputStream(
239                                 buffer, 0, pos), new ByteArrayInputStream(extraBytes, extraOff, extraLen))));
240             }
241         }
242 
243         Holder<String> objectIdHolder = new Holder<String>(documentId);
244         Holder<String> changeTokenHolder = changeToken != null ? new Holder<String>(changeToken) : null;
245 
246         try {
247             if (overwrite) {
248                 // start a new content stream
249                 session.getBinding()
250                         .getObjectService()
251                         .setContentStream(repId, objectIdHolder, Boolean.TRUE, changeTokenHolder,
252                                 session.getObjectFactory().convertContentStream(contentStream), null);
253 
254                 // the following calls should append, not overwrite
255                 overwrite = false;
256             } else {
257                 // append to content stream
258                 session.getBinding()
259                         .getObjectService()
260                         .appendContentStream(repId, objectIdHolder, changeTokenHolder,
261                                 session.getObjectFactory().convertContentStream(contentStream), isLastChunk, null);
262             }
263         } catch (Exception e) {
264             isClosed = true;
265             throw new IOException("Could not append to document: " + e.toString(), e);
266         }
267 
268         if (objectIdHolder.getValue() != null) {
269             documentId = objectIdHolder.getValue();
270         }
271         if (changeTokenHolder != null) {
272             changeToken = changeTokenHolder.getValue();
273         }
274 
275         pos = 0;
276     }
277 
278     @Override
279     public void close() throws IOException {
280         close(true);
281     }
282 
283     /**
284      * Closes the stream.
285      * 
286      * @param isLastChunk
287      *            indicates if this is the last chunk of the content
288      * @throws IOException
289      *             if an error occurs
290      */
291     public void close(boolean isLastChunk) throws IOException {
292         if (isClosed) {
293             throw new IOException("Stream is already closed!");
294         }
295 
296         if (pos > 0) {
297             flush(isLastChunk);
298         }
299     }
300 }