This is a Hadoop job that pulls data from kafka server into HDFS. It requires the following inputs from a configuration file (test/test.properties is an example) kafka.etl.topic : the topic to be fetched; input : input directory containing topic offsets and it can be generated by DataGenerator; the number of files in this directory determines the number of mappers in the hadoop job; output : output directory containing kafka data and updated topic offsets; kafka.request.limit : it is used to limit the number events fetched. KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat. It fetches kafka data from the server. It starts from provided offsets (specified by "input") and stops when it reaches the largest available offsets or the specified limit (specified by "kafka.request.limit"). KafkaETLJob contains some helper functions to initialize job configuration. SimpleKafkaETLJob sets up job properties and files Hadoop job. SimpleKafkaETLMapper dumps kafka data into hdfs. HOW TO RUN: In order to run this, make sure the HADOOP_HOME environment variable points to your hadoop installation directory. 1. Complile using "sbt" to create a package for hadoop consumer code. ./sbt package 2. Run the hadoop-setup.sh script that enables write permission on the required HDFS directory 3. Produce test events in server and generate offset files 1) Start kafka server [ Follow the quick start - http://sna-projects.com/kafka/quickstart.php ] 2) Update test/test.properties to change the following parameters: kafka.etl.topic : topic name event.count : number of events to be generated kafka.server.uri : kafka server uri; input : hdfs directory of offset files 3) Produce test events to Kafka server and generate offset files ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties 4. Fetch generated topic into HDFS: 1) Update test/test.properties to change the following parameters: hadoop.job.ugi : id and group input : input location output : output location kafka.request.limit: limit the number of events to be fetched; -1 means no limitation. hdfs.default.classpath.dir : hdfs location of jars 2) copy jars into hdfs ./copy-jars.sh ${hdfs.default.classpath.dir} 2) Fetch data ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties