View Javadoc
1   package org.eclipse.aether.util.concurrency;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static java.util.Objects.requireNonNull;
23  
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.LockSupport;
27  
28  /**
29   * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
30   * a worker thread back to the parent thread. The simplified usage pattern looks like this:
31   * 
32   * <pre>
33   * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
34   * for ( Runnable task : tasks )
35   * {
36   *     executor.execute( errorForwarder.wrap( task ) );
37   * }
38   * errorForwarder.await();
39   * </pre>
40   */
41  public final class RunnableErrorForwarder
42  {
43  
44      private final Thread thread = Thread.currentThread();
45  
46      private final AtomicInteger counter = new AtomicInteger();
47  
48      private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
49  
50      /**
51       * Creates a new error forwarder for worker threads spawned by the current thread.
52       */
53      public RunnableErrorForwarder()
54      {
55      }
56  
57      /**
58       * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
59       *
60       * @param runnable The runnable from which to forward errors, must not be {@code null}.
61       * @return The error-forwarding runnable to eventually execute, never {@code null}.
62       */
63      public Runnable wrap( final Runnable runnable )
64      {
65          requireNonNull( runnable, "runnable cannot be null" );
66  
67          counter.incrementAndGet();
68  
69          return new Runnable()
70          {
71              public void run()
72              {
73                  try
74                  {
75                      runnable.run();
76                  }
77                  catch ( RuntimeException e )
78                  {
79                      error.compareAndSet( null, e );
80                      throw e;
81                  }
82                  catch ( Error e )
83                  {
84                      error.compareAndSet( null, e );
85                      throw e;
86                  }
87                  finally
88                  {
89                      counter.decrementAndGet();
90                      LockSupport.unpark( thread );
91                  }
92              }
93          };
94      }
95  
96      /**
97       * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
98       * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
99       * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
100      * method must be called from the same thread that created this error forwarder instance.
101      */
102     public void await()
103     {
104         awaitTerminationOfAllRunnables();
105 
106         Throwable error = this.error.get();
107         if ( error != null )
108         {
109             if ( error instanceof RuntimeException )
110             {
111                 throw (RuntimeException) error;
112             }
113             else if ( error instanceof ThreadDeath )
114             {
115                 throw new IllegalStateException( error );
116             }
117             else if ( error instanceof Error )
118             {
119                 throw (Error) error;
120             }
121             throw new IllegalStateException( error );
122         }
123     }
124 
125     private void awaitTerminationOfAllRunnables()
126     {
127         if ( !thread.equals( Thread.currentThread() ) )
128         {
129             throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
130                 + Thread.currentThread() );
131         }
132 
133         boolean interrupted = false;
134 
135         while ( counter.get() > 0 )
136         {
137             LockSupport.park();
138 
139             if ( Thread.interrupted() )
140             {
141                 interrupted = true;
142             }
143         }
144 
145         if ( interrupted )
146         {
147             Thread.currentThread().interrupt();
148         }
149     }
150 
151 }