/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Kafka.Client.Request;
using Kafka.Client.Util;

namespace Kafka.Client
{
    /// <summary>
    /// Consumes messages from Kafka.
    /// </summary>
    public class Consumer
    {
        /// <summary>
        /// Maximum size passed to the fetch request when the caller does not supply one.
        /// </summary>
        private const int MaxSize = 1048576;

        /// <summary>
        /// Number of bytes occupied by a 32-bit length/count prefix in a response.
        /// </summary>
        private const int SizePrefixLength = 4;

        /// <summary>
        /// Number of bytes occupied by a 16-bit error code in a response.
        /// </summary>
        private const int ErrorCodeLength = 2;

        /// <summary>
        /// Number of bytes occupied by a single 64-bit offset in an offsets response.
        /// </summary>
        private const int OffsetLength = 8;

        /// <summary>
        /// Initializes a new instance of the Consumer class.
        /// </summary>
        /// <param name="server">The server to connect to.</param>
        /// <param name="port">The port to connect to.</param>
        public Consumer(string server, int port)
        {
            Server = server;
            Port = port;
        }

        /// <summary>
        /// Gets the server to which the connection is to be established.
        /// </summary>
        public string Server { get; private set; }

        /// <summary>
        /// Gets the port to which the connection is to be established.
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// Consumes messages from Kafka using the default maximum size.
        /// </summary>
        /// <param name="topic">The topic to consume from.</param>
        /// <param name="partition">The partition to consume from.</param>
        /// <param name="offset">The offset to start at.</param>
        /// <returns>A list of messages from Kafka.</returns>
        public List<Message> Consume(string topic, int partition, long offset)
        {
            return Consume(topic, partition, offset, MaxSize);
        }

        /// <summary>
        /// Consumes messages from Kafka.
        /// </summary>
        /// <param name="topic">The topic to consume from.</param>
        /// <param name="partition">The partition to consume from.</param>
        /// <param name="offset">The offset to start at.</param>
        /// <param name="maxSize">The maximum size.</param>
        /// <returns>A list of messages from Kafka.</returns>
        public List<Message> Consume(string topic, int partition, long offset, int maxSize)
        {
            return Consume(new FetchRequest(topic, partition, offset, maxSize));
        }

        /// <summary>
        /// Consumes messages from Kafka.
        /// </summary>
        /// <param name="request">The request to send to Kafka.</param>
        /// <returns>
        /// A list of messages from Kafka; empty when the response reports a
        /// non-positive length.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        /// <exception cref="KafkaException">The response carries an error code other than <c>KafkaException.NoError</c>.</exception>
        public List<Message> Consume(FetchRequest request)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            List<Message> messages = new List<Message>();

            byte[] data = SendAndReceive(request.GetBytes());
            if (data == null)
            {
                return messages;
            }

            ThrowIfError(data, 0);

            // Skip the error code and process the rest.
            byte[] payload = Slice(data, ErrorCodeLength, data.Length);

            // A message needs at least its 4-byte size prefix, so stop once fewer
            // than four bytes remain.
            int lastPrefixStart = payload.Length - SizePrefixLength;
            int processed = 0;
            while (processed <= lastPrefixStart)
            {
                int consumed;
                messages.Add(ReadMessage(payload, processed, out consumed));
                processed += consumed;
            }

            return messages;
        }

        /// <summary>
        /// Executes a multi-fetch operation.
        /// </summary>
        /// <param name="request">The request to push to Kafka.</param>
        /// <returns>
        /// A list containing sets of messages. The message sets should match the request order.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        /// <exception cref="KafkaException">
        /// The response, or any individual message set in it, carries an error code
        /// other than <c>KafkaException.NoError</c>.
        /// </exception>
        public List<List<Message>> Consume(MultiFetchRequest request)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            int fetchRequests = request.ConsumerRequests.Count;
            List<List<Message>> messages = new List<List<Message>>();

            byte[] data = SendAndReceive(request.GetBytes());
            if (data == null)
            {
                return messages;
            }

            ThrowIfError(data, 0);

            // Skip the response-level error code and process the rest.
            int position = ErrorCodeLength;

            for (int ix = 0; ix < fetchRequests; ix++)
            {
                List<Message> messageSet = new List<Message>();
                messages.Add(messageSet);

                int messageSetSize = ReadInt32(data, position);
                position += SizePrefixLength;

                ThrowIfError(data, position);

                // Skip the per-set error code and process the rest.
                position += ErrorCodeLength;

                byte[] messageSetBytes = Slice(data, position, messageSetSize);

                // NOTE(review): the last two bytes of the slice are deliberately not
                // parsed (the original author suspected padding). Presumably the size
                // prefix also counts the 2-byte error code consumed above, which would
                // make the slice overrun by two bytes - confirm against the wire format
                // before changing this bound.
                int processed = 0;
                while (processed < messageSetBytes.Length - ErrorCodeLength)
                {
                    int consumed;
                    messageSet.Add(ReadMessage(messageSetBytes, processed, out consumed));
                    processed += consumed;
                }

                position += processed;
            }

