View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.IOException;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.hc.core5.function.Callback;
35  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
36  import org.apache.hc.core5.io.CloseMode;
37  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
38  import org.apache.hc.core5.reactor.IOReactorConfig;
39  import org.apache.hc.core5.reactor.IOReactorService;
40  import org.apache.hc.core5.reactor.IOReactorStatus;
41  import org.apache.hc.core5.reactor.IOSession;
42  import org.apache.hc.core5.util.Args;
43  import org.apache.hc.core5.util.Asserts;
44  import org.apache.hc.core5.util.TimeValue;
45  
46  abstract class IOReactorExecutor<T extends IOReactorService> implements AutoCloseable {
47  
48      enum Status { READY, RUNNING, TERMINATED }
49  
50      private final IOReactorConfig ioReactorConfig;
51      private final ThreadFactory workerThreadFactory;
52      private final AtomicReference<T> ioReactorRef;
53      private final AtomicReference<Status> status;
54  
55      IOReactorExecutor(final IOReactorConfig ioReactorConfig, final ThreadFactory workerThreadFactory) {
56          super();
57          this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
58          this.workerThreadFactory = workerThreadFactory;
59          this.ioReactorRef = new AtomicReference<>(null);
60          this.status = new AtomicReference<>(Status.READY);
61      }
62  
63      abstract T createIOReactor(
64              IOEventHandlerFactory ioEventHandlerFactory,
65              IOReactorConfig ioReactorConfig,
66              ThreadFactory threadFactory,
67              Callback<IOSession> sessionShutdownCallback) throws IOException;
68  
69      protected void execute(final IOEventHandlerFactory ioEventHandlerFactory) throws IOException {
70          Args.notNull(ioEventHandlerFactory, "Handler factory");
71          if (ioReactorRef.compareAndSet(null, createIOReactor(
72                  ioEventHandlerFactory,
73                  ioReactorConfig,
74                  workerThreadFactory,
75                  ShutdownCommand.GRACEFUL_NORMAL_CALLBACK))) {
76              if (status.compareAndSet(Status.READY, Status.RUNNING)) {
77                  ioReactorRef.get().start();
78              }
79          } else {
80              throw new IllegalStateException("I/O reactor has already been started");
81          }
82      }
83  
84      private T ensureRunning() {
85          final T ioReactor = ioReactorRef.get();
86          Asserts.check(ioReactor != null, "I/O reactor has not been started");
87          return ioReactor;
88      }
89  
90      T reactor() {
91          return ensureRunning();
92      }
93  
94      public IOReactorStatus getStatus() {
95          final T ioReactor = ioReactorRef.get();
96          return ioReactor != null ? ioReactor.getStatus() : IOReactorStatus.INACTIVE;
97      }
98  
99      public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
100         Args.notNull(waitTime, "Wait time");
101         final T ioReactor = ioReactorRef.get();
102         if (ioReactor != null) {
103             ioReactor.awaitShutdown(waitTime);
104         }
105     }
106 
107     public void initiateShutdown() {
108         final T ioReactor = ioReactorRef.get();
109         if (ioReactor != null) {
110             if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
111                 ioReactor.initiateShutdown();
112             }
113         }
114     }
115 
116     public void shutdown(final TimeValue graceTime) {
117         Args.notNull(graceTime, "Grace time");
118         final T ioReactor = ioReactorRef.get();
119         if (ioReactor != null) {
120             if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
121                 ioReactor.initiateShutdown();
122             }
123             try {
124                 ioReactor.awaitShutdown(graceTime);
125             } catch (final InterruptedException ex) {
126                 Thread.currentThread().interrupt();
127             }
128             ioReactor.close(CloseMode.IMMEDIATE);
129         }
130     }
131 
132     @Override
133     public void close() throws Exception {
134         shutdown(TimeValue.ofSeconds(5));
135     }
136 
137 }