View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.mina.core.buffer.IoBuffer;
29  import org.apache.mina.core.future.IoFuture;
30  import org.apache.mina.core.future.WriteFuture;
31  import org.apache.mina.core.session.IoSession;
32  
33  /**
34   * A utility class that provides various convenience methods related with
35   * {@link IoSession} and {@link IoFuture}.
36   * 
37   * @author The Apache MINA Project (dev@mina.apache.org)
38   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
39   */
40  public class IoUtil {
41      
42      private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
43  
44      /**
45       * Writes the specified {@code message} to the specified {@code sessions}.
46       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
47       * automatically duplicated using {@link IoBuffer#duplicate()}.
48       */
49      public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
50          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
51          broadcast(message, sessions.iterator(), answer);
52          return answer;
53      }
54  
55      /**
56       * Writes the specified {@code message} to the specified {@code sessions}.
57       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
58       * automatically duplicated using {@link IoBuffer#duplicate()}.
59       */
60      public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
61          List<WriteFuture> answer = new ArrayList<WriteFuture>();
62          broadcast(message, sessions.iterator(), answer);
63          return answer;
64      }
65      
66      /**
67       * Writes the specified {@code message} to the specified {@code sessions}.
68       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
69       * automatically duplicated using {@link IoBuffer#duplicate()}.
70       */
71      public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
72          List<WriteFuture> answer = new ArrayList<WriteFuture>();
73          broadcast(message, sessions, answer);
74          return answer;
75      }
76      
77      /**
78       * Writes the specified {@code message} to the specified {@code sessions}.
79       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
80       * automatically duplicated using {@link IoBuffer#duplicate()}.
81       */
82      public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
83          if (sessions == null) {
84              sessions = EMPTY_SESSIONS;
85          }
86          
87          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
88          if (message instanceof IoBuffer) {
89              for (IoSession s: sessions) {
90                  answer.add(s.write(((IoBuffer) message).duplicate()));
91              }
92          } else {
93              for (IoSession s: sessions) {
94                  answer.add(s.write(message));
95              }
96          }
97          return answer;
98      }
99      
100     private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
101         if (message instanceof IoBuffer) {
102             while (sessions.hasNext()) {
103                 IoSession s = sessions.next();
104                 answer.add(s.write(((IoBuffer) message).duplicate()));
105             }
106         } else {
107             while (sessions.hasNext()) {
108                 IoSession s = sessions.next();
109                 answer.add(s.write(message));
110             }
111         }
112     }
113     
114     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
115         for (IoFuture f: futures) {
116             f.await();
117         }
118     }
119     
120     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
121         for (IoFuture f: futures) {
122             f.awaitUninterruptibly();
123         }
124     }
125     
126     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) throws InterruptedException {
127         return await(futures, unit.toMillis(timeout));
128     }
129 
130     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
131         return await0(futures, timeoutMillis, true);
132     }
133 
134     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
135         return awaitUninterruptibly(futures, unit.toMillis(timeout));
136     }
137 
138     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
139         try {
140             return await0(futures, timeoutMillis, false);
141         } catch (InterruptedException e) {
142             throw new InternalError();
143         }
144     }
145 
146     private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable) throws InterruptedException {
147         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
148         long waitTime = timeoutMillis;
149         
150         boolean lastComplete = true;
151         Iterator<? extends IoFuture> i = futures.iterator();
152         while (i.hasNext()) {
153             IoFuture f = i.next();
154             do {
155                 if (interruptable) {
156                     lastComplete = f.await(waitTime);
157                 } else {
158                     lastComplete = f.awaitUninterruptibly(waitTime);
159                 }
160                 
161                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
162 
163                 if (lastComplete || waitTime <= 0) {
164                     break;
165                 }
166             } while (!lastComplete);
167             
168             if (waitTime <= 0) {
169                 break;
170             }
171         }
172         
173         return lastComplete && !i.hasNext();
174     }
175 
176     private IoUtil() {
177     }
178 }