1 package org.apache.maven.plugin.surefire.extensions;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.booterclient.output.NativeStdOutStreamConsumer;
23 import org.apache.maven.surefire.api.event.Event;
24 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
25 import org.apache.maven.surefire.extensions.CommandReader;
26 import org.apache.maven.surefire.extensions.EventHandler;
27 import org.apache.maven.surefire.extensions.ForkChannel;
28 import org.apache.maven.surefire.extensions.ForkNodeArguments;
29 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
30 import org.apache.maven.surefire.extensions.util.LineConsumerThread;
31
32 import javax.annotation.Nonnull;
33 import java.io.Closeable;
34 import java.io.IOException;
35 import java.net.Inet4Address;
36 import java.net.InetAddress;
37 import java.net.InetSocketAddress;
38 import java.net.SocketOption;
39 import java.nio.channels.AsynchronousServerSocketChannel;
40 import java.nio.channels.AsynchronousSocketChannel;
41 import java.nio.channels.ReadableByteChannel;
42 import java.nio.channels.WritableByteChannel;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Executors;
46
47 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
48 import static java.net.StandardSocketOptions.SO_REUSEADDR;
49 import static java.net.StandardSocketOptions.TCP_NODELAY;
50 import static java.nio.channels.AsynchronousChannelGroup.withThreadPool;
51 import static java.nio.channels.AsynchronousServerSocketChannel.open;
52 import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
53 import static org.apache.maven.surefire.api.util.internal.Channels.newChannel;
54 import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
55 import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
56 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 final class SurefireForkChannel extends ForkChannel
73 {
74 private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool( newDaemonThreadFactory() );
75
76 private final AsynchronousServerSocketChannel server;
77 private final String localHost;
78 private final int localPort;
79 private volatile AsynchronousSocketChannel worker;
80 private volatile LineConsumerThread out;
81
82 SurefireForkChannel( @Nonnull ForkNodeArguments arguments ) throws IOException
83 {
84 super( arguments );
85 server = open( withThreadPool( THREAD_POOL ) );
86 setTrueOptions( SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE );
87 InetAddress ip = Inet4Address.getLocalHost();
88 server.bind( new InetSocketAddress( ip, 0 ), 1 );
89 InetSocketAddress localAddress = (InetSocketAddress) server.getLocalAddress();
90 localHost = localAddress.getHostString();
91 localPort = localAddress.getPort();
92 }
93
94 @Override
95 public void connectToClient() throws IOException
96 {
97 if ( worker != null )
98 {
99 throw new IllegalStateException( "already accepted TCP client connection" );
100 }
101
102 try
103 {
104 worker = server.accept().get();
105 }
106 catch ( InterruptedException e )
107 {
108 throw new IOException( e.getLocalizedMessage(), e );
109 }
110 catch ( ExecutionException e )
111 {
112 throw new IOException( e.getLocalizedMessage(), e.getCause() );
113 }
114 }
115
116 @SafeVarargs
117 private final void setTrueOptions( SocketOption<Boolean>... options )
118 throws IOException
119 {
120 for ( SocketOption<Boolean> option : options )
121 {
122 if ( server.supportedOptions().contains( option ) )
123 {
124 server.setOption( option, true );
125 }
126 }
127 }
128
129 @Override
130 public String getForkNodeConnectionString()
131 {
132 return "tcp://" + localHost + ":" + localPort;
133 }
134
135 @Override
136 public int getCountdownCloseablePermits()
137 {
138 return 3;
139 }
140
141 @Override
142 public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
143 WritableByteChannel stdIn )
144 {
145
146
147
148 WritableByteChannel channel = newChannel( newOutputStream( worker ) );
149 return new StreamFeeder( "commands-fork-" + getArguments().getForkChannelId(), channel, commands,
150 getArguments().getConsoleLogger() );
151 }
152
153 @Override
154 public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
155 @Nonnull CountdownCloseable countdownCloseable,
156 ReadableByteChannel stdOut )
157 {
158 out = new LineConsumerThread( "fork-" + getArguments().getForkChannelId() + "-out-thread", stdOut,
159 new NativeStdOutStreamConsumer( getArguments().getConsoleLogger() ), countdownCloseable );
160 out.start();
161
162 ReadableByteChannel channel = newBufferedChannel( newInputStream( worker ) );
163 return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", channel,
164 eventHandler, countdownCloseable, getArguments() );
165 }
166
167 @Override
168 public void close() throws IOException
169 {
170
171 try ( Closeable c1 = worker; Closeable c2 = server; Closeable c3 = out )
172 {
173
174 }
175 }
176 }