1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * 19 */ 20 package org.apache.mina.core; 21 22 import java.util.ArrayList; 23 import java.util.Collection; 24 import java.util.Iterator; 25 import java.util.List; 26 import java.util.concurrent.TimeUnit; 27 28 import org.apache.mina.core.buffer.IoBuffer; 29 import org.apache.mina.core.future.IoFuture; 30 import org.apache.mina.core.future.WriteFuture; 31 import org.apache.mina.core.session.IoSession; 32 33 /** 34 * A utility class that provides various convenience methods related with 35 * {@link IoSession} and {@link IoFuture}. 36 * 37 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 38 */ 39 public final class IoUtil { 40 private static final IoSessionml#IoSession">IoSession[] EMPTY_SESSIONS = new IoSession[0]; 41 42 private IoUtil() { 43 // Do nothing 44 } 45 46 /** 47 * Writes the specified {@code message} to the specified {@code sessions}. 48 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 49 * automatically duplicated using {@link IoBuffer#duplicate()}. 50 * 51 * @param message The message to broadcast 52 * @param sessions The sessions that will receive the message 53 * @return The list of WriteFuture created for each broadcasted message 54 */ 55 public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) { 56 List<WriteFuture> answer = new ArrayList<>(sessions.size()); 57 broadcast(message, sessions.iterator(), answer); 58 return answer; 59 } 60 61 /** 62 * Writes the specified {@code message} to the specified {@code sessions}. 63 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 64 * automatically duplicated using {@link IoBuffer#duplicate()}. 65 * 66 * @param message The message to broadcast 67 * @param sessions The sessions that will receive the message 68 * @return The list of WriteFuture created for each broadcasted message 69 */ 70 public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) { 71 List<WriteFuture> answer = new ArrayList<>(); 72 broadcast(message, sessions.iterator(), answer); 73 return answer; 74 } 75 76 /** 77 * Writes the specified {@code message} to the specified {@code sessions}. 78 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 79 * automatically duplicated using {@link IoBuffer#duplicate()}. 80 * 81 * @param message The message to write 82 * @param sessions The sessions the message has to be written to 83 * @return The list of {@link WriteFuture} for the written messages 84 */ 85 public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) { 86 List<WriteFuture> answer = new ArrayList<>(); 87 broadcast(message, sessions, answer); 88 return answer; 89 } 90 91 /** 92 * Writes the specified {@code message} to the specified {@code sessions}. 93 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 94 * automatically duplicated using {@link IoBuffer#duplicate()}. 95 * 96 * @param message The message to write 97 * @param sessions The sessions the message has to be written to 98 * @return The list of {@link WriteFuture} for the written messages 99 */ 100 public static List<WriteFuture> broadcast(Object message, IoSession... sessions) { 101 if (sessions == null) { 102 sessions = EMPTY_SESSIONS; 103 } 104 105 List<WriteFuture> answer = new ArrayList<>(sessions.length); 106 if (message instanceof IoBuffer) { 107 for (IoSession s : sessions) { 108 answer.add(s.write(((IoBuffer) message).duplicate())); 109 } 110 } else { 111 for (IoSession s : sessions) { 112 answer.add(s.write(message)); 113 } 114 } 115 return answer; 116 } 117 118 private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) { 119 if (message instanceof IoBuffer) { 120 while (sessions.hasNext()) { 121 IoSession s = sessions.next(); 122 answer.add(s.write(((IoBuffer) message).duplicate())); 123 } 124 } else { 125 while (sessions.hasNext()) { 126 IoSession s = sessions.next(); 127 answer.add(s.write(message)); 128 } 129 } 130 } 131 132 /** 133 * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted 134 * 135 * @param futures The {@link IoFuture}s we are waiting on 136 * @throws InterruptedException If one of the {@link IoFuture} is interrupted 137 */ 138 public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException { 139 for (IoFuture f : futures) { 140 f.await(); 141 } 142 } 143 144 /** 145 * Wait on all the {@link IoFuture}s we get. This can't get interrupted. 146 * 147 * @param futures The {@link IoFuture}s we are waiting on 148 */ 149 public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) { 150 for (IoFuture f : futures) { 151 f.awaitUninterruptibly(); 152 } 153 } 154 155 /** 156 * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted 157 * 158 * @param futures The {@link IoFuture}s we are waiting on 159 * @param timeout The maximum time we wait for the {@link IoFuture}s to complete 160 * @param unit The Time unit to use for the timeout 161 * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if 162 * at least one {@link IoFuture} haas been interrupted 163 * @throws InterruptedException If one of the {@link IoFuture} is interrupted 164 */ 165 public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) 166 throws InterruptedException { 167 return await(futures, unit.toMillis(timeout)); 168 } 169 170 /** 171 * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted 172 * 173 * @param futures The {@link IoFuture}s we are waiting on 174 * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete 175 * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if 176 * at least one {@link IoFuture} has been interrupted 177 * @throws InterruptedException If one of the {@link IoFuture} is interrupted 178 */ 179 public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException { 180 return await0(futures, timeoutMillis, true); 181 } 182 183 /** 184 * Wait on all the {@link IoFuture}s we get. 185 * 186 * @param futures The {@link IoFuture}s we are waiting on 187 * @param timeout The maximum time we wait for the {@link IoFuture}s to complete 188 * @param unit The Time unit to use for the timeout 189 * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if 190 * at least one {@link IoFuture} has been interrupted 191 */ 192 public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) { 193 return awaitUninterruptibly(futures, unit.toMillis(timeout)); 194 } 195 196 /** 197 * Wait on all the {@link IoFuture}s we get. 198 * 199 * @param futures The {@link IoFuture}s we are waiting on 200 * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete 201 * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if 202 * at least one {@link IoFuture} has been interrupted 203 */ 204 public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) { 205 try { 206 return await0(futures, timeoutMillis, false); 207 } catch (InterruptedException e) { 208 throw new InternalError(); 209 } 210 } 211 212 private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable) 213 throws InterruptedException { 214 long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis(); 215 long waitTime = timeoutMillis; 216 217 boolean lastComplete = true; 218 Iterator<? extends IoFuture> i = futures.iterator(); 219 220 while (i.hasNext()) { 221 IoFuture f = i.next(); 222 223 do { 224 if (interruptable) { 225 lastComplete = f.await(waitTime); 226 } else { 227 lastComplete = f.awaitUninterruptibly(waitTime); 228 } 229 230 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime); 231 232 if (waitTime <= 0) { 233 break; 234 } 235 } while (!lastComplete); 236 237 if (waitTime <= 0) { 238 break; 239 } 240 } 241 242 return lastComplete && !i.hasNext(); 243 } 244 }