View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.ldap.client.api.future;
22  
23  
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.TimeoutException;
30  
31  import org.apache.directory.api.ldap.model.message.Response;
32  import org.apache.directory.ldap.client.api.LdapConnection;
33  
34  
35  /**
36   * A Future implementation used in LdapConnection operations.
37   *
38   * @param <R> The result type returned by this Future's <tt>get</tt> method
39   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
40   */
41  public class ResponseFuture<R extends Response> implements Future<Response>
42  {
43      /** the blocking queue holding LDAP responses */
44      protected BlockingQueue<R> queue;
45  
46      /** flag to determine if this future is cancelled */
47      protected boolean cancelled = false;
48  
49      /** If the request has been cancelled because of an exception  it will be stored here */
50      protected Throwable cause;
51  
52      /** The messageID for this future */
53      protected int messageId;
54  
55      /** The connection used by the request */
56      protected LdapConnection connection;
57  
58  
59      /**
60       * Creates a new instance of ResponseFuture.
61       *
62       * @param connection The LdapConnection used by the request
63       * @param messageId The associated message ID
64       */
65      public ResponseFuture( LdapConnection connection, int messageId )
66      {
67          queue = new LinkedBlockingQueue<R>();
68          this.messageId = messageId;
69          this.connection = connection;
70      }
71  
72  
73      /**
74       * {@inheritDoc}
75       */
76      public boolean cancel( boolean mayInterruptIfRunning )
77      {
78          if ( cancelled )
79          {
80              return cancelled;
81          }
82  
83          // set the cancel flag first
84          cancelled = true;
85  
86          // Send an abandonRequest only if this future exists
87          if ( connection.doesFutureExistFor( messageId ) )
88          {
89              connection.abandon( messageId );
90          }
91  
92          // then clear the queue, cause the might be some incoming messages before this abandon request
93          // hits the server
94          queue.clear();
95  
96          return cancelled;
97      }
98  
99  
100     /**
101      * {@inheritDoc}
102      * @throws InterruptedException if the operation has been cancelled by client
103      */
104     public R get() throws InterruptedException, ExecutionException
105     {
106         R response = null;
107 
108         response = queue.take();
109 
110         return response;
111     }
112 
113 
114     /**
115      * {@inheritDoc}
116      * @throws InterruptedException if the operation has been cancelled by client
117      */
118     public void set( R response ) throws InterruptedException, ExecutionException
119     {
120         queue.add( response );
121     }
122 
123 
124     /**
125      * {@inheritDoc}
126      * @throws InterruptedException if the operation has been cancelled by client
127      */
128     public R get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException
129     {
130         R response = queue.poll( timeout, unit );
131 
132         return response;
133     }
134 
135 
136     /**
137      * {@inheritDoc}
138      */
139     public boolean isCancelled()
140     {
141         return cancelled;
142     }
143 
144 
145     /**
146      * This operation is not supported in this implementation of Future.
147      * 
148      * {@inheritDoc}
149      */
150     public boolean isDone()
151     {
152         throw new UnsupportedOperationException( "Operation not supported" );
153     }
154 
155 
156     /**
157      * @return the cause
158      */
159     public Throwable getCause()
160     {
161         return cause;
162     }
163 
164 
165     /**
166      * Associate a cause to the ResponseFuture
167      * @param cause the cause to set
168      */
169     public void setCause( Throwable cause )
170     {
171         this.cause = cause;
172     }
173 
174 
175     /**
176      * Cancel the Future
177      *
178      */
179     public void cancel()
180     {
181         // set the cancel flag first
182         cancelled = true;
183     }
184 
185 
186     /**
187      * {@inheritDoc}
188      */
189     public String toString()
190     {
191         StringBuilder sb = new StringBuilder();
192 
193         sb.append( "[msgId : " ).append( messageId ).append( ", " );
194         sb.append( "size : " ).append( queue.size() ).append( ", " );
195         sb.append( "Canceled :" ).append( cancelled ).append( "]" );
196 
197         return sb.toString();
198     }
199 }