/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Linq;
using System.Text;
using Kafka.Client.Util;

namespace Kafka.Client
{
    /// <summary>
    /// Message for Kafka.
    /// </summary>
    /// <remarks>
    /// A message. The format of an N byte message is the following:
    /// <list type="bullet">
    ///     <item>
    ///         <description>1 byte "magic" identifier to allow format changes</description>
    ///     </item>
    ///     <item>
    ///         <description>4 byte CRC32 of the payload</description>
    ///     </item>
    ///     <item>
    ///         <description>N - 5 byte payload</description>
    ///     </item>
    /// </list>
    /// </remarks>
    public class Message
    {
        /// <summary>
        /// Number of bytes used by the size prefix that <see cref="ParseFrom"/> reads.
        /// </summary>
        private const int SizePrefixLength = 4;

        /// <summary>
        /// Number of bytes used by the magic identifier.
        /// </summary>
        private const int MagicLength = 1;

        /// <summary>
        /// Number of bytes <see cref="ParseFrom"/> reads for the checksum.
        /// </summary>
        private const int ChecksumLength = 4;

        /// <summary>
        /// Bytes of a message that precede the payload (magic + checksum).
        /// </summary>
        private const int HeaderLength = MagicLength + ChecksumLength;

        /// <summary>
        /// Magic identifier for Kafka.
        /// </summary>
        private static readonly byte DefaultMagicIdentifier = 0;

        /// <summary>
        /// Initializes a new instance of the Message class.
        /// </summary>
        /// <remarks>
        /// Uses the <see cref="DefaultMagicIdentifier"/> as a default.
        /// </remarks>
        /// <param name="payload">The data for the payload.</param>
        /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
        public Message(byte[] payload)
            : this(payload, DefaultMagicIdentifier)
        {
        }

        /// <summary>
        /// Initializes a new instance of the Message class.
        /// </summary>
        /// <remarks>
        /// Initializes the checksum as null. It will be automatically computed.
        /// </remarks>
        /// <param name="payload">The data for the payload.</param>
        /// <param name="magic">The magic identifier.</param>
        /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
        public Message(byte[] payload, byte magic)
            : this(payload, magic, null)
        {
        }

        /// <summary>
        /// Initializes a new instance of the Message class.
        /// </summary>
        /// <param name="payload">The data for the payload.</param>
        /// <param name="magic">The magic identifier.</param>
        /// <param name="checksum">
        /// The checksum for the payload, or null to have it computed from <paramref name="payload"/>.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
        public Message(byte[] payload, byte magic, byte[] checksum)
        {
            // Every other member dereferences Payload, so reject null here
            // instead of failing later with a less helpful exception.
            if (payload == null)
            {
                throw new ArgumentNullException("payload");
            }

            Payload = payload;
            Magic = magic;

            // Payload must already be assigned: CalculateChecksum reads it.
            Checksum = checksum ?? CalculateChecksum();
        }

        /// <summary>
        /// Gets the magic bytes.
        /// </summary>
        public byte Magic { get; private set; }

        /// <summary>
        /// Gets the CRC32 checksum for the payload.
        /// </summary>
        public byte[] Checksum { get; private set; }

        /// <summary>
        /// Gets the payload.
        /// </summary>
        public byte[] Payload { get; private set; }

        /// <summary>
        /// Parses a message from a byte array given the format Kafka likes.
        /// </summary>
        /// <remarks>
        /// The expected layout is a 4 byte size prefix, followed by the 1 byte magic
        /// identifier, the 4 byte checksum and finally the payload. The size prefix is
        /// treated as the size of the whole message (magic + checksum + payload), so the
        /// payload occupies <c>size - 5</c> bytes. If fewer bytes are available, the
        /// payload is whatever remains in <paramref name="data"/>.
        /// </remarks>
        /// <param name="data">The data for a message.</param>
        /// <returns>The message.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="data"/> is too short to hold the size prefix, magic byte and checksum.
        /// </exception>
        public static Message ParseFrom(byte[] data)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            int payloadOffset = SizePrefixLength + HeaderLength;
            if (data.Length < payloadOffset)
            {
                throw new ArgumentException(
                    "The data must contain at least " + payloadOffset + " bytes (size, magic and checksum).",
                    "data");
            }

            // The prefix bytes are reversed before conversion, exactly as before
            // (presumably wire order -> host order; see BitWorks.ReverseBytes).
            int size = BitConverter.ToInt32(
                BitWorks.ReverseBytes(data.Take(SizePrefixLength).ToArray()), 0);
            byte magic = data[SizePrefixLength];
            byte[] checksum = data
                .Skip(SizePrefixLength + MagicLength)
                .Take(ChecksumLength)
                .ToArray();

            // The size counts the magic byte and checksum as well, so only
            // size - HeaderLength bytes belong to the payload. Guard against
            // sizes smaller than the header to avoid a negative/overflowed count.
            int payloadLength = size > HeaderLength ? size - HeaderLength : 0;
            byte[] payload = data.Skip(payloadOffset).Take(payloadLength).ToArray();

            return new Message(payload, magic, checksum);
        }

        /// <summary>
        /// Converts the message to bytes in the format Kafka likes.
        /// </summary>
        /// <remarks>
        /// The result is the magic byte, followed by the checksum, followed by the
        /// payload. No size prefix is written.
        /// </remarks>
        /// <returns>The byte array.</returns>
        public byte[] GetBytes()
        {
            byte[] encodedMessage = new byte[MagicLength + Checksum.Length + Payload.Length];

            encodedMessage[0] = Magic;
            Buffer.BlockCopy(Checksum, 0, encodedMessage, MagicLength, Checksum.Length);
            Buffer.BlockCopy(Payload, 0, encodedMessage, MagicLength + Checksum.Length, Payload.Length);

            return encodedMessage;
        }

        /// <summary>
        /// Determines if the message is valid given the payload and its checksum.
        /// </summary>
        /// <returns>True if valid and false otherwise.</returns>
        public bool IsValid()
        {
            return Checksum.SequenceEqual(CalculateChecksum());
        }

        /// <summary>
        /// Try to show the payload as decoded to UTF-8.
        /// </summary>
        /// <returns>The decoded payload as string.</returns>
        public override string ToString()
        {
            return Encoding.UTF8.GetString(Payload);
        }

        /// <summary>
        /// Calculates the CRC32 checksum on the payload of the message.
        /// </summary>
        /// <returns>The checksum given the payload.</returns>
        private byte[] CalculateChecksum()
        {
            Crc32 crc32 = new Crc32();
            return crc32.ComputeHash(Payload);
        }
    }
}