/*
 * (c) Copyright 2010 Talis Information Ltd.
 * All rights reserved.
 * [See end of file]
 */

/* Project Voldemort
 * Copyright 2008-2009 LinkedIn, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package riot.comms.server.socket;

import java.io.IOException ;
import java.net.BindException ;
import java.net.InetSocketAddress ;
import java.net.ServerSocket ;
import java.net.Socket ;
import java.net.SocketException ;
import java.util.concurrent.Executor ;
import java.util.concurrent.RejectedExecutionException ;
import java.util.concurrent.RejectedExecutionHandler ;
import java.util.concurrent.SynchronousQueue ;
import java.util.concurrent.ThreadFactory ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicLong ;

import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import riot.comms.CommsException ;
import riot.comms.server.Server ;
import riot.comms.server.Service ;
import riot.comms.server.ServiceState ;
import riot.comms.server.ServiceType ;

/**
 * Classical socket-based, non NIO, server.
 *
 * <p>A single acceptor thread (started by {@link #start()}) binds a
 * {@link ServerSocket} to the configured port, accepts connections and hands
 * each one to a bounded thread pool as a {@code ServerChannel}. Worker threads
 * are daemon threads created in a dedicated {@link ThreadGroup}.
 *
 * <p>{@link #stop()} interrupts the workers and the acceptor and closes the
 * listening socket so that the blocking {@code accept()} call returns.
 */
public class SocketServer extends Server implements Service, Runnable
{
    // Acknowledgments:
    //   Understanding how this should be done is taken from looking at
    //   Voldemort (Apache license) and Jetty (Eclipse license), then
    //   writing what Cohort needs.
    // This code borrows from Voldemort (Apache license/copyright LinkedIn).
    // Many thanks to those open source projects. Saved me a lot of
    // time understanding how this should all work.
    // There seem to be both NIO and sockets-based implementations around
    // (guess: the tradeoffs aren't clear cut).

    /** Source of per-instance ids, shared by all servers in this JVM. */
    private static final AtomicLong counter = new AtomicLong(0) ;

    /** Source of per-connection channel ids for this server. */
    private final AtomicLong counterChannels = new AtomicLong(0) ;
    private final Logger log ;
    private int port ;

    // Externalize parameters?
    private final int corePoolSize = 5 ;            // Later, more
    private final int maxPoolSize = 10 ;
    private final int sessionsPerConnection = 10 ;

    private final Executor executor ;
    private final long id ;
    private final ServiceType serviceType ;
    // Thread group for all request handlers (not this server thread).
    private final ThreadGroup threadGroup ;
    private final String label ;
    private final ServerRequestHandler requestHandler ;

    // Written by start()/run() and read by stop(), possibly on different
    // threads, hence volatile.
    private volatile Thread thread ;
    private volatile ServerSocket serverSocket ;

    /**
     * Create a server; nothing is bound or started until {@link #start()}.
     *
     * @param port           TCP port the acceptor thread will bind to
     * @param label          name used in log messages and worker thread names
     * @param serviceType    service type, passed to the {@code Server} superclass
     * @param requestHandler handler given to every {@code ServerChannel}
     */
    public SocketServer(int port, String label, ServiceType serviceType, ServerRequestHandler requestHandler)
    {
        super(serviceType) ;
        this.label = label ;
        this.serviceType = serviceType ;
        this.requestHandler = requestHandler ;
        this.id = counter.incrementAndGet() ;
        String logName = String.format("%s:%s[%d]", this.getClass().getName(), label, id) ;
        this.log = LoggerFactory.getLogger(logName) ;
        this.port = port ;

        // Assign the thread group before building the factory that reads it,
        // so the factory never depends on lazy thread creation to see a
        // non-null group.
        this.threadGroup = new ThreadGroup(logName) ;

        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicLong threadId = new AtomicLong(0) ;

            @Override
            public Thread newThread(Runnable runnable)
            {
                String name = SocketServer.this.label + "-" + threadId.getAndIncrement() ;
                log.info("Thread: "+name) ;
                Thread t = new Thread(threadGroup, runnable, name) ;
                t.setDaemon(true) ;
                return t ;
            }
        } ;

