View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.ldap.client.api.future;
22  
23  
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.directory.api.ldap.model.message.Response;
30  import org.apache.directory.ldap.client.api.LdapConnection;
31  
32  
33  /**
34   * A Future implementation used in LdapConnection operations.
35   *
36   * @param <R> The result type returned by this Future's <tt>get</tt> method
37   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
38   */
39  public class ResponseFuture<R extends Response> implements Future<Response>
40  {
41      /** the blocking queue holding LDAP responses */
42      protected BlockingQueue<R> queue;
43  
44      /** flag to determine if this future is cancelled */
45      protected boolean cancelled = false;
46  
47      /** If the request has been cancelled because of an exception  it will be stored here */
48      protected Throwable cause;
49  
50      /** The messageID for this future */
51      protected int messageId;
52  
53      /** The connection used by the request */
54      protected LdapConnection connection;
55  
56  
57      /**
58       * Creates a new instance of ResponseFuture.
59       *
60       * @param connection The LdapConnection used by the request
61       * @param messageId The associated message ID
62       */
63      public ResponseFuture( LdapConnection connection, int messageId )
64      {
65          queue = new LinkedBlockingQueue<>();
66          this.messageId = messageId;
67          this.connection = connection;
68      }
69  
70  
71      /**
72       * {@inheritDoc}
73       */
74      @Override
75      public boolean cancel( boolean mayInterruptIfRunning )
76      {
77          if ( cancelled )
78          {
79              return cancelled;
80          }
81  
82          // set the cancel flag first
83          cancelled = true;
84  
85          // Send an abandonRequest only if this future exists
86          if ( !connection.isRequestCompleted( messageId ) )
87          {
88              connection.abandon( messageId );
89          }
90  
91          // then clear the queue, cause the might be some incoming messages before this abandon request
92          // hits the server
93          queue.clear();
94  
95          return cancelled;
96      }
97  
98  
99      /**
100      * {@inheritDoc}
101      * @throws InterruptedException if the operation has been cancelled by client
102      */
103     @Override
104     public R get() throws InterruptedException
105     {
106         return queue.take();
107     }
108 
109 
110     /**
111      * Set the associated Response in this Future
112      * 
113      * @param response The response to add into the Future
114      * @throws InterruptedException if the operation has been cancelled by client
115      */
116     public void set( R response ) throws InterruptedException
117     {
118         queue.add( response );
119     }
120 
121 
122     /**
123      * {@inheritDoc}
124      * @throws InterruptedException if the operation has been cancelled by client
125      */
126     @Override
127     public R get( long timeout, TimeUnit unit ) throws InterruptedException
128     {
129         return queue.poll( timeout, unit );
130     }
131 
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     public boolean isCancelled()
138     {
139         return cancelled;
140     }
141 
142 
143     /**
144      * This operation is not supported in this implementation of Future.
145      * 
146      * {@inheritDoc}
147      */
148     @Override
149     public boolean isDone()
150     {
151         throw new UnsupportedOperationException( "Operation not supported" );
152     }
153 
154 
155     /**
156      * @return the cause
157      */
158     public Throwable getCause()
159     {
160         return cause;
161     }
162 
163 
164     /**
165      * Associate a cause to the ResponseFuture
166      * @param cause the cause to set
167      */
168     public void setCause( Throwable cause )
169     {
170         this.cause = cause;
171     }
172 
173 
174     /**
175      * Cancel the Future
176      *
177      */
178     public void cancel()
179     {
180         // set the cancel flag first
181         cancelled = true;
182     }
183 
184 
185     /**
186      * {@inheritDoc}
187      */
188     @Override
189     public String toString()
190     {
191         StringBuilder sb = new StringBuilder();
192 
193         sb.append( "[msgId : " ).append( messageId ).append( ", " );
194         sb.append( "size : " ).append( queue.size() ).append( ", " );
195         sb.append( "Canceled :" ).append( cancelled ).append( "]" );
196 
197         return sb.toString();
198     }
199 }