View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.nio.channels.ClosedSelectorException;
33  import java.nio.channels.SelectionKey;
34  import java.nio.channels.Selector;
35  import java.util.Set;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import org.apache.hc.core5.function.Callback;
40  import org.apache.hc.core5.io.CloseMode;
41  import org.apache.hc.core5.io.Closer;
42  import org.apache.hc.core5.util.Args;
43  import org.apache.hc.core5.util.TimeValue;
44  import org.apache.hc.core5.util.Timeout;
45  
46  abstract class AbstractSingleCoreIOReactor implements IOReactor {
47  
48      private final Callback<Exception> exceptionCallback;
49      private final AtomicReference<IOReactorStatus> status;
50      private final AtomicBoolean terminated;
51      private final Object shutdownMutex;
52  
53      final Selector selector;
54  
55      AbstractSingleCoreIOReactor(final Callback<Exception> exceptionCallback) {
56          super();
57          this.exceptionCallback = exceptionCallback;
58          this.shutdownMutex = new Object();
59          this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
60          this.terminated = new AtomicBoolean();
61          try {
62              this.selector = Selector.open();
63          } catch (final IOException ex) {
64              throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
65          }
66      }
67  
68      @Override
69      public final IOReactorStatus getStatus() {
70          return this.status.get();
71      }
72  
73      void logException(final Exception ex) {
74          if (exceptionCallback != null) {
75              exceptionCallback.execute(ex);
76          }
77      }
78  
79      abstract void doExecute() throws IOException;
80  
81      abstract void doTerminate() throws IOException;
82  
83      public void execute() {
84          if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
85              try {
86                  doExecute();
87              } catch (final ClosedSelectorException ignore) {
88                  // ignore
89              } catch (final Exception ex) {
90                  logException(ex);
91              } finally {
92                  try {
93                      doTerminate();
94                  } catch (final Exception ex) {
95                      logException(ex);
96                  } finally {
97                      close(CloseMode.IMMEDIATE);
98                  }
99              }
100         }
101     }
102 
103     @Override
104     public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
105         Args.notNull(waitTime, "Wait time");
106         final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
107         long remaining = waitTime.toMilliseconds();
108         synchronized (this.shutdownMutex) {
109             while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
110                 this.shutdownMutex.wait(remaining);
111                 remaining = deadline - System.currentTimeMillis();
112                 if (remaining <= 0) {
113                     return;
114                 }
115             }
116         }
117     }
118 
119     @Override
120     public final void initiateShutdown() {
121         if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
122             synchronized (this.shutdownMutex) {
123                 this.shutdownMutex.notifyAll();
124             }
125         } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
126             this.selector.wakeup();
127         }
128     }
129 
130     @Override
131     public final void close(final CloseMode closeMode) {
132         close(closeMode, Timeout.ofSeconds(5));
133     }
134 
135     /**
136      * Shuts down the I/O reactor either gracefully or immediately.
137      * During graceful shutdown individual I/O sessions should be
138      * informed about imminent termination and be given a grace period
139      * to complete the ongoing I/O sessions. During immediate shutdown
140      * all ongoing I/O sessions get aborted immediately.
141      *
142      * @param closeMode How to close the IO reactor.
143      * @param timeout  How long to wait for the IO reactor to close gracefully.
144      * @since 5.2
145      */
146     public void close(final CloseMode closeMode, final Timeout timeout) {
147         if (closeMode == CloseMode.GRACEFUL) {
148             initiateShutdown();
149             try {
150                 awaitShutdown(timeout);
151             } catch (final InterruptedException e) {
152                 Thread.currentThread().interrupt();
153             }
154         }
155         this.status.set(IOReactorStatus.SHUT_DOWN);
156         if (terminated.compareAndSet(false, true)) {
157             try {
158                 final Set<SelectionKey> keys = this.selector.keys();
159                 for (final SelectionKey key : keys) {
160                     try {
161                         Closer.close((Closeable) key.attachment());
162                     } catch (final IOException ex) {
163                         logException(ex);
164                     }
165                     key.channel().close();
166                 }
167                 selector.close();
168             } catch (final Exception ex) {
169                 logException(ex);
170             }
171         }
172         synchronized (this.shutdownMutex) {
173             this.shutdownMutex.notifyAll();
174         }
175     }
176 
177     @Override
178     public final void close() {
179         close(CloseMode.GRACEFUL);
180     }
181 
182     @Override
183     public String toString() {
184         return super.toString() + " [status=" + status + "]";
185     }
186 
187 }