001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020 021package org.apache.directory.ldap.client.api.future; 022 023 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.TimeoutException; 030 031import org.apache.directory.api.ldap.model.message.Response; 032import org.apache.directory.ldap.client.api.LdapConnection; 033 034 035/** 036 * A Future implementation used in LdapConnection operations. 037 * 038 * @param <R> The result type returned by this Future's <tt>get</tt> method 039 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 040 */ 041public class ResponseFuture<R extends Response> implements Future<Response> 042{ 043 /** the blocking queue holding LDAP responses */ 044 protected BlockingQueue<R> queue; 045 046 /** flag to determine if this future is cancelled */ 047 protected boolean cancelled = false; 048 049 /** If the request has been cancelled because of an exception it will be stored here */ 050 protected Throwable cause; 051 052 /** The messageID for this future */ 053 protected int messageId; 054 055 /** The connection used by the request */ 056 protected LdapConnection connection; 057 058 059 /** 060 * Creates a new instance of ResponseFuture. 061 * 062 * @param connection The LdapConnection used by the request 063 * @param messageId The associated message ID 064 */ 065 public ResponseFuture( LdapConnection connection, int messageId ) 066 { 067 queue = new LinkedBlockingQueue<R>(); 068 this.messageId = messageId; 069 this.connection = connection; 070 } 071 072 073 /** 074 * {@inheritDoc} 075 */ 076 public boolean cancel( boolean mayInterruptIfRunning ) 077 { 078 if ( cancelled ) 079 { 080 return cancelled; 081 } 082 083 // set the cancel flag first 084 cancelled = true; 085 086 // Send an abandonRequest only if this future exists 087 if ( connection.doesFutureExistFor( messageId ) ) 088 { 089 connection.abandon( messageId ); 090 } 091 092 // then clear the queue, cause the might be some incoming messages before this abandon request 093 // hits the server 094 queue.clear(); 095 096 return cancelled; 097 } 098 099 100 /** 101 * {@inheritDoc} 102 * @throws InterruptedException if the operation has been cancelled by client 103 */ 104 public R get() throws InterruptedException, ExecutionException 105 { 106 R response = null; 107 108 response = queue.take(); 109 110 return response; 111 } 112 113 114 /** 115 * {@inheritDoc} 116 * @throws InterruptedException if the operation has been cancelled by client 117 */ 118 public void set( R response ) throws InterruptedException, ExecutionException 119 { 120 queue.add( response ); 121 } 122 123 124 /** 125 * {@inheritDoc} 126 * @throws InterruptedException if the operation has been cancelled by client 127 */ 128 public R get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException 129 { 130 R response = queue.poll( timeout, unit ); 131 132 return response; 133 } 134 135 136 /** 137 * {@inheritDoc} 138 */ 139 public boolean isCancelled() 140 { 141 return cancelled; 142 } 143 144 145 /** 146 * This operation is not supported in this implementation of Future. 147 * 148 * {@inheritDoc} 149 */ 150 public boolean isDone() 151 { 152 throw new UnsupportedOperationException( "Operation not supported" ); 153 } 154 155 156 /** 157 * @return the cause 158 */ 159 public Throwable getCause() 160 { 161 return cause; 162 } 163 164 165 /** 166 * Associate a cause to the ResponseFuture 167 * @param cause the cause to set 168 */ 169 public void setCause( Throwable cause ) 170 { 171 this.cause = cause; 172 } 173 174 175 /** 176 * Cancel the Future 177 * 178 */ 179 public void cancel() 180 { 181 // set the cancel flag first 182 cancelled = true; 183 } 184 185 186 /** 187 * {@inheritDoc} 188 */ 189 public String toString() 190 { 191 StringBuilder sb = new StringBuilder(); 192 193 sb.append( "[msgId : " ).append( messageId ).append( ", " ); 194 sb.append( "size : " ).append( queue.size() ).append( ", " ); 195 sb.append( "Canceled :" ).append( cancelled ).append( "]" ); 196 197 return sb.toString(); 198 } 199}