/* * Copyright (c) 2011 NeuStar, Inc. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * NeuStar, the Neustar logo and related names and logos are registered * trademarks, service marks or tradenames of NeuStar, Inc. All other * product names, company names, marks, logos and symbols may be trademarks * of their respective owners. */ package kafka import ( "hash/crc32" "encoding/binary" "bytes" "log" ) const ( // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression MAGIC_DEFAULT = 1 // magic + compression + chksum NO_LEN_HEADER_SIZE = 1 + 1 + 4 ) type Message struct { magic byte compression byte checksum [4]byte payload []byte offset uint64 // only used after decoding totalLength uint32 // total length of the raw message (from decoding) } func (m *Message) Offset() uint64 { return m.offset } func (m *Message) Payload() []byte { return m.payload } func (m *Message) PayloadString() string { return string(m.payload) } func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message { message := &Message{} message.magic = byte(MAGIC_DEFAULT) message.compression = codec.Id() message.payload = codec.Encode(payload) binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) return message } // Default is is create a message with no compression func NewMessage(payload []byte) *Message { return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) } // Create a Message using the default compression method (gzip) func NewCompressedMessage(payload []byte) *Message { return NewCompressedMessages(NewMessage(payload)) } func NewCompressedMessages(messages ...*Message) *Message { buf := bytes.NewBuffer([]byte{}) for _, message := range messages { buf.Write(message.Encode()) } return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) } // MESSAGE SET: func (m *Message) Encode() []byte { msgLen := NO_LEN_HEADER_SIZE + len(m.payload) msg := make([]byte, 4+msgLen) binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) msg[4] = m.magic msg[5] = m.compression copy(msg[6:], m.checksum[0:]) copy(msg[10:], m.payload) return msg } func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { return Decode(packet, DefaultCodecsMap) } func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { messages := []Message{} length, message := decodeMessage(packet, payloadCodecsMap) if length > 0 && message != nil { if message.compression != NO_COMPRESSION_ID { // wonky special case for compressed messages having embedded messages payloadLen := uint32(len(message.payload)) messageLenLeft := payloadLen for messageLenLeft > 0 { start := payloadLen - messageLenLeft innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 messages = append(messages, *innerMsg) } } else { messages = append(messages, *message) } } return length, messages } func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { length := binary.BigEndian.Uint32(packet[0:]) if length > uint32(len(packet[4:])) { log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) return 0, nil } msg := Message{} msg.totalLength = length msg.magic = packet[4] rawPayload := []byte{} if msg.magic == 0 { msg.compression = byte(0) copy(msg.checksum[:], packet[5:9]) payloadLength := length - 1 - 4 rawPayload = packet[9 : 9+payloadLength] } else if msg.magic == MAGIC_DEFAULT { msg.compression = packet[5] copy(msg.checksum[:], packet[6:10]) payloadLength := length - NO_LEN_HEADER_SIZE rawPayload = packet[10 : 10+payloadLength] } else { log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) return 0, nil } payloadChecksum := make([]byte, 4) binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) if !bytes.Equal(payloadChecksum, msg.checksum[:]) { msg.Print() log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) return 0, nil } msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) return length, &msg } func (msg *Message) Print() { log.Println("----- Begin Message ------") log.Printf("magic: %X\n", msg.magic) log.Printf("compression: %X\n", msg.compression) log.Printf("checksum: %X\n", msg.checksum) if len(msg.payload) < 1048576 { // 1 MB log.Printf("payload: % X\n", msg.payload) log.Printf("payload(string): %s\n", msg.PayloadString()) } else { log.Printf("long payload, length: %d\n", len(msg.payload)) } log.Printf("length: %d\n", msg.totalLength) log.Printf("offset: %d\n", msg.offset) log.Println("----- End Message ------") }