            return messages;
        }

        /// <summary>
        /// Get a list of valid offsets (up to maxNumOffsets) before the given time.
        /// </summary>
        /// <param name="topic">The topic to check.</param>
        /// <param name="partition">The partition on the topic.</param>
        /// <param name="time">time in millisecs (if -1, just get from the latest available)</param>
        /// <param name="maxNumOffsets">The maximum number of offsets to return.</param>
        /// <returns>List of offsets, in descending order.</returns>
        public IList<long> GetOffsetsBefore(string topic, int partition, long time, int maxNumOffsets)
        {
            return GetOffsetsBefore(new OffsetRequest(topic, partition, time, maxNumOffsets));
        }

        /// <summary>
        /// Get a list of valid offsets (up to the number requested) before the given time.
        /// </summary>
        /// <param name="request">The offset request.</param>
        /// <returns>
        /// List of offsets, in descending order; empty when the response reports a
        /// non-positive length.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        /// <exception cref="KafkaException">The response carries an error code other than <c>KafkaException.NoError</c>.</exception>
        public IList<long> GetOffsetsBefore(OffsetRequest request)
        {
            if (request == null)
            {
                throw new ArgumentNullException("request");
            }

            List<long> offsets = new List<long>();

            byte[] data = SendAndReceive(request.GetBytes());
            if (data == null)
            {
                return offsets;
            }

            ThrowIfError(data, 0);

            // After the error code, the first four bytes are the number of offsets,
            // followed by that many 8-byte offsets.
            int numOfOffsets = ReadInt32(data, ErrorCodeLength);
            int firstOffsetStart = ErrorCodeLength + SizePrefixLength;
            for (int ix = 0; ix < numOfOffsets; ix++)
            {
                offsets.Add(ReadInt64(data, firstOffsetStart + (ix * OffsetLength)));
            }

            return offsets;
        }

        /// <summary>
        /// Opens a connection to <see cref="Server"/>:<see cref="Port"/>, writes the
        /// request and reads the length-prefixed response body.
        /// </summary>
        /// <param name="requestBytes">The serialized request.</param>
        /// <returns>
        /// The response body, or null when the 4-byte length prefix is not positive.
        /// </returns>
        private byte[] SendAndReceive(byte[] requestBytes)
        {
            using (KafkaConnection connection = new KafkaConnection(Server, Port))
            {
                connection.Write(requestBytes);

                int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(connection.Read(SizePrefixLength)), 0);
                if (dataLength <= 0)
                {
                    return null;
                }

                return connection.Read(dataLength);
            }
        }

        /// <summary>
        /// Reads the 16-bit error code at <paramref name="offset"/> and throws when it
        /// is not <c>KafkaException.NoError</c>.
        /// </summary>
        /// <param name="data">The response bytes.</param>
        /// <param name="offset">Index of the first byte of the error code.</param>
        private static void ThrowIfError(byte[] data, int offset)
        {
            int errorCode = ReadInt16(data, offset);
            if (errorCode != KafkaException.NoError)
            {
                throw new KafkaException(errorCode);
            }
        }

        /// <summary>
        /// Parses one size-prefixed message starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="buffer">The bytes containing the message.</param>
        /// <param name="offset">Index of the message's 4-byte size prefix.</param>
        /// <param name="consumed">Receives the size prefix length plus the declared message size.</param>
        /// <returns>The message parsed from the prefix and the bytes that follow it.</returns>
        private static Message ReadMessage(byte[] buffer, int offset, out int consumed)
        {
            int messageSize = ReadInt32(buffer, offset);
            consumed = SizePrefixLength + messageSize;
            return Message.ParseFrom(Slice(buffer, offset, messageSize + SizePrefixLength));
        }

        /// <summary>
        /// Decodes a byte-reversed 16-bit integer starting at <paramref name="offset"/>.
        /// </summary>
        private static short ReadInt16(byte[] source, int offset)
        {
            return BitConverter.ToInt16(BitWorks.ReverseBytes(Slice(source, offset, 2)), 0);
        }

        /// <summary>
        /// Decodes a byte-reversed 32-bit integer starting at <paramref name="offset"/>.
        /// </summary>
        private static int ReadInt32(byte[] source, int offset)
        {
            return BitConverter.ToInt32(BitWorks.ReverseBytes(Slice(source, offset, 4)), 0);
        }

        /// <summary>
        /// Decodes a byte-reversed 64-bit integer starting at <paramref name="offset"/>.
        /// </summary>
        private static long ReadInt64(byte[] source, int offset)
        {
            return BitConverter.ToInt64(BitWorks.ReverseBytes(Slice(source, offset, 8)), 0);
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> bytes starting at <paramref name="offset"/>.
        /// Out-of-range requests are clamped rather than rejected, the same way
        /// <c>Skip(offset).Take(count).ToArray()</c> behaves: a negative offset starts at
        /// zero, and a range running past the end (or a non-positive count) yields a
        /// shorter, possibly empty, array.
        /// </summary>
        /// <param name="source">The bytes to copy from.</param>
        /// <param name="offset">Index of the first byte to copy.</param>
        /// <param name="count">Maximum number of bytes to copy.</param>
        /// <returns>A new array holding the selected bytes.</returns>
        private static byte[] Slice(byte[] source, int offset, int count)
        {
            int start = Math.Max(0, offset);
            int available = Math.Min(count, source.Length - start);
            if (available <= 0)
            {
                return new byte[0];
            }

            byte[] result = new byte[available];
            Buffer.BlockCopy(source, start, result, 0, available);
            return result;
        }
    }
}