/* * Copyright (c) 2011 NeuStar, Inc. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * NeuStar, the Neustar logo and related names and logos are registered * trademarks, service marks or tradenames of NeuStar, Inc. All other * product names, company names, marks, logos and symbols may be trademarks * of their respective owners. */ package kafka import ( "log" "net" "os" "fmt" "encoding/binary" "io" "bufio" ) const ( NETWORK = "tcp" ) type Broker struct { topic string partition int hostname string } func newBroker(hostname string, topic string, partition int) *Broker { return &Broker{topic: topic, partition: partition, hostname: hostname} } func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { log.Println("Fatal Error: ", err) return nil, err } conn, err = net.DialTCP(NETWORK, nil, raddr) if err != nil { log.Println("Fatal Error: ", err) return nil, err } return conn, error } // returns length of response & payload & err func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { reader := bufio.NewReader(conn) length := make([]byte, 4) lenRead, err := io.ReadFull(reader, length) if err != nil { return 0, []byte{}, err } if lenRead != 4 || lenRead < 0 { return 0, []byte{}, os.NewError("invalid length of the packet length field") } expectedLength := binary.BigEndian.Uint32(length) messages := make([]byte, expectedLength) lenRead, err = io.ReadFull(reader, messages) if err != nil { return 0, []byte{}, err } if uint32(lenRead) != expectedLength { return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) } errorCode := binary.BigEndian.Uint16(messages[0:2]) if errorCode != 0 { log.Println("errorCode: ", errorCode) return 0, []byte{}, os.NewError( fmt.Sprintf("Broker Response Error: %d", errorCode)) } return expectedLength, messages[2:], nil }