View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.cli.transfer;
20  
21  import java.io.File;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.ArrayBlockingQueue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.function.Consumer;
28  
29  import org.eclipse.aether.transfer.AbstractTransferListener;
30  import org.eclipse.aether.transfer.TransferCancelledException;
31  import org.eclipse.aether.transfer.TransferEvent;
32  import org.eclipse.aether.transfer.TransferListener;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import static java.util.Objects.requireNonNull;
37  
38  /**
39   * A {@link TransferListener} implementation that wraps another delegate {@link TransferListener} but makes it run
40   * on single thread, keeping the listener logic simple. This listener also blocks on last transfer event to allow
41   * output to perform possible cleanup. It spawns a daemon thread to consume queued events that may fall in even
42   * concurrently.
43   *
44   * @since 4.0.0
45   */
46  public final class SimplexTransferListener extends AbstractTransferListener {
47      private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
48      private static final int QUEUE_SIZE = 1024;
49      private static final int BATCH_MAX_SIZE = 500;
50      private final TransferListener delegate;
51      private final int batchMaxSize;
52      private final boolean blockOnLastEvent;
53      private final ArrayBlockingQueue<Exchange> eventQueue;
54  
55      /**
56       * Constructor that makes passed in delegate run on single thread, and will block on last event.
57       */
58      public SimplexTransferListener(TransferListener delegate) {
59          this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true);
60      }
61  
62      /**
63       * Constructor that may alter behaviour of this listener.
64       *
65       * @param delegate The delegate that should run on single thread.
66       * @param queueSize The event queue size (default {@code 1024}).
67       * @param batchMaxSize The maximum batch size delegate should receive (default {@code 500}).
68       * @param blockOnLastEvent Should this listener block on last transfer end (completed or corrupted) block? (default {@code true}).
69       */
70      public SimplexTransferListener(
71              TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) {
72          this.delegate = requireNonNull(delegate);
73          if (queueSize < 1 || batchMaxSize < 1) {
74              throw new IllegalArgumentException("Queue and batch sizes must be greater than 1");
75          }
76          this.batchMaxSize = batchMaxSize;
77          this.blockOnLastEvent = blockOnLastEvent;
78  
79          this.eventQueue = new ArrayBlockingQueue<>(queueSize);
80          Thread updater = new Thread(this::feedConsumer);
81          updater.setDaemon(true);
82          updater.start();
83      }
84  
85      public TransferListener getDelegate() {
86          return delegate;
87      }
88  
89      private void feedConsumer() {
90          final ArrayList<Exchange> batch = new ArrayList<>(batchMaxSize);
91          try {
92              while (true) {
93                  batch.clear();
94                  if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) {
95                      batch.add(eventQueue.take());
96                  }
97                  demux(batch);
98              }
99          } catch (InterruptedException e) {
100             throw new RuntimeException(e);
101         }
102     }
103 
104     private void demux(List<Exchange> exchanges) {
105         for (Exchange exchange : exchanges) {
106             exchange.process(transferEvent -> {
107                 TransferEvent.EventType type = transferEvent.getType();
108                 try {
109                     switch (type) {
110                         case INITIATED:
111                             delegate.transferInitiated(transferEvent);
112                             break;
113                         case STARTED:
114                             delegate.transferStarted(transferEvent);
115                             break;
116                         case PROGRESSED:
117                             delegate.transferProgressed(transferEvent);
118                             break;
119                         case CORRUPTED:
120                             delegate.transferCorrupted(transferEvent);
121                             break;
122                         case SUCCEEDED:
123                             delegate.transferSucceeded(transferEvent);
124                             break;
125                         case FAILED:
126                             delegate.transferFailed(transferEvent);
127                             break;
128                         default:
129                             LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
130                     }
131                 } catch (TransferCancelledException e) {
132                     ongoing.put(transferEvent.getResource().getFile(), Boolean.FALSE);
133                 }
134             });
135         }
136     }
137 
138     private void put(TransferEvent event, boolean last) {
139         try {
140             Exchange exchange;
141             if (blockOnLastEvent && last) {
142                 exchange = new BlockingExchange(event);
143             } else {
144                 exchange = new Exchange(event);
145             }
146             eventQueue.put(exchange);
147             exchange.waitForProcessed();
148         } catch (InterruptedException e) {
149             throw new RuntimeException(e);
150         }
151     }
152 
153     private final ConcurrentHashMap<File, Boolean> ongoing = new ConcurrentHashMap<>();
154 
155     @Override
156     public void transferInitiated(TransferEvent event) {
157         ongoing.putIfAbsent(event.getResource().getFile(), Boolean.TRUE);
158         put(event, false);
159     }
160 
161     @Override
162     public void transferStarted(TransferEvent event) throws TransferCancelledException {
163         if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
164             throw new TransferCancelledException();
165         }
166         put(event, false);
167     }
168 
169     @Override
170     public void transferProgressed(TransferEvent event) throws TransferCancelledException {
171         if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
172             throw new TransferCancelledException();
173         }
174         put(event, false);
175     }
176 
177     @Override
178     public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
179         if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
180             throw new TransferCancelledException();
181         }
182         put(event, false);
183     }
184 
185     @Override
186     public void transferSucceeded(TransferEvent event) {
187         ongoing.remove(event.getResource().getFile());
188         put(event, ongoing.isEmpty());
189     }
190 
191     @Override
192     public void transferFailed(TransferEvent event) {
193         ongoing.remove(event.getResource().getFile());
194         put(event, ongoing.isEmpty());
195     }
196 
197     private static class Exchange {
198         private final TransferEvent event;
199 
200         private Exchange(TransferEvent event) {
201             this.event = event;
202         }
203 
204         public void process(Consumer<TransferEvent> consumer) {
205             consumer.accept(event);
206         }
207 
208         public void waitForProcessed() throws InterruptedException {
209             // nothing, is async
210         }
211     }
212 
213     private static class BlockingExchange extends Exchange {
214         private final CountDownLatch latch = new CountDownLatch(1);
215 
216         private BlockingExchange(TransferEvent event) {
217             super(event);
218         }
219 
220         @Override
221         public void process(Consumer<TransferEvent> consumer) {
222             super.process(consumer);
223             latch.countDown();
224         }
225 
226         @Override
227         public void waitForProcessed() throws InterruptedException {
228             latch.await();
229         }
230     }
231 }