1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.chemistry.opencmis.client.runtime.async;
20
21 import java.io.OutputStream;
22 import java.math.BigInteger;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.chemistry.opencmis.client.api.AsyncSession;
32 import org.apache.chemistry.opencmis.client.api.CmisObject;
33 import org.apache.chemistry.opencmis.client.api.Document;
34 import org.apache.chemistry.opencmis.client.api.ObjectId;
35 import org.apache.chemistry.opencmis.client.api.ObjectType;
36 import org.apache.chemistry.opencmis.client.api.OperationContext;
37 import org.apache.chemistry.opencmis.client.api.Policy;
38 import org.apache.chemistry.opencmis.client.api.Session;
39 import org.apache.chemistry.opencmis.commons.data.Ace;
40 import org.apache.chemistry.opencmis.commons.data.Acl;
41 import org.apache.chemistry.opencmis.commons.data.ContentStream;
42 import org.apache.chemistry.opencmis.commons.definitions.TypeDefinition;
43 import org.apache.chemistry.opencmis.commons.enums.AclPropagation;
44 import org.apache.chemistry.opencmis.commons.enums.UnfileObject;
45 import org.apache.chemistry.opencmis.commons.enums.VersioningState;
46 import org.apache.chemistry.opencmis.commons.impl.IOUtils;
47
48
49
50
51
52 public abstract class AbstractExecutorServiceAsyncSession<E extends ExecutorService> extends AbstractAsyncSession {
53
54 public AbstractExecutorServiceAsyncSession(Session session) {
55 super(session);
56 }
57
58
59
60
61 public abstract E getExecutorService();
62
63
64
65
66 public abstract static class SessionCallable<V> implements Callable<V> {
67
68 protected Session session;
69
70 public SessionCallable(Session session) {
71 if (session == null) {
72 throw new IllegalArgumentException("Session must be set!");
73 }
74
75 this.session = session;
76 }
77 }
78
79
80
81
82
83
84 public <T> Future<T> submit(SessionCallable<T> task) {
85 return getExecutorService().submit(task);
86 }
87
88
89
90 protected static class GetTypeDefinitonCallable extends SessionCallable<ObjectType> {
91 private String typeId;
92
93 public GetTypeDefinitonCallable(Session session, String typeId) {
94 super(session);
95 this.typeId = typeId;
96 }
97
98 @Override
99 public ObjectType call() throws Exception {
100 return session.getTypeDefinition(typeId);
101 }
102 }
103
104 @Override
105 public Future<ObjectType> getTypeDefinition(String typeId) {
106 return submit(new GetTypeDefinitonCallable(session, typeId));
107 }
108
109 protected static class CreateTypeCallable extends SessionCallable<ObjectType> {
110 private TypeDefinition type;
111
112 public CreateTypeCallable(Session session, TypeDefinition type) {
113 super(session);
114 this.type = type;
115 }
116
117 @Override
118 public ObjectType call() throws Exception {
119 return session.createType(type);
120 }
121 }
122
123 @Override
124 public Future<ObjectType> createType(TypeDefinition type) {
125 return submit(new CreateTypeCallable(session, type));
126 }
127
128 protected static class UpdateTypeCallable extends SessionCallable<ObjectType> {
129 private TypeDefinition type;
130
131 public UpdateTypeCallable(Session session, TypeDefinition type) {
132 super(session);
133 this.type = type;
134 }
135
136 @Override
137 public ObjectType call() throws Exception {
138 return session.updateType(type);
139 }
140 }
141
142 @Override
143 public Future<ObjectType> updateType(TypeDefinition type) {
144 return submit(new UpdateTypeCallable(session, type));
145 }
146
147 protected static class DeleteTypeCallable extends SessionCallable<Object> {
148 private String typeId;
149
150 public DeleteTypeCallable(Session session, String typeId) {
151 super(session);
152 this.typeId = typeId;
153 }
154
155 @Override
156 public Object call() throws Exception {
157 session.deleteType(typeId);
158 return null;
159 }
160 }
161
162 @Override
163 public Future<?> deleteType(String typeId) {
164 return submit(new DeleteTypeCallable(session, typeId));
165 }
166
167
168
169 protected static class GetObjectCallable extends SessionCallable<CmisObject> {
170 private ObjectId objectId;
171 private String objectIdStr;
172 private OperationContext context;
173
174 public GetObjectCallable(Session session, ObjectId objectId) {
175 this(session, objectId, null);
176 }
177
178 public GetObjectCallable(Session session, ObjectId objectId, OperationContext context) {
179 super(session);
180 this.objectId = objectId;
181 this.context = context;
182 }
183
184 public GetObjectCallable(Session session, String objectId) {
185 this(session, objectId, null);
186 }
187
188 public GetObjectCallable(Session session, String objectId, OperationContext context) {
189 super(session);
190 this.objectIdStr = objectId;
191 this.context = context;
192 }
193
194 @Override
195 public CmisObject call() throws Exception {
196 if (objectId != null) {
197 if (context != null) {
198 return session.getObject(objectId, context);
199 } else {
200 return session.getObject(objectId);
201 }
202 } else {
203 if (context != null) {
204 return session.getObject(objectIdStr, context);
205 } else {
206 return session.getObject(objectIdStr);
207 }
208 }
209 }
210 }
211
212 @Override
213 public Future<CmisObject> getObject(ObjectId objectId, OperationContext context) {
214 return submit(new GetObjectCallable(session, objectId, context));
215 }
216
217 @Override
218 public Future<CmisObject> getObject(String objectId, OperationContext context) {
219 return submit(new GetObjectCallable(session, objectId, context));
220 }
221
222 protected static class GetObjectByPathCallable extends SessionCallable<CmisObject> {
223 private String path;
224 private String parentPath;
225 private String name;
226 private OperationContext context;
227
228 public GetObjectByPathCallable(Session session, String path) {
229 this(session, path, (OperationContext) null);
230 }
231
232 public GetObjectByPathCallable(Session session, String path, OperationContext context) {
233 super(session);
234 this.path = path;
235 this.context = context;
236 }
237
238 public GetObjectByPathCallable(Session session, String parentPath, String name) {
239 this(session, parentPath, name, null);
240 }
241
242 public GetObjectByPathCallable(Session session, String parentPath, String name, OperationContext context) {
243 super(session);
244 this.parentPath = parentPath;
245 this.name = name;
246 this.context = context;
247 }
248
249 @Override
250 public CmisObject call() throws Exception {
251 if (parentPath != null) {
252 if (context != null) {
253 return session.getObjectByPath(parentPath, name, context);
254 } else {
255 return session.getObjectByPath(parentPath, name);
256 }
257 } else {
258 if (context != null) {
259 return session.getObjectByPath(path, context);
260 } else {
261 return session.getObjectByPath(path);
262 }
263 }
264 }
265 }
266
267 @Override
268 public Future<CmisObject> getObjectByPath(String path, OperationContext context) {
269 return submit(new GetObjectByPathCallable(session, path, context));
270 }
271
272 @Override
273 public Future<CmisObject> getObjectByPath(String parentPath, String name, OperationContext context) {
274 return submit(new GetObjectByPathCallable(session, parentPath, name, context));
275 }
276
277 protected static class GetLatestDocumentVersionCallable extends SessionCallable<Document> {
278 private ObjectId objectId;
279 private String objectIdStr;
280 private boolean major;
281 private OperationContext context;
282
283 public GetLatestDocumentVersionCallable(Session session, ObjectId objectId) {
284 this(session, objectId, false, null);
285 }
286
287 public GetLatestDocumentVersionCallable(Session session, ObjectId objectId, boolean major,
288 OperationContext context) {
289 super(session);
290 this.objectId = objectId;
291 this.major = major;
292 this.context = context;
293 }
294
295 public GetLatestDocumentVersionCallable(Session session, String objectId) {
296 this(session, objectId, false, null);
297 }
298
299 public GetLatestDocumentVersionCallable(Session session, String objectId, boolean major,
300 OperationContext context) {
301 super(session);
302 this.objectIdStr = objectId;
303 this.major = major;
304 this.context = context;
305 }
306
307 @Override
308 public Document call() throws Exception {
309 if (objectId != null) {
310 if (context != null) {
311 return session.getLatestDocumentVersion(objectId, major, context);
312 } else {
313 return session.getLatestDocumentVersion(objectId);
314 }
315 } else {
316 if (context != null) {
317 return session.getLatestDocumentVersion(objectIdStr, major, context);
318 } else {
319 return session.getLatestDocumentVersion(objectIdStr);
320 }
321 }
322 }
323 }
324
325 @Override
326 public Future<Document> getLatestDocumentVersion(ObjectId objectId, boolean major, OperationContext context) {
327 return submit(new GetLatestDocumentVersionCallable(session, objectId, major, context));
328 }
329
330 @Override
331 public Future<Document> getLatestDocumentVersion(String objectId, boolean major, OperationContext context) {
332 return submit(new GetLatestDocumentVersionCallable(session, objectId, major, context));
333 }
334
335
336
337 protected static class CreateDocumentCallable extends SessionCallable<ObjectId> {
338 private Map<String, ?> properties;
339 private ObjectId folderId;
340 private ContentStream contentStream;
341 private VersioningState versioningState;
342 private List<Policy> policies;
343 private List<Ace> addAces;
344 private List<Ace> removeAces;
345
346 public CreateDocumentCallable(Session session, Map<String, ?> properties, ObjectId folderId,
347 ContentStream contentStream, VersioningState versioningState, List<Policy> policies, List<Ace> addAces,
348 List<Ace> removeAces) {
349 super(session);
350 this.properties = properties;
351 this.folderId = folderId;
352 this.contentStream = contentStream;
353 this.versioningState = versioningState;
354 this.policies = policies;
355 this.addAces = addAces;
356 this.removeAces = removeAces;
357 }
358
359 @Override
360 public ObjectId call() throws Exception {
361 return session.createDocument(properties, folderId, contentStream, versioningState, policies, addAces,
362 removeAces);
363 }
364 }
365
366 @Override
367 public Future<ObjectId> createDocument(Map<String, ?> properties, ObjectId folderId, ContentStream contentStream,
368 VersioningState versioningState, List<Policy> policies, List<Ace> addAces, List<Ace> removeAces) {
369 return submit(new CreateDocumentCallable(session, properties, folderId, contentStream, versioningState,
370 policies, addAces, removeAces));
371 }
372
373 protected static class CreateDocumentFromSourceCallable extends SessionCallable<ObjectId> {
374 private ObjectId source;
375 private Map<String, ?> properties;
376 private ObjectId folderId;
377 private VersioningState versioningState;
378 private List<Policy> policies;
379 private List<Ace> addAces;
380 private List<Ace> removeAces;
381
382 public CreateDocumentFromSourceCallable(Session session, ObjectId source, Map<String, ?> properties,
383 ObjectId folderId, VersioningState versioningState, List<Policy> policies, List<Ace> addAces,
384 List<Ace> removeAces) {
385 super(session);
386 this.source = source;
387 this.properties = properties;
388 this.folderId = folderId;
389 this.versioningState = versioningState;
390 this.policies = policies;
391 this.addAces = addAces;
392 this.removeAces = removeAces;
393 }
394
395 @Override
396 public ObjectId call() throws Exception {
397 return session.createDocumentFromSource(source, properties, folderId, versioningState, policies, addAces,
398 removeAces);
399 }
400 }
401
402 @Override
403 public Future<ObjectId> createDocumentFromSource(ObjectId source, Map<String, ?> properties, ObjectId folderId,
404 VersioningState versioningState, List<Policy> policies, List<Ace> addAces, List<Ace> removeAces) {
405 return submit(new CreateDocumentFromSourceCallable(session, source, properties, folderId, versioningState,
406 policies, addAces, removeAces));
407 }
408
409 protected static class CreateFolderCallable extends SessionCallable<ObjectId> {
410
411 private Map<String, ?> properties;
412 private ObjectId folderId;
413 private List<Policy> policies;
414 private List<Ace> addAces;
415 private List<Ace> removeAces;
416
417 public CreateFolderCallable(Session session, Map<String, ?> properties, ObjectId folderId,
418 List<Policy> policies, List<Ace> addAces, List<Ace> removeAces) {
419 super(session);
420 this.properties = properties;
421 this.folderId = folderId;
422 this.policies = policies;
423 this.addAces = addAces;
424 this.removeAces = removeAces;
425 }
426
427 @Override
428 public ObjectId call() throws Exception {
429 return session.createFolder(properties, folderId, policies, addAces, removeAces);
430 }
431 }
432
433 @Override
434 public Future<ObjectId> createFolder(Map<String, ?> properties, ObjectId folderId, List<Policy> policies,
435 List<Ace> addAces, List<Ace> removeAces) {
436 return submit(new CreateFolderCallable(session, properties, folderId, policies, addAces, removeAces));
437 }
438
439 protected static class CreatePolicyCallable extends SessionCallable<ObjectId> {
440 private Map<String, ?> properties;
441 private ObjectId folderId;
442 private List<Policy> policies;
443 private List<Ace> addAces;
444 private List<Ace> removeAces;
445
446 public CreatePolicyCallable(Session session, Map<String, ?> properties, ObjectId folderId,
447 List<Policy> policies, List<Ace> addAces, List<Ace> removeAces) {
448 super(session);
449 this.properties = properties;
450 this.folderId = folderId;
451 this.policies = policies;
452 this.addAces = addAces;
453 this.removeAces = removeAces;
454 }
455
456 @Override
457 public ObjectId call() throws Exception {
458 return session.createPolicy(properties, folderId, policies, addAces, removeAces);
459 }
460 }
461
462 @Override
463 public Future<ObjectId> createPolicy(Map<String, ?> properties, ObjectId folderId, List<Policy> policies,
464 List<Ace> addAces, List<Ace> removeAces) {
465 return submit(new CreatePolicyCallable(session, properties, folderId, policies, addAces, removeAces));
466 }
467
468 protected static class CreateItemCallable extends SessionCallable<ObjectId> {
469 private Map<String, ?> properties;
470 private ObjectId folderId;
471 private List<Policy> policies;
472 private List<Ace> addAces;
473 private List<Ace> removeAces;
474
475 public CreateItemCallable(Session session, Map<String, ?> properties, ObjectId folderId, List<Policy> policies,
476 List<Ace> addAces, List<Ace> removeAces) {
477 super(session);
478 this.properties = properties;
479 this.folderId = folderId;
480 this.policies = policies;
481 this.addAces = addAces;
482 this.removeAces = removeAces;
483 }
484
485 @Override
486 public ObjectId call() throws Exception {
487 return session.createItem(properties, folderId, policies, addAces, removeAces);
488 }
489 }
490
491 @Override
492 public Future<ObjectId> createItem(Map<String, ?> properties, ObjectId folderId, List<Policy> policies,
493 List<Ace> addAces, List<Ace> removeAces) {
494 return submit(new CreateItemCallable(session, properties, folderId, policies, addAces, removeAces));
495 }
496
497 protected static class CreateRelationshipCallable extends SessionCallable<ObjectId> {
498 private Map<String, ?> properties;
499 private List<Policy> policies;
500 private List<Ace> addAces;
501 private List<Ace> removeAces;
502
503 public CreateRelationshipCallable(Session session, Map<String, ?> properties, List<Policy> policies,
504 List<Ace> addAces, List<Ace> removeAces) {
505 super(session);
506 this.properties = properties;
507 this.policies = policies;
508 this.addAces = addAces;
509 this.removeAces = removeAces;
510 }
511
512 @Override
513 public ObjectId call() throws Exception {
514 return session.createRelationship(properties, policies, addAces, removeAces);
515 }
516 }
517
518 @Override
519 public Future<ObjectId> createRelationship(Map<String, ?> properties, List<Policy> policies, List<Ace> addAces,
520 List<Ace> removeAces) {
521 return submit(new CreateRelationshipCallable(session, properties, policies, addAces, removeAces));
522 }
523
524
525
526 protected static class GetContentStreamCallable extends SessionCallable<ContentStream> {
527 private ObjectId docId;
528 private String streamId;
529 private BigInteger offset;
530 private BigInteger length;
531
532 public GetContentStreamCallable(Session session, ObjectId docId, String streamId, BigInteger offset,
533 BigInteger length) {
534 super(session);
535 this.docId = docId;
536 this.streamId = streamId;
537 this.offset = offset;
538 this.length = length;
539 }
540
541 @Override
542 public ContentStream call() throws Exception {
543 return session.getContentStream(docId, streamId, offset, length);
544 }
545 }
546
547 @Override
548 public Future<ContentStream> getContentStream(ObjectId docId, String streamId, BigInteger offset, BigInteger length) {
549 return submit(new GetContentStreamCallable(session, docId, streamId, offset, length));
550 }
551
552 protected static class StoreContentStreamCallable extends GetContentStreamCallable {
553 private OutputStream target;
554
555 public StoreContentStreamCallable(Session session, ObjectId docId, String streamId, BigInteger offset,
556 BigInteger length, OutputStream target) {
557 super(session, docId, streamId, offset, length);
558 this.target = target;
559 }
560
561 @Override
562 public ContentStream call() throws Exception {
563 ContentStream contentStream = super.call();
564 try {
565 if (contentStream != null && contentStream.getStream() != null && target != null) {
566 IOUtils.copy(contentStream.getStream(), target);
567 }
568 } finally {
569 IOUtils.closeQuietly(contentStream);
570 }
571
572 return contentStream;
573 }
574 }
575
576 @Override
577 public Future<ContentStream> storeContentStream(ObjectId docId, String streamId, BigInteger offset,
578 BigInteger length, OutputStream target) {
579 return submit(new StoreContentStreamCallable(session, docId, streamId, offset, length, target));
580 }
581
582
583
584 protected static class DeleteCallable extends SessionCallable<Object> {
585 private ObjectId objectId;
586 private boolean allVersions;
587
588 public DeleteCallable(Session session, ObjectId objectId, boolean allVersions) {
589 super(session);
590 this.objectId = objectId;
591 this.allVersions = allVersions;
592 }
593
594 @Override
595 public Object call() throws Exception {
596 session.delete(objectId, allVersions);
597 return null;
598 }
599 }
600
601 @Override
602 public Future<?> delete(ObjectId objectId, boolean allVersions) {
603 return submit(new DeleteCallable(session, objectId, allVersions));
604 }
605
606 protected static class DeleteTreeCallable extends SessionCallable<List<String>> {
607 private ObjectId folderId;
608 private boolean allVersions;
609 private UnfileObject unfile;
610 private boolean continueOnFailure;
611
612 public DeleteTreeCallable(Session session, ObjectId folderId, boolean allVersions, UnfileObject unfile,
613 boolean continueOnFailure) {
614 super(session);
615 this.folderId = folderId;
616 this.allVersions = allVersions;
617 this.unfile = unfile;
618 this.continueOnFailure = continueOnFailure;
619 }
620
621 @Override
622 public List<String> call() throws Exception {
623 return session.deleteTree(folderId, allVersions, unfile, continueOnFailure);
624 }
625 }
626
627 @Override
628 public Future<List<String>> deleteTree(ObjectId folderId, boolean allVersions, UnfileObject unfile,
629 boolean continueOnFailure) {
630 return submit(new DeleteTreeCallable(session, folderId, allVersions, unfile, continueOnFailure));
631 }
632
633
634
635 protected static class ApplyAclCallable extends SessionCallable<Acl> {
636 private ObjectId objectId;
637 private List<Ace> addAces;
638 private List<Ace> removeAces;
639 private AclPropagation aclPropagation;
640
641 public ApplyAclCallable(Session session, ObjectId objectId, List<Ace> addAces, List<Ace> removeAces,
642 AclPropagation aclPropagation) {
643 super(session);
644 this.objectId = objectId;
645 this.addAces = addAces;
646 this.removeAces = removeAces;
647 this.aclPropagation = aclPropagation;
648 }
649
650 @Override
651 public Acl call() throws Exception {
652 return session.applyAcl(objectId, addAces, removeAces, aclPropagation);
653 }
654 }
655
656 @Override
657 public Future<Acl> applyAcl(ObjectId objectId, List<Ace> addAces, List<Ace> removeAces,
658 AclPropagation aclPropagation) {
659 return getExecutorService()
660 .submit(new ApplyAclCallable(session, objectId, addAces, removeAces, aclPropagation));
661 }
662
663 protected static class SetAclCallable extends SessionCallable<Acl> {
664 private ObjectId objectId;
665 private List<Ace> aces;
666
667 public SetAclCallable(Session session, ObjectId objectId, List<Ace> aces) {
668 super(session);
669 this.objectId = objectId;
670 this.aces = aces;
671 }
672
673 @Override
674 public Acl call() throws Exception {
675 return session.setAcl(objectId, aces);
676 }
677 }
678
679 @Override
680 public Future<Acl> setAcl(ObjectId objectId, List<Ace> aces) {
681 return submit(new SetAclCallable(session, objectId, aces));
682 }
683
684
685
686 protected static class ApplyPolicyCallable extends SessionCallable<Object> {
687 private ObjectId objectId;
688 private ObjectId[] policyIds;
689
690 public ApplyPolicyCallable(Session session, ObjectId objectId, ObjectId... policyIds) {
691 super(session);
692 this.objectId = objectId;
693 this.policyIds = policyIds;
694 }
695
696 @Override
697 public Object call() throws Exception {
698 session.applyPolicy(objectId, policyIds);
699 return null;
700 }
701 }
702
703 @Override
704 public Future<?> applyPolicy(ObjectId objectId, ObjectId... policyIds) {
705 return submit(new ApplyPolicyCallable(session, objectId, policyIds));
706 }
707
708 protected static class RemovePolicyCallable extends SessionCallable<Object> {
709 private ObjectId objectId;
710 private ObjectId[] policyIds;
711
712 public RemovePolicyCallable(Session session, ObjectId objectId, ObjectId... policyIds) {
713 super(session);
714 this.objectId = objectId;
715 this.policyIds = policyIds;
716 }
717
718 @Override
719 public Object call() throws Exception {
720 session.removePolicy(objectId, policyIds);
721 return null;
722 }
723 }
724
725 @Override
726 public Future<?> removePolicy(ObjectId objectId, ObjectId... policyIds) {
727 return submit(new RemovePolicyCallable(session, objectId, policyIds));
728 }
729
730
731
732
733
734
735 public void shutdown() {
736 if (getExecutorService() != null) {
737 getExecutorService().shutdown();
738 }
739 }
740
741
742
743
744 public List<Runnable> shutdownNow() {
745 if (getExecutorService() != null) {
746 return getExecutorService().shutdownNow();
747 }
748
749 return Collections.emptyList();
750 }
751
752
753
754
755 public boolean isShutdown() {
756 if (getExecutorService() != null) {
757 return getExecutorService().isShutdown();
758 }
759
760 return true;
761 }
762
763
764
765
766 public boolean isTerminated() {
767 if (getExecutorService() != null) {
768 return getExecutorService().isTerminated();
769 }
770
771 return true;
772 }
773
774
775
776
777 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
778 if (getExecutorService() != null) {
779 return getExecutorService().awaitTermination(timeout, unit);
780 }
781
782 return true;
783 }
784 }