View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.named.ipc;
20  
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.net.SocketAddress;
25  import java.nio.channels.ByteChannel;
26  import java.nio.channels.Channels;
27  import java.nio.channels.ServerSocketChannel;
28  import java.nio.channels.SocketChannel;
29  import java.util.ArrayList;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.concurrent.CompletableFuture;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  /**
41   * Implementation of the server side.
42   * The server instance is bound to a given maven repository.
43   *
44   * @since 2.0.1
45   */
46  public class IpcServer {
47      /**
48       * Should the IPC server not fork? (i.e. for testing purposes)
49       *
50       * @configurationSource {@link System#getProperty(String, String)}
51       * @configurationType {@link java.lang.Boolean}
52       * @configurationDefaultValue {@link #DEFAULT_NO_FORK}
53       */
54      public static final String SYSTEM_PROP_NO_FORK = "aether.named.ipc.nofork";
55  
56      public static final boolean DEFAULT_NO_FORK = false;
57  
58      /**
59       * IPC idle timeout in seconds. If there is no IPC request during idle time, it will stop.
60       *
61       * @configurationSource {@link System#getProperty(String, String)}
62       * @configurationType {@link java.lang.Integer}
63       * @configurationDefaultValue {@link #DEFAULT_IDLE_TIMEOUT}
64       */
65      public static final String SYSTEM_PROP_IDLE_TIMEOUT = "aether.named.ipc.idleTimeout";
66  
67      public static final int DEFAULT_IDLE_TIMEOUT = 60;
68  
69      /**
70       * IPC socket family to use.
71       *
72       * @configurationSource {@link System#getProperty(String, String)}
73       * @configurationType {@link java.lang.String}
74       * @configurationDefaultValue {@link #DEFAULT_FAMILY}
75       */
76      public static final String SYSTEM_PROP_FAMILY = "aether.named.ipc.family";
77  
78      public static final String DEFAULT_FAMILY = "unix";
79  
80      /**
81       * Should the IPC server not use native executable?
82       *
83       * @configurationSource {@link System#getProperty(String, String)}
84       * @configurationType {@link java.lang.Boolean}
85       * @configurationDefaultValue {@link #DEFAULT_NO_NATIVE}
86       */
87      public static final String SYSTEM_PROP_NO_NATIVE = "aether.named.ipc.nonative";
88  
89      public static final boolean DEFAULT_NO_NATIVE = true;
90  
91      /**
92       * The name if the IPC server native executable (without file extension like ".exe")
93       *
94       * @configurationSource {@link System#getProperty(String, String)}
95       * @configurationType {@link java.lang.String}
96       * @configurationDefaultValue {@link #DEFAULT_NATIVE_NAME}
97       */
98      public static final String SYSTEM_PROP_NATIVE_NAME = "aether.named.ipc.nativeName";
99  
100     public static final String DEFAULT_NATIVE_NAME = "ipc-sync";
101 
102     /**
103      * Should the IPC server log debug messages? (i.e. for testing purposes)
104      *
105      * @configurationSource {@link System#getProperty(String, String)}
106      * @configurationType {@link java.lang.Boolean}
107      * @configurationDefaultValue {@link #DEFAULT_DEBUG}
108      */
109     public static final String SYSTEM_PROP_DEBUG = "aether.named.ipc.debug";
110 
111     public static final boolean DEFAULT_DEBUG = false;
112 
113     private final ServerSocketChannel serverSocket;
114     private final Map<SocketChannel, Thread> clients = new HashMap<>();
115     private final AtomicInteger counter = new AtomicInteger();
116     private final Map<String, Lock> locks = new ConcurrentHashMap<>();
117     private final Map<String, Context> contexts = new ConcurrentHashMap<>();
118     private static final boolean DEBUG =
119             Boolean.parseBoolean(System.getProperty(SYSTEM_PROP_DEBUG, Boolean.toString(DEFAULT_DEBUG)));
120     private final long idleTimeout;
121     private volatile long lastUsed;
122     private volatile boolean closing;
123 
124     public IpcServer(SocketFamily family) throws IOException {
125         serverSocket = family.openServerSocket();
126         long timeout = TimeUnit.SECONDS.toNanos(DEFAULT_IDLE_TIMEOUT);
127         String str = System.getProperty(SYSTEM_PROP_IDLE_TIMEOUT);
128         if (str != null) {
129             try {
130                 TimeUnit unit = TimeUnit.SECONDS;
131                 if (str.endsWith("ms")) {
132                     unit = TimeUnit.MILLISECONDS;
133                     str = str.substring(0, str.length() - 2);
134                 }
135                 long dur = Long.parseLong(str);
136                 timeout = unit.toNanos(dur);
137             } catch (NumberFormatException e) {
138                 error("Property " + SYSTEM_PROP_IDLE_TIMEOUT + " specified with invalid value: " + str, e);
139             }
140         }
141         idleTimeout = timeout;
142     }
143 
144     public static void main(String[] args) throws Exception {
145         // When spawning a new process, the child process is create within
146         // the same process group.  This means that a few signals are sent
147         // to the whole group.  This is the case for SIGINT (Ctrl-C) and
148         // SIGTSTP (Ctrl-Z) which are both sent to all the processed in the
149         // group when initiated from the controlling terminal.
150         // This is only a problem when the client creates the daemon, but
151         // without ignoring those signals, a client being interrupted will
152         // also interrupt and kill the daemon.
153         try {
154             sun.misc.Signal.handle(new sun.misc.Signal("INT"), sun.misc.SignalHandler.SIG_IGN);
155             if (IpcClient.IS_WINDOWS) {
156                 sun.misc.Signal.handle(new sun.misc.Signal("TSTP"), sun.misc.SignalHandler.SIG_IGN);
157             }
158         } catch (Throwable t) {
159             error("Unable to ignore INT and TSTP signals", t);
160         }
161 
162         String family = args[0];
163         String tmpAddress = args[1];
164         String rand = args[2];
165 
166         runServer(SocketFamily.valueOf(family), tmpAddress, rand);
167     }
168 
169     static IpcServer runServer(SocketFamily family, String tmpAddress, String rand) throws IOException {
170         IpcServer server = new IpcServer(family);
171         run(server::run, false); // this is one-off
172         String address = SocketFamily.toString(server.getLocalAddress());
173         SocketAddress socketAddress = SocketFamily.fromString(tmpAddress);
174         try (SocketChannel socket = SocketChannel.open(socketAddress)) {
175             try (DataOutputStream dos = new DataOutputStream(Channels.newOutputStream(socket))) {
176                 dos.writeUTF(rand);
177                 dos.writeUTF(address);
178                 dos.flush();
179             }
180         }
181 
182         return server;
183     }
184 
185     private static void debug(String msg, Object... args) {
186         if (DEBUG) {
187             System.out.printf("[ipc] [debug] " + msg + "\n", args);
188         }
189     }
190 
191     private static void info(String msg, Object... args) {
192         System.out.printf("[ipc] [info] " + msg + "\n", args);
193     }
194 
195     private static void error(String msg, Throwable t) {
196         System.out.println("[ipc] [error] " + msg);
197         t.printStackTrace(System.out);
198     }
199 
200     private static void run(Runnable runnable, boolean daemon) {
201         Thread thread = new Thread(runnable);
202         if (daemon) {
203             thread.setDaemon(true);
204         }
205         thread.start();
206     }
207 
208     public SocketAddress getLocalAddress() throws IOException {
209         return serverSocket.getLocalAddress();
210     }
211 
212     public void run() {
213         try {
214             info("IpcServer started at %s", getLocalAddress().toString());
215             use();
216             run(this::expirationCheck, true);
217             while (!closing) {
218                 SocketChannel socket = this.serverSocket.accept();
219                 run(() -> client(socket), false);
220             }
221         } catch (Throwable t) {
222             if (!closing) {
223                 error("Error running sync server loop", t);
224             }
225         }
226     }
227 
228     private void client(SocketChannel socket) {
229         int c;
230         synchronized (clients) {
231             clients.put(socket, Thread.currentThread());
232             c = clients.size();
233         }
234         info("New client connected (%d connected)", c);
235         use();
236         Map<String, Context> clientContexts = new ConcurrentHashMap<>();
237         try {
238             ByteChannel wrapper = new ByteChannelWrapper(socket);
239             DataInputStream input = new DataInputStream(Channels.newInputStream(wrapper));
240             DataOutputStream output = new DataOutputStream(Channels.newOutputStream(wrapper));
241             while (!closing) {
242                 int requestId = input.readInt();
243                 int sz = input.readInt();
244                 List<String> request = new ArrayList<>(sz);
245                 for (int i = 0; i < sz; i++) {
246                     request.add(input.readUTF());
247                 }
248                 if (request.isEmpty()) {
249                     throw new IOException("Received invalid request");
250                 }
251                 use();
252                 String contextId;
253                 Context context;
254                 String command = request.remove(0);
255                 switch (command) {
256                     case IpcMessages.REQUEST_CONTEXT:
257                         if (request.size() != 1) {
258                             throw new IOException("Expected one argument for " + command + " but got " + request);
259                         }
260                         boolean shared = Boolean.parseBoolean(request.remove(0));
261                         context = new Context(shared);
262                         contexts.put(context.id, context);
263                         clientContexts.put(context.id, context);
264                         synchronized (output) {
265                             debug("Created context %s", context.id);
266                             output.writeInt(requestId);
267                             output.writeInt(2);
268                             output.writeUTF(IpcMessages.RESPONSE_CONTEXT);
269                             output.writeUTF(context.id);
270                             output.flush();
271                         }
272                         break;
273                     case IpcMessages.REQUEST_ACQUIRE:
274                         if (request.size() < 1) {
275                             throw new IOException(
276                                     "Expected at least one argument for " + command + " but got " + request);
277                         }
278                         contextId = request.remove(0);
279                         context = contexts.get(contextId);
280                         if (context == null) {
281                             throw new IOException(
282                                     "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
283                         }
284                         context.lock(request).thenRun(() -> {
285                             try {
286                                 synchronized (output) {
287                                     debug("Locking in context %s", context.id);
288                                     output.writeInt(requestId);
289                                     output.writeInt(1);
290                                     output.writeUTF(IpcMessages.RESPONSE_ACQUIRE);
291                                     output.flush();
292                                 }
293                             } catch (IOException e) {
294                                 try {
295                                     socket.close();
296                                 } catch (IOException ioException) {
297                                     e.addSuppressed(ioException);
298                                 }
299                                 error("Error writing lock response", e);
300                             }
301                         });
302                         break;
303                     case IpcMessages.REQUEST_CLOSE:
304                         if (request.size() != 1) {
305                             throw new IOException("Expected one argument for " + command + " but got " + request);
306                         }
307                         contextId = request.remove(0);
308                         context = contexts.remove(contextId);
309                         clientContexts.remove(contextId);
310                         if (context == null) {
311                             throw new IOException(
312                                     "Unknown context: " + contextId + ". Known contexts = " + contexts.keySet());
313                         }
314                         context.unlock();
315                         synchronized (output) {
316                             debug("Closing context %s", context.id);
317                             output.writeInt(requestId);
318                             output.writeInt(1);
319                             output.writeUTF(IpcMessages.RESPONSE_CLOSE);
320                             output.flush();
321                         }
322                         break;
323                     case IpcMessages.REQUEST_STOP:
324                         if (request.size() != 0) {
325                             throw new IOException("Expected zero argument for " + command + " but got " + request);
326                         }
327                         synchronized (output) {
328                             debug("Stopping server");
329                             output.writeInt(requestId);
330                             output.writeInt(1);
331                             output.writeUTF(IpcMessages.RESPONSE_STOP);
332                             output.flush();
333                         }
334                         close();
335                         break;
336                     default:
337                         throw new IOException("Unknown request: " + request.get(0));
338                 }
339             }
340         } catch (Throwable t) {
341             if (!closing) {
342                 error("Error processing request", t);
343             }
344         } finally {
345             if (!closing) {
346                 info("Client disconnecting...");
347             }
348             clientContexts.values().forEach(context -> {
349                 contexts.remove(context.id);
350                 context.unlock();
351             });
352             try {
353                 socket.close();
354             } catch (IOException ioException) {
355                 // ignore
356             }
357             synchronized (clients) {
358                 clients.remove(socket);
359                 c = clients.size();
360             }
361             if (!closing) {
362                 info("%d clients left", c);
363             }
364         }
365     }
366 
367     private void use() {
368         lastUsed = System.nanoTime();
369     }
370 
371     private void expirationCheck() {
372         while (true) {
373             long current = System.nanoTime();
374             long left = (lastUsed + idleTimeout) - current;
375             if (left < 0) {
376                 info("IpcServer expired, closing");
377                 close();
378                 break;
379             } else {
380                 try {
381                     Thread.sleep(TimeUnit.NANOSECONDS.toMillis(left));
382                 } catch (InterruptedException e) {
383                     info("IpcServer expiration check interrupted, closing");
384                     close();
385                     break;
386                 }
387             }
388         }
389     }
390 
391     void close() {
392         closing = true;
393         try {
394             serverSocket.close();
395         } catch (IOException e) {
396             error("Error closing server socket", e);
397         }
398         clients.forEach((s, t) -> {
399             try {
400                 s.close();
401             } catch (IOException e) {
402                 // ignore
403             }
404             t.interrupt();
405         });
406     }
407 
408     static class Waiter {
409         final Context context;
410         final CompletableFuture<Void> future;
411 
412         Waiter(Context context, CompletableFuture<Void> future) {
413             this.context = context;
414             this.future = future;
415         }
416     }
417 
418     static class Lock {
419 
420         final String key;
421 
422         List<Context> holders;
423         List<Waiter> waiters;
424 
425         Lock(String key) {
426             this.key = key;
427         }
428 
429         public synchronized CompletableFuture<Void> lock(Context context) {
430             if (holders == null) {
431                 holders = new ArrayList<>();
432             }
433             if (holders.isEmpty() || holders.get(0).shared && context.shared) {
434                 holders.add(context);
435                 return CompletableFuture.completedFuture(null);
436             }
437             if (waiters == null) {
438                 waiters = new ArrayList<>();
439             }
440 
441             CompletableFuture<Void> future = new CompletableFuture<>();
442             waiters.add(new Waiter(context, future));
443             return future;
444         }
445 
446         public synchronized void unlock(Context context) {
447             if (holders.remove(context)) {
448                 while (waiters != null
449                         && !waiters.isEmpty()
450                         && (holders.isEmpty() || holders.get(0).shared && waiters.get(0).context.shared)) {
451                     Waiter waiter = waiters.remove(0);
452                     holders.add(waiter.context);
453                     waiter.future.complete(null);
454                 }
455             } else if (waiters != null) {
456                 for (Iterator<Waiter> it = waiters.iterator(); it.hasNext(); ) {
457                     Waiter waiter = it.next();
458                     if (waiter.context == context) {
459                         it.remove();
460                         waiter.future.cancel(false);
461                     }
462                 }
463             }
464         }
465     }
466 
467     class Context {
468 
469         final String id;
470         final boolean shared;
471         final List<String> locks = new CopyOnWriteArrayList<>();
472 
473         Context(boolean shared) {
474             this.id = String.format("%08x", counter.incrementAndGet());
475             this.shared = shared;
476         }
477 
478         public CompletableFuture<?> lock(List<String> keys) {
479             locks.addAll(keys);
480             CompletableFuture<?>[] futures = keys.stream()
481                     .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
482                     .map(l -> l.lock(this))
483                     .toArray(CompletableFuture[]::new);
484             return CompletableFuture.allOf(futures);
485         }
486 
487         public void unlock() {
488             locks.stream()
489                     .map(k -> IpcServer.this.locks.computeIfAbsent(k, Lock::new))
490                     .forEach(l -> l.unlock(this));
491         }
492     }
493 }