001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.mina.core.buffer.IoBuffer;
029import org.apache.mina.core.future.IoFuture;
030import org.apache.mina.core.future.WriteFuture;
031import org.apache.mina.core.session.IoSession;
032
033/**
034 * A utility class that provides various convenience methods related with
035 * {@link IoSession} and {@link IoFuture}.
036 * 
037 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
038 */
039public final class IoUtil {
040    private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
041
042    /**
043     * Writes the specified {@code message} to the specified {@code sessions}.
044     * If the specified {@code message} is an {@link IoBuffer}, the buffer is
045     * automatically duplicated using {@link IoBuffer#duplicate()}.
046     * 
047     * @param message The message to broadcast
048     * @param sessions The sessions that will receive the message
049     * @return The list of WriteFuture created for each broadcasted message
050     */
051    public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
052        List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
053        broadcast(message, sessions.iterator(), answer);
054        return answer;
055    }
056
057    /**
058     * Writes the specified {@code message} to the specified {@code sessions}.
059     * If the specified {@code message} is an {@link IoBuffer}, the buffer is
060     * automatically duplicated using {@link IoBuffer#duplicate()}.
061     * 
062     * @param message The message to broadcast
063     * @param sessions The sessions that will receive the message
064     * @return The list of WriteFuture created for each broadcasted message
065     */
066    public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
067        List<WriteFuture> answer = new ArrayList<WriteFuture>();
068        broadcast(message, sessions.iterator(), answer);
069        return answer;
070    }
071
072    /**
073     * Writes the specified {@code message} to the specified {@code sessions}.
074     * If the specified {@code message} is an {@link IoBuffer}, the buffer is
075     * automatically duplicated using {@link IoBuffer#duplicate()}.
076     * 
077     * @param message The message to write
078     * @param sessions The sessions the message has to be written to
079     * @return The list of {@link WriteFuture} for the written messages
080     */
081    public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
082        List<WriteFuture> answer = new ArrayList<WriteFuture>();
083        broadcast(message, sessions, answer);
084        return answer;
085    }
086
087    /**
088     * Writes the specified {@code message} to the specified {@code sessions}.
089     * If the specified {@code message} is an {@link IoBuffer}, the buffer is
090     * automatically duplicated using {@link IoBuffer#duplicate()}.
091     * 
092     * @param message The message to write
093     * @param sessions The sessions the message has to be written to
094     * @return The list of {@link WriteFuture} for the written messages
095     */
096    public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
097        if (sessions == null) {
098            sessions = EMPTY_SESSIONS;
099        }
100
101        List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
102        if (message instanceof IoBuffer) {
103            for (IoSession s : sessions) {
104                answer.add(s.write(((IoBuffer) message).duplicate()));
105            }
106        } else {
107            for (IoSession s : sessions) {
108                answer.add(s.write(message));
109            }
110        }
111        return answer;
112    }
113
114    private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
115        if (message instanceof IoBuffer) {
116            while (sessions.hasNext()) {
117                IoSession s = sessions.next();
118                answer.add(s.write(((IoBuffer) message).duplicate()));
119            }
120        } else {
121            while (sessions.hasNext()) {
122                IoSession s = sessions.next();
123                answer.add(s.write(message));
124            }
125        }
126    }
127
128    public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
129        for (IoFuture f : futures) {
130            f.await();
131        }
132    }
133
134    public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
135        for (IoFuture f : futures) {
136            f.awaitUninterruptibly();
137        }
138    }
139
140    public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
141            throws InterruptedException {
142        return await(futures, unit.toMillis(timeout));
143    }
144
145    public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
146        return await0(futures, timeoutMillis, true);
147    }
148
149    public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
150        return awaitUninterruptibly(futures, unit.toMillis(timeout));
151    }
152
153    public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
154        try {
155            return await0(futures, timeoutMillis, false);
156        } catch (InterruptedException e) {
157            throw new InternalError();
158        }
159    }
160
161    private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable)
162            throws InterruptedException {
163        long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
164        long waitTime = timeoutMillis;
165
166        boolean lastComplete = true;
167        Iterator<? extends IoFuture> i = futures.iterator();
168        while (i.hasNext()) {
169            IoFuture f = i.next();
170            do {
171                if (interruptable) {
172                    lastComplete = f.await(waitTime);
173                } else {
174                    lastComplete = f.awaitUninterruptibly(waitTime);
175                }
176
177                waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
178
179                if (lastComplete || waitTime <= 0) {
180                    break;
181                }
182            } while (!lastComplete);
183
184            if (waitTime <= 0) {
185                break;
186            }
187        }
188
189        return lastComplete && !i.hasNext();
190    }
191
192    private IoUtil() {
193        // Do nothing
194    }
195}