001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.service;
021
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.mina.core.filterchain.IoFilterChain;
032import org.apache.mina.core.future.IoFuture;
033import org.apache.mina.core.future.IoFutureListener;
034import org.apache.mina.core.session.IoSession;
035import org.apache.mina.util.ExceptionMonitor;
036
037/**
038 * A helper class which provides addition and removal of {@link IoServiceListener}s and firing
039 * events.
040 *
041 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
042 */
043public class IoServiceListenerSupport {
044    /** The {@link IoService} that this instance manages. */
045    private final IoService service;
046
047    /** A list of {@link IoServiceListener}s. */
048    private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
049
050    /** Tracks managed sessions. */
051    private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();
052
053    /**  Read only version of {@link #managedSessions}. */
054    private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
055
056    private final AtomicBoolean activated = new AtomicBoolean();
057
058    /** Time this listenerSupport has been activated */
059    private volatile long activationTime;
060
061    /** A counter used to store the maximum sessions we managed since the listenerSupport has been activated */
062    private volatile int largestManagedSessionCount = 0;
063
064    /** A global counter to count the number of sessions managed since the start */
065    private AtomicLong cumulativeManagedSessionCount = new AtomicLong(0);
066
067    /**
068     * Creates a new instance of the listenerSupport.
069     * 
070     * @param service The associated IoService
071     */
072    public IoServiceListenerSupport(IoService service) {
073        if (service == null) {
074            throw new IllegalArgumentException("service");
075        }
076
077        this.service = service;
078    }
079
080    /**
081     * Adds a new listener.
082     * 
083     * @param listener The added listener
084     */
085    public void add(IoServiceListener listener) {
086        if (listener != null) {
087            listeners.add(listener);
088        }
089    }
090
091    /**
092     * Removes an existing listener.
093     * 
094     * @param listener The listener to remove
095     */
096    public void remove(IoServiceListener listener) {
097        if (listener != null) {
098            listeners.remove(listener);
099        }
100    }
101
102    /**
103     * @return The time (in ms) this instance has been activated
104     */
105    public long getActivationTime() {
106        return activationTime;
107    }
108
109    public Map<Long, IoSession> getManagedSessions() {
110        return readOnlyManagedSessions;
111    }
112
113    public int getManagedSessionCount() {
114        return managedSessions.size();
115    }
116
117    /**
118     * @return The largest number of managed session since the creation of this
119     * listenerSupport
120     */
121    public int getLargestManagedSessionCount() {
122        return largestManagedSessionCount;
123    }
124
125    /**
126     * @return The total number of sessions managed since the initilization of this
127     * ListenerSupport
128     */
129    public long getCumulativeManagedSessionCount() {
130        return cumulativeManagedSessionCount.get();
131    }
132
133    /**
134     * @return true if the instance is active
135     */
136    public boolean isActive() {
137        return activated.get();
138    }
139
140    /**
141     * Calls {@link IoServiceListener#serviceActivated(IoService)}
142     * for all registered listeners.
143     */
144    public void fireServiceActivated() {
145        if (!activated.compareAndSet(false, true)) {
146            // The instance is already active
147            return;
148        }
149
150        activationTime = System.currentTimeMillis();
151
152        // Activate all the listeners now
153        for (IoServiceListener listener : listeners) {
154            try {
155                listener.serviceActivated(service);
156            } catch (Exception e) {
157                ExceptionMonitor.getInstance().exceptionCaught(e);
158            }
159        }
160    }
161
162    /**
163     * Calls {@link IoServiceListener#serviceDeactivated(IoService)}
164     * for all registered listeners.
165     */
166    public void fireServiceDeactivated() {
167        if (!activated.compareAndSet(true, false)) {
168            // The instance is already desactivated
169            return;
170        }
171
172        // Desactivate all the listeners
173        try {
174            for (IoServiceListener listener : listeners) {
175                try {
176                    listener.serviceDeactivated(service);
177                } catch (Exception e) {
178                    ExceptionMonitor.getInstance().exceptionCaught(e);
179                }
180            }
181        } finally {
182            disconnectSessions();
183        }
184    }
185
186    /**
187     * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
188     * 
189     * @param session The session which has been created
190     */
191    public void fireSessionCreated(IoSession session) {
192        boolean firstSession = false;
193
194        if (session.getService() instanceof IoConnector) {
195            synchronized (managedSessions) {
196                firstSession = managedSessions.isEmpty();
197            }
198        }
199
200        // If already registered, ignore.
201        if (managedSessions.putIfAbsent(session.getId(), session) != null) {
202            return;
203        }
204
205        // If the first connector session, fire a virtual service activation event.
206        if (firstSession) {
207            fireServiceActivated();
208        }
209
210        // Fire session events.
211        IoFilterChain filterChain = session.getFilterChain();
212        filterChain.fireSessionCreated();
213        filterChain.fireSessionOpened();
214
215        int managedSessionCount = managedSessions.size();
216
217        if (managedSessionCount > largestManagedSessionCount) {
218            largestManagedSessionCount = managedSessionCount;
219        }
220
221        cumulativeManagedSessionCount.incrementAndGet();
222
223        // Fire listener events.
224        for (IoServiceListener l : listeners) {
225            try {
226                l.sessionCreated(session);
227            } catch (Exception e) {
228                ExceptionMonitor.getInstance().exceptionCaught(e);
229            }
230        }
231    }
232
233    /**
234     * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
235     * 
236     * @param session The session which has been destroyed
237     */
238    public void fireSessionDestroyed(IoSession session) {
239        // Try to remove the remaining empty session set after removal.
240        if (managedSessions.remove(session.getId()) == null) {
241            return;
242        }
243
244        // Fire session events.
245        session.getFilterChain().fireSessionClosed();
246
247        // Fire listener events.
248        try {
249            for (IoServiceListener l : listeners) {
250                try {
251                    l.sessionDestroyed(session);
252                } catch (Exception e) {
253                    ExceptionMonitor.getInstance().exceptionCaught(e);
254                }
255            }
256        } finally {
257            // Fire a virtual service deactivation event for the last session of the connector.
258            if (session.getService() instanceof IoConnector) {
259                boolean lastSession = false;
260
261                synchronized (managedSessions) {
262                    lastSession = managedSessions.isEmpty();
263                }
264
265                if (lastSession) {
266                    fireServiceDeactivated();
267                }
268            }
269        }
270    }
271
272    /**
273     * Close all the sessions
274     * TODO disconnectSessions.
275     *
276     */
277    private void disconnectSessions() {
278        if (!(service instanceof IoAcceptor)) {
279            // We don't disconnect sessions for anything but an Acceptor
280            return;
281        }
282
283        if (!((IoAcceptor) service).isCloseOnDeactivation()) {
284            return;
285        }
286
287        Object lock = new Object();
288        IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
289
290        for (IoSession s : managedSessions.values()) {
291            s.close(true).addListener(listener);
292        }
293
294        try {
295            synchronized (lock) {
296                while (!managedSessions.isEmpty()) {
297                    lock.wait(500);
298                }
299            }
300        } catch (InterruptedException ie) {
301            // Ignored
302        }
303    }
304
305    /**
306     * A listener in charge of releasing the lock when the close has been completed
307     */
308    private static class LockNotifyingListener implements IoFutureListener<IoFuture> {
309        private final Object lock;
310
311        public LockNotifyingListener(Object lock) {
312            this.lock = lock;
313        }
314
315        public void operationComplete(IoFuture future) {
316            synchronized (lock) {
317                lock.notifyAll();
318            }
319        }
320    }
321}