        // Signal rejection to the submitter (the accept loop) so that it can
        // close the connection's socket instead of leaking it.
        RejectedExecutionHandler rejHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
                throw new RejectedExecutionException("No worker thread available for " + r) ;
            }
        } ;

        // Executor for incoming accept connections.
        // This is the most general ThreadPoolExecutor constructor.
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                                               1, TimeUnit.SECONDS,
                                               new SynchronousQueue<Runnable>(),  // Zero buffered queue ???? Easier for now.
                                               threadFactory,
                                               rejHandler) ;
    }

    /** @return whether the calling thread has its interrupt flag set. */
    private boolean isInterrupted()
    {
        return Thread.currentThread().isInterrupted() ;
    }

    /**
     * Acceptor loop: bind the server socket, then accept connections and
     * dispatch each one to the worker pool until the socket is closed or this
     * thread is interrupted.
     *
     * @throws CommsException if the port cannot be bound or another
     *         {@link IOException} / unexpected checked throwable occurs
     */
    @Override
    public void run()
    {
        log.info("Starting "+serviceType.getLabel()+" server [" + label + "] on port " + port) ;
        try {
            ServerSocket listener = new ServerSocket() ;
            serverSocket = listener ;
            listener.bind(new InetSocketAddress(port)) ;

            // accept() is not woken by Thread.interrupt(); stop() closes the
            // socket to break out of it. The interrupt check covers a stop()
            // that arrives between two accept() calls.
            while ( !listener.isClosed() && !isInterrupted() ) {
                final Socket socket = listener.accept() ;
                log.info("Accept: "+socket.getRemoteSocketAddress().toString()) ;
                dispatch(socket) ;
            }
        } catch(BindException e) {
            log.error("Could not bind to port " + port + ".") ;
            throw new CommsException(e) ;
        } catch(SocketException e) {
            // If we have been manually shutdown, ignore
            if ( !isInterrupted() )
                log.error("Error in server: ", e) ;
        } catch(IOException e) {
            throw new CommsException(e) ;
        } catch(Throwable t) {
            log.error("Throwable", t) ;
            if ( t instanceof Error )
                throw (Error)t ;
            else if ( t instanceof RuntimeException )
                throw (RuntimeException)t ;
            throw new CommsException(t) ;
        } finally {
            closeServerSocket() ;
        }
    }

    /**
     * Configure one accepted connection and submit it to the worker pool.
     * A failure here affects only this connection: the socket is closed and
     * the accept loop carries on.
     */
    private void dispatch(Socket socket)
    {
        try {
            setupSocket(socket) ;
        } catch(SocketException e) {
            log.warn("Error while setting up accepted socket", e) ;
            closeQuietly(socket) ;
            return ;
        }

        long channelId = counterChannels.getAndIncrement() ;
        try {
            executor.execute(new ServerChannel(label, socket, requestHandler, channelId)) ;
        } catch(RejectedExecutionException e) {
            // Pool saturated: nobody will ever service (or close) this socket.
            log.warn("Reject execution") ;
            closeQuietly(socket) ;
        }
    }

    /** Apply per-connection socket options (currently just TCP_NODELAY). */
    private void setupSocket(Socket socket) throws SocketException
    {
        socket.setTcpNoDelay(true) ;
    }

    /** Close a client socket, logging rather than propagating any failure. */
    private void closeQuietly(Socket socket)
    {
        try {
            socket.close() ;
        } catch(IOException e) {
            log.warn("Error while closing socket", e) ;
        }
    }

    /** Close the listening socket if one was created; safe to call repeatedly. */
    private void closeServerSocket()
    {
        ServerSocket listener = serverSocket ;
        if ( listener == null )
            return ;
        try {
            listener.close() ;
        } catch(IOException e) {
            log.warn("Error while closing server socket", e) ;
        }
    }

    /**
     * Mark the service active and start the acceptor thread.
     * An out-of-sequence call is logged as an error but still proceeds.
     */
    @Override
    public void start()
    {
        if ( getState() != ServiceState.CREATED )
            log.error("Out of sequence call to 'start'") ;
        setState(ServiceState.ACTIVE) ;
        Thread acceptor = new Thread(this) ;
        this.thread = acceptor ;
        acceptor.start() ;
    }

    /**
     * Stop the server: interrupt the workers and the acceptor thread, close
     * the listening socket and mark the service finished. Calling this on a
     * server that was never started only logs an error.
     */
    @Override
    public void stop()
    {
        if ( getState() == ServiceState.CREATED ) {
            log.error("Not started") ;
            return ;
        }
        threadGroup.interrupt() ;

        // Interrupt first so the acceptor sees the flag and treats the
        // SocketException raised by the close below as a normal shutdown.
        Thread acceptor = thread ;
        if ( acceptor != null )
            acceptor.interrupt() ;
        closeServerSocket() ;

        setState(ServiceState.FINISHED) ;
    }
}

/*
 * (c) Copyright 2010 Talis Information Ltd.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */