001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.mina.core.buffer.IoBuffer; 029import org.apache.mina.core.future.IoFuture; 030import org.apache.mina.core.future.WriteFuture; 031import org.apache.mina.core.session.IoSession; 032 033/** 034 * A utility class that provides various convenience methods related with 035 * {@link IoSession} and {@link IoFuture}. 036 * 037 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 038 */ 039public final class IoUtil { 040 private static final IoSession[] EMPTY_SESSIONS = new IoSession[0]; 041 042 /** 043 * Writes the specified {@code message} to the specified {@code sessions}. 044 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 045 * automatically duplicated using {@link IoBuffer#duplicate()}. 046 * 047 * @param message The message to broadcast 048 * @param sessions The sessions that will receive the message 049 * @return The list of WriteFuture created for each broadcasted message 050 */ 051 public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) { 052 List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size()); 053 broadcast(message, sessions.iterator(), answer); 054 return answer; 055 } 056 057 /** 058 * Writes the specified {@code message} to the specified {@code sessions}. 059 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 060 * automatically duplicated using {@link IoBuffer#duplicate()}. 061 * 062 * @param message The message to broadcast 063 * @param sessions The sessions that will receive the message 064 * @return The list of WriteFuture created for each broadcasted message 065 */ 066 public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) { 067 List<WriteFuture> answer = new ArrayList<WriteFuture>(); 068 broadcast(message, sessions.iterator(), answer); 069 return answer; 070 } 071 072 /** 073 * Writes the specified {@code message} to the specified {@code sessions}. 074 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 075 * automatically duplicated using {@link IoBuffer#duplicate()}. 076 * 077 * @param message The message to write 078 * @param sessions The sessions the message has to be written to 079 * @return The list of {@link WriteFuture} for the written messages 080 */ 081 public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) { 082 List<WriteFuture> answer = new ArrayList<WriteFuture>(); 083 broadcast(message, sessions, answer); 084 return answer; 085 } 086 087 /** 088 * Writes the specified {@code message} to the specified {@code sessions}. 089 * If the specified {@code message} is an {@link IoBuffer}, the buffer is 090 * automatically duplicated using {@link IoBuffer#duplicate()}. 091 * 092 * @param message The message to write 093 * @param sessions The sessions the message has to be written to 094 * @return The list of {@link WriteFuture} for the written messages 095 */ 096 public static List<WriteFuture> broadcast(Object message, IoSession... sessions) { 097 if (sessions == null) { 098 sessions = EMPTY_SESSIONS; 099 } 100 101 List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length); 102 if (message instanceof IoBuffer) { 103 for (IoSession s : sessions) { 104 answer.add(s.write(((IoBuffer) message).duplicate())); 105 } 106 } else { 107 for (IoSession s : sessions) { 108 answer.add(s.write(message)); 109 } 110 } 111 return answer; 112 } 113 114 private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) { 115 if (message instanceof IoBuffer) { 116 while (sessions.hasNext()) { 117 IoSession s = sessions.next(); 118 answer.add(s.write(((IoBuffer) message).duplicate())); 119 } 120 } else { 121 while (sessions.hasNext()) { 122 IoSession s = sessions.next(); 123 answer.add(s.write(message)); 124 } 125 } 126 } 127 128 public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException { 129 for (IoFuture f : futures) { 130 f.await(); 131 } 132 } 133 134 public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) { 135 for (IoFuture f : futures) { 136 f.awaitUninterruptibly(); 137 } 138 } 139 140 public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) 141 throws InterruptedException { 142 return await(futures, unit.toMillis(timeout)); 143 } 144 145 public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException { 146 return await0(futures, timeoutMillis, true); 147 } 148 149 public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) { 150 return awaitUninterruptibly(futures, unit.toMillis(timeout)); 151 } 152 153 public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) { 154 try { 155 return await0(futures, timeoutMillis, false); 156 } catch (InterruptedException e) { 157 throw new InternalError(); 158 } 159 } 160 161 private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable) 162 throws InterruptedException { 163 long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis(); 164 long waitTime = timeoutMillis; 165 166 boolean lastComplete = true; 167 Iterator<? extends IoFuture> i = futures.iterator(); 168 while (i.hasNext()) { 169 IoFuture f = i.next(); 170 do { 171 if (interruptable) { 172 lastComplete = f.await(waitTime); 173 } else { 174 lastComplete = f.awaitUninterruptibly(waitTime); 175 } 176 177 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime); 178 179 if (lastComplete || waitTime <= 0) { 180 break; 181 } 182 } while (!lastComplete); 183 184 if (waitTime <= 0) { 185 break; 186 } 187 } 188 189 return lastComplete && !i.hasNext(); 190 } 191 192 private IoUtil() { 193 // Do nothing 194 } 195}