/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.reactor;

import static org.hamcrest.MatcherAssert.assertThat;

import java.net.UnknownHostException;
import java.util.concurrent.Future;

import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
50  
51  public class TestAbstractIOSessionPool {
52  
53      @Mock
54      private Future<IOSession> connectFuture;
55      @Mock
56      private FutureCallback<IOSession> callback1;
57      @Mock
58      private FutureCallback<IOSession> callback2;
59      @Mock
60      private IOSession ioSession1;
61      @Mock
62      private IOSession ioSession2;
63      @Captor
64      ArgumentCaptor<FutureCallback<IOSession>> connectCallbackCaptor;
65  
66      private AbstractIOSessionPool<String> impl;
67  
68      @BeforeEach
69      public void setup() {
70          MockitoAnnotations.openMocks(this);
71          impl = Mockito.mock(AbstractIOSessionPool.class, Mockito.withSettings()
72                  .defaultAnswer(Answers.CALLS_REAL_METHODS)
73                  .useConstructor());
74      }
75  
76      @Test
77      public void testGetSessions() throws Exception {
78  
79          Mockito.when(impl.connectSession(
80                  ArgumentMatchers.anyString(),
81                  ArgumentMatchers.any(),
82                  ArgumentMatchers.any())).thenReturn(connectFuture);
83  
84          Mockito.doAnswer(invocation -> {
85              final Callback<Boolean> callback = invocation.getArgument(1);
86              callback.execute(true);
87              return null;
88          }).when(impl).validateSession(ArgumentMatchers.any(), ArgumentMatchers.any());
89  
90          Mockito.when(ioSession1.isOpen()).thenReturn(true);
91  
92          final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
93          assertThat(future1, CoreMatchers.notNullValue());
94          assertThat(future1.isDone(), CoreMatchers.equalTo(false));
95          assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
96  
97          Mockito.verify(impl).connectSession(
98                  ArgumentMatchers.eq("somehost"),
99                  ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
100                 ArgumentMatchers.any());
101 
102         final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
103         assertThat(future2, CoreMatchers.notNullValue());
104         assertThat(future2.isDone(), CoreMatchers.equalTo(false));
105         assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
106 
107         Mockito.verify(impl, Mockito.times(1)).connectSession(
108                 ArgumentMatchers.eq("somehost"),
109                 ArgumentMatchers.any(),
110                 ArgumentMatchers.argThat(callback -> {
111                     callback.completed(ioSession1);
112                     return true;
113                 }));
114 
115         assertThat(future1.isDone(), CoreMatchers.equalTo(true));
116         assertThat(future1.get(), CoreMatchers.sameInstance(ioSession1));
117 
118         assertThat(future2.isDone(), CoreMatchers.equalTo(true));
119         assertThat(future2.get(), CoreMatchers.sameInstance(ioSession1));
120 
121         Mockito.verify(impl, Mockito.times(2)).validateSession(ArgumentMatchers.any(), ArgumentMatchers.any());
122 
123         final Future<IOSession> future3 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
124 
125         Mockito.verify(impl, Mockito.times(1)).connectSession(
126                 ArgumentMatchers.eq("somehost"),
127                 ArgumentMatchers.any(),
128                 ArgumentMatchers.any());
129 
130         Mockito.verify(impl, Mockito.times(3)).validateSession(ArgumentMatchers.any(), ArgumentMatchers.any());
131 
132         assertThat(future3.isDone(), CoreMatchers.equalTo(true));
133         assertThat(future3.get(), CoreMatchers.sameInstance(ioSession1));
134     }
135 
136     @Test
137     public void testGetSessionConnectFailure() throws Exception {
138 
139         Mockito.when(impl.connectSession(
140                 ArgumentMatchers.anyString(),
141                 ArgumentMatchers.any(),
142                 ArgumentMatchers.any())).thenReturn(connectFuture);
143 
144         final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
145         assertThat(future1, CoreMatchers.notNullValue());
146         assertThat(future1.isDone(), CoreMatchers.equalTo(false));
147         assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
148 
149         Mockito.verify(impl).connectSession(
150                 ArgumentMatchers.eq("somehost"),
151                 ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
152                 connectCallbackCaptor.capture());
153 
154         final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
155         assertThat(future2, CoreMatchers.notNullValue());
156         assertThat(future2.isDone(), CoreMatchers.equalTo(false));
157         assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
158 
159         final FutureCallback<IOSession> connectCallback = connectCallbackCaptor.getValue();
160         Assertions.assertNotNull(connectCallback);
161         connectCallback.failed(new Exception("Boom"));
162 
163         // Ensure connect failure invalidates all pending futures
164         assertThat(future1.isDone(), CoreMatchers.equalTo(true));
165         assertThat(future2.isDone(), CoreMatchers.equalTo(true));
166     }
167 
168     @Test
169     public void testShutdownPool() throws Exception {
170         final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
171         assertThat(entry1, CoreMatchers.notNullValue());
172         entry1.session = ioSession1;
173 
174         final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
175         assertThat(entry2, CoreMatchers.notNullValue());
176         entry2.session = ioSession2;
177 
178         final AbstractIOSessionPool.PoolEntry entry3 = impl.getPoolEntry("host3");
179         assertThat(entry3, CoreMatchers.notNullValue());
180         entry3.sessionFuture = connectFuture;
181         entry3.requestQueue.add(callback1);
182         entry3.requestQueue.add(callback2);
183 
184         impl.close(CloseMode.GRACEFUL);
185 
186         Mockito.verify(impl).closeSession(ioSession1, CloseMode.GRACEFUL);
187         Mockito.verify(impl).closeSession(ioSession2, CloseMode.GRACEFUL);
188         Mockito.verify(connectFuture).cancel(ArgumentMatchers.anyBoolean());
189         Mockito.verify(callback1).cancelled();
190         Mockito.verify(callback2).cancelled();
191     }
192 
193     @Test
194     public void testCloseIdleSessions() throws Exception {
195         final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
196         assertThat(entry1, CoreMatchers.notNullValue());
197         entry1.session = ioSession1;
198 
199         final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
200         assertThat(entry2, CoreMatchers.notNullValue());
201         entry2.session = ioSession2;
202 
203         impl.closeIdle(TimeValue.ZERO_MILLISECONDS);
204 
205         Mockito.verify(impl).closeSession(ioSession1, CloseMode.GRACEFUL);
206         Mockito.verify(impl).closeSession(ioSession2, CloseMode.GRACEFUL);
207 
208         assertThat(entry1.session, CoreMatchers.nullValue());
209         assertThat(entry2.session, CoreMatchers.nullValue());
210     }
211 
212     @Test
213     public void testEnumSessions() throws Exception {
214         final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
215         assertThat(entry1, CoreMatchers.notNullValue());
216         entry1.session = ioSession1;
217 
218         final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
219         assertThat(entry2, CoreMatchers.notNullValue());
220         entry2.session = ioSession2;
221 
222         impl.enumAvailable(ioSession -> ioSession.close(CloseMode.GRACEFUL));
223         Mockito.verify(ioSession1).close(CloseMode.GRACEFUL);
224         Mockito.verify(ioSession2).close(CloseMode.GRACEFUL);
225     }
226 
227     @Test
228     public void testGetSessionReconnectAfterValidate() throws Exception {
229         final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("somehost");
230         assertThat(entry1, CoreMatchers.notNullValue());
231         entry1.session = ioSession1;
232 
233         Mockito.when(ioSession1.isOpen()).thenReturn(true);
234         Mockito.doAnswer(invocation -> {
235             final Callback<Boolean> callback = invocation.getArgument(1);
236             callback.execute(false);
237             return null;
238         }).when(impl).validateSession(ArgumentMatchers.any(), ArgumentMatchers.any());
239 
240         impl.getSession("somehost", Timeout.ofSeconds(123L), null);
241 
242         Mockito.verify(impl, Mockito.times(1)).connectSession(
243                 ArgumentMatchers.eq("somehost"),
244                 ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
245                 ArgumentMatchers.any());
246     }
247 
248     @Test
249     public void testGetSessionReconnectIfClosed() throws Exception {
250         final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("somehost");
251         assertThat(entry1, CoreMatchers.notNullValue());
252         entry1.session = ioSession1;
253 
254         Mockito.when(ioSession1.isOpen()).thenReturn(false);
255 
256         impl.getSession("somehost", Timeout.ofSeconds(123L), null);
257 
258         Mockito.verify(impl).connectSession(
259                 ArgumentMatchers.eq("somehost"),
260                 ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
261                 ArgumentMatchers.any());
262     }
263 
264     @Test
265     public void testGetSessionConnectUnknownHost() throws Exception {
266 
267         Mockito.when(connectFuture.isDone()).thenReturn(true);
268         Mockito.when(impl.connectSession(
269                 ArgumentMatchers.anyString(),
270                 ArgumentMatchers.any(),
271                 ArgumentMatchers.argThat(callback -> {
272                     callback.failed(new UnknownHostException("Boom"));
273                     return true;
274                 }))).thenReturn(connectFuture);
275 
276         final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
277         assertThat(future1, CoreMatchers.notNullValue());
278         assertThat(future1.isDone(), CoreMatchers.equalTo(true));
279 
280         final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
281         assertThat(future2, CoreMatchers.notNullValue());
282         assertThat(future2.isDone(), CoreMatchers.equalTo(true));
283     }
284 
285 }