View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.mina.core.buffer.IoBuffer;
29  import org.apache.mina.core.future.IoFuture;
30  import org.apache.mina.core.future.WriteFuture;
31  import org.apache.mina.core.session.IoSession;
32  
33  /**
34   * A utility class that provides various convenience methods related to
35   * {@link IoSession} and {@link IoFuture}.
36   * 
37   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
38   */
39  public final class IoUtil {
40      private static final IoSessionml#IoSession">IoSession[] EMPTY_SESSIONS = new IoSession[0];
41  
42      private IoUtil() {
43          // Do nothing
44      }
45  
46      /**
47       * Writes the specified {@code message} to the specified {@code sessions}.
48       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
49       * automatically duplicated using {@link IoBuffer#duplicate()}.
50       * 
51       * @param message The message to broadcast
52       * @param sessions The sessions that will receive the message
53       * @return The list of WriteFuture created for each broadcasted message
54       */
55      public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
56          List<WriteFuture> answer = new ArrayList<>(sessions.size());
57          broadcast(message, sessions.iterator(), answer);
58          return answer;
59      }
60  
61      /**
62       * Writes the specified {@code message} to the specified {@code sessions}.
63       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
64       * automatically duplicated using {@link IoBuffer#duplicate()}.
65       * 
66       * @param message The message to broadcast
67       * @param sessions The sessions that will receive the message
68       * @return The list of WriteFuture created for each broadcasted message
69       */
70      public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
71          List<WriteFuture> answer = new ArrayList<>();
72          broadcast(message, sessions.iterator(), answer);
73          return answer;
74      }
75  
76      /**
77       * Writes the specified {@code message} to the specified {@code sessions}.
78       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
79       * automatically duplicated using {@link IoBuffer#duplicate()}.
80       * 
81       * @param message The message to write
82       * @param sessions The sessions the message has to be written to
83       * @return The list of {@link WriteFuture} for the written messages
84       */
85      public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
86          List<WriteFuture> answer = new ArrayList<>();
87          broadcast(message, sessions, answer);
88          return answer;
89      }
90  
91      /**
92       * Writes the specified {@code message} to the specified {@code sessions}.
93       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
94       * automatically duplicated using {@link IoBuffer#duplicate()}.
95       * 
96       * @param message The message to write
97       * @param sessions The sessions the message has to be written to
98       * @return The list of {@link WriteFuture} for the written messages
99       */
100     public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
101         if (sessions == null) {
102             sessions = EMPTY_SESSIONS;
103         }
104 
105         List<WriteFuture> answer = new ArrayList<>(sessions.length);
106         if (message instanceof IoBuffer) {
107             for (IoSession s : sessions) {
108                 answer.add(s.write(((IoBuffer) message).duplicate()));
109             }
110         } else {
111             for (IoSession s : sessions) {
112                 answer.add(s.write(message));
113             }
114         }
115         return answer;
116     }
117 
118     private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
119         if (message instanceof IoBuffer) {
120             while (sessions.hasNext()) {
121                 IoSession s = sessions.next();
122                 answer.add(s.write(((IoBuffer) message).duplicate()));
123             }
124         } else {
125             while (sessions.hasNext()) {
126                 IoSession s = sessions.next();
127                 answer.add(s.write(message));
128             }
129         }
130     }
131 
132     /**
133      * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
134      *  
135      * @param futures The {@link IoFuture}s we are waiting on
136      * @throws InterruptedException If one of the {@link IoFuture} is interrupted
137      */
138     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
139         for (IoFuture f : futures) {
140             f.await();
141         }
142     }
143 
144     /**
145      * Wait on all the {@link IoFuture}s we get. This can't get interrupted.
146      *  
147      * @param futures The {@link IoFuture}s we are waiting on
148      */
149     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
150         for (IoFuture f : futures) {
151             f.awaitUninterruptibly();
152         }
153     }
154 
155     /**
156      * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
157      *  
158      * @param futures The {@link IoFuture}s we are waiting on 
159      * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
160      * @param unit The Time unit to use for the timeout
161      * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
162      * at least one {@link IoFuture} haas been interrupted
163      * @throws InterruptedException If one of the {@link IoFuture} is interrupted
164      */
165     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
166             throws InterruptedException {
167         return await(futures, unit.toMillis(timeout));
168     }
169 
170     /**
171      * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
172      *  
173      * @param futures The {@link IoFuture}s we are waiting on 
174      * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
175      * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
176      * at least one {@link IoFuture} has been interrupted
177      * @throws InterruptedException If one of the {@link IoFuture} is interrupted
178      */
179     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
180         return await0(futures, timeoutMillis, true);
181     }
182 
183     /**
184      * Wait on all the {@link IoFuture}s we get.
185      *  
186      * @param futures The {@link IoFuture}s we are waiting on 
187      * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
188      * @param unit The Time unit to use for the timeout
189      * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
190      * at least one {@link IoFuture} has been interrupted
191      */
192     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
193         return awaitUninterruptibly(futures, unit.toMillis(timeout));
194     }
195 
196     /**
197      * Wait on all the {@link IoFuture}s we get.
198      *  
199      * @param futures The {@link IoFuture}s we are waiting on 
200      * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
201      * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
202      * at least one {@link IoFuture} has been interrupted
203      */
204     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
205         try {
206             return await0(futures, timeoutMillis, false);
207         } catch (InterruptedException e) {
208             throw new InternalError();
209         }
210     }
211 
212     private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable)
213             throws InterruptedException {
214         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
215         long waitTime = timeoutMillis;
216 
217         boolean lastComplete = true;
218         Iterator<? extends IoFuture> i = futures.iterator();
219         
220         while (i.hasNext()) {
221             IoFuture f = i.next();
222 
223             do {
224                 if (interruptable) {
225                     lastComplete = f.await(waitTime);
226                 } else {
227                     lastComplete = f.awaitUninterruptibly(waitTime);
228                 }
229 
230                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
231 
232                 if (waitTime <= 0) {
233                     break;
234                 }
235             } while (!lastComplete);
236 
237             if (waitTime <= 0) {
238                 break;
239             }
240         }
241 
242         return lastComplete && !i.hasNext();
243     }
244 }