/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Linq;
using System.Text;
using Kafka.Client.Util;
namespace Kafka.Client
{
/// <summary>
/// Message for Kafka.
/// </summary>
/// <remarks>
/// A message. The format of an N byte message is the following:
/// <list type="bullet">
///     <item>
///         <description>1 byte "magic" identifier to allow format changes</description>
///     </item>
///     <item>
///         <description>4 byte CRC32 of the payload</description>
///     </item>
///     <item>
///         <description>N - 5 byte payload</description>
///     </item>
/// </list>
/// </remarks>
public class Message
{
    /// <summary>
    /// Magic identifier for Kafka.
    /// </summary>
    private const byte DefaultMagicIdentifier = 0;

    /// <summary>
    /// Number of bytes used by the size prefix that precedes a message handed to <see cref="ParseFrom"/>.
    /// </summary>
    private const int SizePrefixLength = 4;

    /// <summary>
    /// Number of bytes used by the magic identifier.
    /// </summary>
    private const int MagicLength = 1;

    /// <summary>
    /// Number of bytes used by the checksum in the framed form read by <see cref="ParseFrom"/>.
    /// </summary>
    private const int ChecksumLength = 4;

    /// <summary>
    /// Offset of the first payload byte in the framed form read by <see cref="ParseFrom"/>.
    /// </summary>
    private const int PayloadOffset = SizePrefixLength + MagicLength + ChecksumLength;

    /// <summary>
    /// Initializes a new instance of the Message class.
    /// </summary>
    /// <remarks>
    /// Uses the <see cref="DefaultMagicIdentifier"/> as a default.
    /// </remarks>
    /// <param name="payload">The data for the payload.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="payload"/> is null.</exception>
    public Message(byte[] payload) : this(payload, DefaultMagicIdentifier)
    {
    }

    /// <summary>
    /// Initializes a new instance of the Message class.
    /// </summary>
    /// <remarks>
    /// Initializes the checksum as null. It will be automatically computed.
    /// </remarks>
    /// <param name="payload">The data for the payload.</param>
    /// <param name="magic">The magic identifier.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="payload"/> is null.</exception>
    public Message(byte[] payload, byte magic) : this(payload, magic, null)
    {
    }

    /// <summary>
    /// Initializes a new instance of the Message class.
    /// </summary>
    /// <param name="payload">The data for the payload.</param>
    /// <param name="magic">The magic identifier.</param>
    /// <param name="checksum">
    /// The checksum for the payload, or null to have it computed from <paramref name="payload"/>.
    /// A supplied checksum is stored as given and is not verified here; use <see cref="IsValid"/>.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="payload"/> is null.</exception>
    public Message(byte[] payload, byte magic, byte[] checksum)
    {
        // Fail fast: every other member dereferences Payload, so a null payload
        // would only surface later as a NullReferenceException.
        if (payload == null)
        {
            throw new ArgumentNullException("payload");
        }

        // Payload must be assigned before the checksum, because
        // CalculateChecksum() reads the Payload property.
        Payload = payload;
        Magic = magic;
        Checksum = checksum ?? CalculateChecksum();
    }

    /// <summary>
    /// Gets the magic bytes.
    /// </summary>
    public byte Magic { get; private set; }

    /// <summary>
    /// Gets the CRC32 checksum for the payload.
    /// </summary>
    public byte[] Checksum { get; private set; }

    /// <summary>
    /// Gets the payload.
    /// </summary>
    public byte[] Payload { get; private set; }

    /// <summary>
    /// Parses a message from a byte array given the format Kafka likes.
    /// </summary>
    /// <remarks>
    /// The expected layout is a 4 byte size prefix, 1 byte magic identifier, 4 byte checksum and then
    /// the payload. The size prefix is interpreted as the length of the whole message
    /// (magic + checksum + payload), so the payload is the declared size minus 5 bytes. If the buffer
    /// holds fewer payload bytes than declared, the payload is truncated to what is available; bytes
    /// beyond the declared size are ignored. The checksum is taken from the data and not verified here;
    /// call <see cref="IsValid"/> on the result.
    /// </remarks>
    /// <param name="data">The data for a message.</param>
    /// <returns>The message.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="data"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="data"/> is too short to contain the size prefix, magic identifier and checksum.
    /// </exception>
    public static Message ParseFrom(byte[] data)
    {
        if (data == null)
        {
            throw new ArgumentNullException("data");
        }

        if (data.Length < PayloadOffset)
        {
            throw new ArgumentException(
                "A framed message requires at least " + PayloadOffset + " bytes (size prefix, magic identifier and checksum).",
                "data");
        }

        // The size prefix is byte-reversed by the project helper before decoding.
        // NOTE(review): presumably converts network order to host order - confirm against BitWorks.
        int size = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Take(SizePrefixLength).ToArray()), 0);
        byte magic = data[SizePrefixLength];

        byte[] checksum = new byte[ChecksumLength];
        Buffer.BlockCopy(data, SizePrefixLength + MagicLength, checksum, 0, ChecksumLength);

        // The declared size covers magic + checksum + payload. Written as a comparison
        // rather than a bare subtraction so a corrupt negative size cannot wrap around.
        int headerLength = MagicLength + ChecksumLength;
        int declaredPayloadLength = size > headerLength ? size - headerLength : 0;

        // Never read past the end of the buffer, even if the declared size says so.
        int payloadLength = Math.Min(declaredPayloadLength, data.Length - PayloadOffset);

        byte[] payload = new byte[payloadLength];
        Buffer.BlockCopy(data, PayloadOffset, payload, 0, payloadLength);

        return new Message(payload, magic, checksum);
    }

    /// <summary>
    /// Converts the message to bytes in the format Kafka likes.
    /// </summary>
    /// <remarks>
    /// The result is the magic identifier, followed by the checksum, followed by the payload.
    /// It does not include the size prefix that <see cref="ParseFrom"/> expects.
    /// </remarks>
    /// <returns>The byte array.</returns>
    public byte[] GetBytes()
    {
        byte[] encodedMessage = new byte[MagicLength + Checksum.Length + Payload.Length];
        encodedMessage[0] = Magic;
        Buffer.BlockCopy(Checksum, 0, encodedMessage, MagicLength, Checksum.Length);
        Buffer.BlockCopy(Payload, 0, encodedMessage, MagicLength + Checksum.Length, Payload.Length);
        return encodedMessage;
    }

    /// <summary>
    /// Determines if the message is valid given the payload and its checksum.
    /// </summary>
    /// <returns>True if valid and false otherwise.</returns>
    public bool IsValid()
    {
        return Checksum.SequenceEqual(CalculateChecksum());
    }

    /// <summary>
    /// Try to show the payload as decoded to UTF-8.
    /// </summary>
    /// <returns>The decoded payload as string.</returns>
    public override string ToString()
    {
        return Encoding.UTF8.GetString(Payload);
    }

    /// <summary>
    /// Calculates the CRC32 checksum on the payload of the message.
    /// </summary>
    /// <returns>The checksum given the payload.</returns>
    private byte[] CalculateChecksum()
    {
        Crc32 crc32 = new Crc32();
        return crc32.ComputeHash(Payload);
    }
}
}