/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Kafka.Client { using System; using System.IO; using System.Net.Sockets; using System.Threading; using Kafka.Client.Producers.Async; using Kafka.Client.Requests; using Kafka.Client.Serialization; using Kafka.Client.Utils; /// /// Manages connections to the Kafka. /// public class KafkaConnection : IDisposable { private readonly int bufferSize; private readonly int socketTimeout; private readonly TcpClient client; private volatile bool disposed; /// /// Initializes a new instance of the KafkaConnection class. /// /// The server to connect to. /// The port to connect to. public KafkaConnection(string server, int port, int bufferSize, int socketTimeout) { this.bufferSize = bufferSize; this.socketTimeout = socketTimeout; // connection opened this.client = new TcpClient(server, port) { ReceiveTimeout = socketTimeout, SendTimeout = socketTimeout, ReceiveBufferSize = bufferSize, SendBufferSize = bufferSize }; var stream = this.client.GetStream(); this.Reader = new KafkaBinaryReader(stream); } public KafkaBinaryReader Reader { get; private set; } /// /// Writes a producer request to the server asynchronously. /// /// The request to make. public void BeginWrite(ProducerRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); NetworkStream stream = client.GetStream(); byte[] data = request.RequestBuffer.GetBuffer(); stream.BeginWrite(data, 0, data.Length, asyncResult => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream); } /// /// Writes a producer request to the server asynchronously. /// /// The request to make. /// The code to execute once the message is completely sent. /// /// Do not dispose connection till callback is invoked, /// otherwise underlying network stream will be closed. /// public void BeginWrite(ProducerRequest request, MessageSent callback) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); if (callback == null) { this.BeginWrite(request); return; } NetworkStream stream = client.GetStream(); var ctx = new RequestContext(stream, request); byte[] data = request.RequestBuffer.GetBuffer(); stream.BeginWrite( data, 0, data.Length, delegate(IAsyncResult asyncResult) { var context = (RequestContext)asyncResult.AsyncState; callback(context); context.NetworkStream.EndWrite(asyncResult); }, ctx); } /// /// Writes a producer request to the server. /// /// /// Write timeout is defaulted to infitite. /// /// The to send to the server. public void Write(ProducerRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); this.Write(request.RequestBuffer.GetBuffer()); } /// /// Writes a multi-producer request to the server. /// /// /// Write timeout is defaulted to infitite. /// /// The to send to the server. public void Write(MultiProducerRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); this.Write(request.RequestBuffer.GetBuffer()); } /// /// Writes data to the server. /// /// The data to write to the server. private void Write(byte[] data) { NetworkStream stream = this.client.GetStream(); //// Send the message to the connected TcpServer. stream.Write(data, 0, data.Length); } /// /// Writes a fetch request to the server. /// /// /// Write timeout is defaulted to infitite. /// /// The to send to the server. public void Write(FetchRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); this.Write(request.RequestBuffer.GetBuffer()); } /// /// Writes a multifetch request to the server. /// /// /// Write timeout is defaulted to infitite. /// /// The to send to the server. public void Write(MultiFetchRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); this.Write(request.RequestBuffer.GetBuffer()); } /// /// Writes a offset request to the server. /// /// /// Write timeout is defaulted to infitite. /// /// The to send to the server. public void Write(OffsetRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); this.Write(request.RequestBuffer.GetBuffer()); } /// /// Close the connection to the server. /// public void Dispose() { if (this.disposed) { return; } this.disposed = true; if (this.client != null) { this.client.Close(); } } /// /// Ensures that object was not disposed /// private void EnsuresNotDisposed() { if (this.disposed) { throw new ObjectDisposedException(this.GetType().Name); } } } }