View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.mina.core.buffer.IoBuffer;
29  import org.apache.mina.core.future.IoFuture;
30  import org.apache.mina.core.future.WriteFuture;
31  import org.apache.mina.core.session.IoSession;
32  
33  /**
34   * A utility class that provides various convenience methods related with
35   * {@link IoSession} and {@link IoFuture}.
36   * 
37   * @author The Apache MINA Project (dev@mina.apache.org)
38   */
39  public class IoUtil {
40      
41      private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
42  
43      /**
44       * Writes the specified {@code message} to the specified {@code sessions}.
45       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
46       * automatically duplicated using {@link IoBuffer#duplicate()}.
47       */
48      public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
49          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
50          broadcast(message, sessions.iterator(), answer);
51          return answer;
52      }
53  
54      /**
55       * Writes the specified {@code message} to the specified {@code sessions}.
56       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
57       * automatically duplicated using {@link IoBuffer#duplicate()}.
58       */
59      public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
60          List<WriteFuture> answer = new ArrayList<WriteFuture>();
61          broadcast(message, sessions.iterator(), answer);
62          return answer;
63      }
64      
65      /**
66       * Writes the specified {@code message} to the specified {@code sessions}.
67       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
68       * automatically duplicated using {@link IoBuffer#duplicate()}.
69       */
70      public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
71          List<WriteFuture> answer = new ArrayList<WriteFuture>();
72          broadcast(message, sessions, answer);
73          return answer;
74      }
75      
76      /**
77       * Writes the specified {@code message} to the specified {@code sessions}.
78       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
79       * automatically duplicated using {@link IoBuffer#duplicate()}.
80       */
81      public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
82          if (sessions == null) {
83              sessions = EMPTY_SESSIONS;
84          }
85          
86          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
87          if (message instanceof IoBuffer) {
88              for (IoSession s: sessions) {
89                  answer.add(s.write(((IoBuffer) message).duplicate()));
90              }
91          } else {
92              for (IoSession s: sessions) {
93                  answer.add(s.write(message));
94              }
95          }
96          return answer;
97      }
98      
99      private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
100         if (message instanceof IoBuffer) {
101             while (sessions.hasNext()) {
102                 IoSession s = sessions.next();
103                 answer.add(s.write(((IoBuffer) message).duplicate()));
104             }
105         } else {
106             while (sessions.hasNext()) {
107                 IoSession s = sessions.next();
108                 answer.add(s.write(message));
109             }
110         }
111     }
112     
113     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
114         for (IoFuture f: futures) {
115             f.await();
116         }
117     }
118     
119     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
120         for (IoFuture f: futures) {
121             f.awaitUninterruptibly();
122         }
123     }
124     
125     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) throws InterruptedException {
126         return await(futures, unit.toMillis(timeout));
127     }
128 
129     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
130         return await0(futures, timeoutMillis, true);
131     }
132 
133     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
134         return awaitUninterruptibly(futures, unit.toMillis(timeout));
135     }
136 
137     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
138         try {
139             return await0(futures, timeoutMillis, false);
140         } catch (InterruptedException e) {
141             throw new InternalError();
142         }
143     }
144 
145     private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable) throws InterruptedException {
146         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
147         long waitTime = timeoutMillis;
148         
149         boolean lastComplete = true;
150         Iterator<? extends IoFuture> i = futures.iterator();
151         while (i.hasNext()) {
152             IoFuture f = i.next();
153             do {
154                 if (interruptable) {
155                     lastComplete = f.await(waitTime);
156                 } else {
157                     lastComplete = f.awaitUninterruptibly(waitTime);
158                 }
159                 
160                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
161 
162                 if (lastComplete || waitTime <= 0) {
163                     break;
164                 }
165             } while (!lastComplete);
166             
167             if (waitTime <= 0) {
168                 break;
169             }
170         }
171         
172         return lastComplete && !i.hasNext();
173     }
174 
175     private IoUtil() {
176         // Do nothing
177     }
178 }