1 package org.apache.maven.plugin.surefire.booterclient.output;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.shared.utils.cli.StreamConsumer;
23 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
24
25 import java.io.Closeable;
26 import java.io.IOException;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import static java.lang.Thread.currentThread;
32
33
34
35
36
37
38 public final class ThreadedStreamConsumer
39 implements StreamConsumer, Closeable
40 {
41 private static final String END_ITEM = "";
42
43 private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
44
45 private final BlockingQueue<String> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
46
47 private final AtomicBoolean stop = new AtomicBoolean();
48
49 private final Thread thread;
50
51 private final Pumper pumper;
52
53 final class Pumper
54 implements Runnable
55 {
56 private final StreamConsumer target;
57
58 private final MultipleFailureException errors = new MultipleFailureException();
59
60 Pumper( StreamConsumer target )
61 {
62 this.target = target;
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76 @Override
77 public void run()
78 {
79 while ( !ThreadedStreamConsumer.this.stop.get() )
80 {
81 try
82 {
83 String item = ThreadedStreamConsumer.this.items.take();
84 if ( shouldStopQueueing( item ) )
85 {
86 return;
87 }
88 target.consumeLine( item );
89 }
90 catch ( Throwable t )
91 {
92 errors.addException( t );
93 }
94 }
95 }
96
97 boolean hasErrors()
98 {
99 return errors.hasNestedExceptions();
100 }
101
102 void throwErrors() throws IOException
103 {
104 throw errors;
105 }
106 }
107
108 public ThreadedStreamConsumer( StreamConsumer target )
109 {
110 pumper = new Pumper( target );
111 thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
112 thread.start();
113 }
114
115 @Override
116 public void consumeLine( String s )
117 {
118 if ( stop.get() && !thread.isAlive() )
119 {
120 items.clear();
121 return;
122 }
123
124 try
125 {
126 items.put( s );
127 }
128 catch ( InterruptedException e )
129 {
130 currentThread().interrupt();
131 throw new IllegalStateException( e );
132 }
133 }
134
135 @Override
136 public void close()
137 throws IOException
138 {
139 if ( stop.compareAndSet( false, true ) )
140 {
141 items.clear();
142 try
143 {
144 items.put( END_ITEM );
145 }
146 catch ( InterruptedException e )
147 {
148 currentThread().interrupt();
149 }
150 }
151
152 if ( pumper.hasErrors() )
153 {
154 pumper.throwErrors();
155 }
156 }
157
158
159
160
161
162
163
164 private boolean shouldStopQueueing( String item )
165 {
166 return item == END_ITEM;
167 }
168 }