/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; namespace Avro.ipc { public class SocketServer { public static ManualResetEvent allDone = new ManualResetEvent(false); private readonly string hostName; private readonly int port; private Responder responder; private bool cancellationRequested; private Socket channel; private List sockets = new List(); private Thread serverThread; public SocketServer(string hostName, int port, Responder responder = null) { if (hostName == null) throw new ArgumentNullException("hostName"); if (port < 0) throw new ArgumentOutOfRangeException("port"); this.responder = responder; this.hostName = hostName; this.port = port; } public bool IsBound { get { return channel.IsBound; } } public int Port { get { return ((IPEndPoint) channel.LocalEndPoint).Port; } } public void SetResponder(Responder responder) { this.responder = responder; } public void Start() { channel = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); serverThread = new Thread(Run); serverThread.Start(); while (!IsBound) { Thread.Sleep(10); } } public void Stop() { cancellationRequested = true; while (serverThread.IsAlive) { Thread.Sleep(10); } } private void Run() { IPHostEntry host = Dns.GetHostEntry(hostName); IPAddress ipAddress = host.AddressList.FirstOrDefault(x => x.AddressFamily == AddressFamily.InterNetwork); if (ipAddress == null) throw new InvalidDataException( string.Format("There is not IP Address with the hostname {0} and AddressFamily InterNetwork", hostName)); var localEndPoint = new IPEndPoint(ipAddress, port); channel.Bind(localEndPoint); channel.Listen(100); var results = new List(); while (true) { // Set the event to nonsignaled state. allDone.Reset(); // Start an asynchronous socket to listen for connections. IAsyncResult t = channel.BeginAccept(AcceptCallback, channel); results.Add(t); // Wait until a connection is made before continuing. while (!allDone.WaitOne(1000)) { if (cancellationRequested) { try { channel.Close(); } catch { } try { CloseSockets(); } catch { } return; } } } } private void CloseSockets() { lock (this) { try { foreach (var socket in sockets) { var myOpts = new LingerOption(true, 1); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, myOpts); socket.SendTimeout = 1; socket.ReceiveTimeout = 1; socket.Shutdown(SocketShutdown.Both); socket.Disconnect(false); } sockets = new List(); } catch (Exception) { } } } public void AddSocket(Socket socket) { lock (this) { sockets.Add(socket); } } public void RemoveSocket(Socket socket) { lock (this) { sockets.Remove(socket); } } private void AcceptCallback(IAsyncResult ar) { // Signal the main thread to continue. allDone.Set(); // Get the socket that handles the client request. var listener = (Socket) ar.AsyncState; if (cancellationRequested) { return; } Socket socket = listener.EndAccept(ar); AddSocket(socket); // Create the state object. var xc = new SocketTransceiver(socket); while (true) { try { IList request = xc.ReadBuffers(); IList response = responder.Respond(request, xc); xc.WriteBuffers(response); } catch (ObjectDisposedException) { break; } catch (SocketException) { break; } catch (AvroRuntimeException) { break; } catch (Exception) { break; } } try { xc.Disconnect(); } catch (Exception) { } RemoveSocket(socket); } } }