/* * Copyright (c) 2011 NeuStar, Inc. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * NeuStar, the Neustar logo and related names and logos are registered * trademarks, service marks or tradenames of NeuStar, Inc. All other * product names, company names, marks, logos and symbols may be trademarks * of their respective owners. */ package kafka import ( "encoding/binary" "bytes" ) type RequestType uint16 // Request Types const ( REQUEST_PRODUCE RequestType = 0 REQUEST_FETCH = 1 REQUEST_MULTIFETCH = 2 REQUEST_MULTIPRODUCE = 3 REQUEST_OFFSETS = 4 ) // Request Header: func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { request := bytes.NewBuffer([]byte{}) request.Write(uint32bytes(0)) // placeholder for request size request.Write(uint16bytes(int(requestType))) request.Write(uint16bytes(len(b.topic))) request.WriteString(b.topic) request.Write(uint32bytes(b.partition)) return request } // after writing to the buffer is complete, encode the size of the request in the request. func encodeRequestSize(request *bytes.Buffer) { binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) } // func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte { request := b.EncodeRequestHeader(REQUEST_OFFSETS) // specific to offset request request.Write(uint64ToUint64bytes(uint64(time))) request.Write(uint32toUint32bytes(maxNumOffsets)) encodeRequestSize(request) return request.Bytes() } // func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { request := b.EncodeRequestHeader(REQUEST_FETCH) // specific to consume request request.Write(uint64ToUint64bytes(offset)) request.Write(uint32toUint32bytes(maxSize)) encodeRequestSize(request) return request.Bytes() } // func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { // 4 + 2 + 2 + topicLength + 4 + 4 request := b.EncodeRequestHeader(REQUEST_PRODUCE) messageSetSizePos := request.Len() request.Write(uint32bytes(0)) // placeholder message len written := 0 for _, message := range messages { wrote, _ := request.Write(message.Encode()) written += wrote } // now add the accumulated size of that the message set was binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) // now add the size of the whole to the first uint32 encodeRequestSize(request) return request.Bytes() }