1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.cli.transfer;
20
21 import java.io.File;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.function.Consumer;
28
29 import org.eclipse.aether.transfer.AbstractTransferListener;
30 import org.eclipse.aether.transfer.TransferCancelledException;
31 import org.eclipse.aether.transfer.TransferEvent;
32 import org.eclipse.aether.transfer.TransferListener;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import static java.util.Objects.requireNonNull;
37
38
39
40
41
42
43
44
45
46 public final class SimplexTransferListener extends AbstractTransferListener {
47 private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
48 private static final int QUEUE_SIZE = 1024;
49 private static final int BATCH_MAX_SIZE = 500;
50 private final TransferListener delegate;
51 private final int batchMaxSize;
52 private final boolean blockOnLastEvent;
53 private final ArrayBlockingQueue<Exchange> eventQueue;
54
55
56
57
58 public SimplexTransferListener(TransferListener delegate) {
59 this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true);
60 }
61
62
63
64
65
66
67
68
69
70 public SimplexTransferListener(
71 TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) {
72 this.delegate = requireNonNull(delegate);
73 if (queueSize < 1 || batchMaxSize < 1) {
74 throw new IllegalArgumentException("Queue and batch sizes must be greater than 1");
75 }
76 this.batchMaxSize = batchMaxSize;
77 this.blockOnLastEvent = blockOnLastEvent;
78
79 this.eventQueue = new ArrayBlockingQueue<>(queueSize);
80 Thread updater = new Thread(this::feedConsumer);
81 updater.setDaemon(true);
82 updater.start();
83 }
84
85 public TransferListener getDelegate() {
86 return delegate;
87 }
88
89 private void feedConsumer() {
90 final ArrayList<Exchange> batch = new ArrayList<>(batchMaxSize);
91 try {
92 while (true) {
93 batch.clear();
94 if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) {
95 batch.add(eventQueue.take());
96 }
97 demux(batch);
98 }
99 } catch (InterruptedException e) {
100 throw new RuntimeException(e);
101 }
102 }
103
104 private void demux(List<Exchange> exchanges) {
105 for (Exchange exchange : exchanges) {
106 exchange.process(transferEvent -> {
107 TransferEvent.EventType type = transferEvent.getType();
108 try {
109 switch (type) {
110 case INITIATED:
111 delegate.transferInitiated(transferEvent);
112 break;
113 case STARTED:
114 delegate.transferStarted(transferEvent);
115 break;
116 case PROGRESSED:
117 delegate.transferProgressed(transferEvent);
118 break;
119 case CORRUPTED:
120 delegate.transferCorrupted(transferEvent);
121 break;
122 case SUCCEEDED:
123 delegate.transferSucceeded(transferEvent);
124 break;
125 case FAILED:
126 delegate.transferFailed(transferEvent);
127 break;
128 default:
129 LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
130 }
131 } catch (TransferCancelledException e) {
132 ongoing.put(transferEvent.getResource().getFile(), Boolean.FALSE);
133 }
134 });
135 }
136 }
137
138 private void put(TransferEvent event, boolean last) {
139 try {
140 Exchange exchange;
141 if (blockOnLastEvent && last) {
142 exchange = new BlockingExchange(event);
143 } else {
144 exchange = new Exchange(event);
145 }
146 eventQueue.put(exchange);
147 exchange.waitForProcessed();
148 } catch (InterruptedException e) {
149 throw new RuntimeException(e);
150 }
151 }
152
153 private final ConcurrentHashMap<File, Boolean> ongoing = new ConcurrentHashMap<>();
154
155 @Override
156 public void transferInitiated(TransferEvent event) {
157 ongoing.putIfAbsent(event.getResource().getFile(), Boolean.TRUE);
158 put(event, false);
159 }
160
161 @Override
162 public void transferStarted(TransferEvent event) throws TransferCancelledException {
163 if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
164 throw new TransferCancelledException();
165 }
166 put(event, false);
167 }
168
169 @Override
170 public void transferProgressed(TransferEvent event) throws TransferCancelledException {
171 if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
172 throw new TransferCancelledException();
173 }
174 put(event, false);
175 }
176
177 @Override
178 public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
179 if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
180 throw new TransferCancelledException();
181 }
182 put(event, false);
183 }
184
185 @Override
186 public void transferSucceeded(TransferEvent event) {
187 ongoing.remove(event.getResource().getFile());
188 put(event, ongoing.isEmpty());
189 }
190
191 @Override
192 public void transferFailed(TransferEvent event) {
193 ongoing.remove(event.getResource().getFile());
194 put(event, ongoing.isEmpty());
195 }
196
197 private static class Exchange {
198 private final TransferEvent event;
199
200 private Exchange(TransferEvent event) {
201 this.event = event;
202 }
203
204 public void process(Consumer<TransferEvent> consumer) {
205 consumer.accept(event);
206 }
207
208 public void waitForProcessed() throws InterruptedException {
209
210 }
211 }
212
213 private static class BlockingExchange extends Exchange {
214 private final CountDownLatch latch = new CountDownLatch(1);
215
216 private BlockingExchange(TransferEvent event) {
217 super(event);
218 }
219
220 @Override
221 public void process(Consumer<TransferEvent> consumer) {
222 super.process(consumer);
223 latch.countDown();
224 }
225
226 @Override
227 public void waitForProcessed() throws InterruptedException {
228 latch.await();
229 }
230 }
231 }