001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020
021package org.apache.directory.ldap.client.api.future;
022
023
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.TimeoutException;
030
031import org.apache.directory.api.ldap.model.message.Response;
032import org.apache.directory.ldap.client.api.LdapConnection;
033
034
035/**
036 * A Future implementation used in LdapConnection operations.
037 *
038 * @param <R> The result type returned by this Future's <tt>get</tt> method
039 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
040 */
041public class ResponseFuture<R extends Response> implements Future<Response>
042{
043    /** the blocking queue holding LDAP responses */
044    protected BlockingQueue<R> queue;
045
046    /** flag to determine if this future is cancelled */
047    protected boolean cancelled = false;
048
049    /** If the request has been cancelled because of an exception  it will be stored here */
050    protected Throwable cause;
051
052    /** The messageID for this future */
053    protected int messageId;
054
055    /** The connection used by the request */
056    protected LdapConnection connection;
057
058
059    /**
060     * Creates a new instance of ResponseFuture.
061     *
062     * @param connection The LdapConnection used by the request
063     * @param messageId The associated message ID
064     */
065    public ResponseFuture( LdapConnection connection, int messageId )
066    {
067        queue = new LinkedBlockingQueue<R>();
068        this.messageId = messageId;
069        this.connection = connection;
070    }
071
072
073    /**
074     * {@inheritDoc}
075     */
076    public boolean cancel( boolean mayInterruptIfRunning )
077    {
078        if ( cancelled )
079        {
080            return cancelled;
081        }
082
083        // set the cancel flag first
084        cancelled = true;
085
086        // Send an abandonRequest only if this future exists
087        if ( connection.doesFutureExistFor( messageId ) )
088        {
089            connection.abandon( messageId );
090        }
091
092        // then clear the queue, cause the might be some incoming messages before this abandon request
093        // hits the server
094        queue.clear();
095
096        return cancelled;
097    }
098
099
100    /**
101     * {@inheritDoc}
102     * @throws InterruptedException if the operation has been cancelled by client
103     */
104    public R get() throws InterruptedException, ExecutionException
105    {
106        R response = null;
107
108        response = queue.take();
109
110        return response;
111    }
112
113
114    /**
115     * {@inheritDoc}
116     * @throws InterruptedException if the operation has been cancelled by client
117     */
118    public void set( R response ) throws InterruptedException, ExecutionException
119    {
120        queue.add( response );
121    }
122
123
124    /**
125     * {@inheritDoc}
126     * @throws InterruptedException if the operation has been cancelled by client
127     */
128    public R get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException
129    {
130        R response = queue.poll( timeout, unit );
131
132        return response;
133    }
134
135
136    /**
137     * {@inheritDoc}
138     */
139    public boolean isCancelled()
140    {
141        return cancelled;
142    }
143
144
145    /**
146     * This operation is not supported in this implementation of Future.
147     * 
148     * {@inheritDoc}
149     */
150    public boolean isDone()
151    {
152        throw new UnsupportedOperationException( "Operation not supported" );
153    }
154
155
156    /**
157     * @return the cause
158     */
159    public Throwable getCause()
160    {
161        return cause;
162    }
163
164
165    /**
166     * Associate a cause to the ResponseFuture
167     * @param cause the cause to set
168     */
169    public void setCause( Throwable cause )
170    {
171        this.cause = cause;
172    }
173
174
175    /**
176     * Cancel the Future
177     *
178     */
179    public void cancel()
180    {
181        // set the cancel flag first
182        cancelled = true;
183    }
184
185
186    /**
187     * {@inheritDoc}
188     */
189    public String toString()
190    {
191        StringBuilder sb = new StringBuilder();
192
193        sb.append( "[msgId : " ).append( messageId ).append( ", " );
194        sb.append( "size : " ).append( queue.size() ).append( ", " );
195        sb.append( "Canceled :" ).append( cancelled ).append( "]" );
196
197        return sb.toString();
198    }
199}