Log Walkthrough

To wrap up this section, let's take a look at the log for a TaskRunner running on YARN. Logs like the one I'm going to show can be seen through the Samsa dashboard (e.g. https://eat1-qa463.corp:8443/jobs/logs/eat1-qa466.corp.linkedin.com:9999/container1360690887210003201000002/stdout), or by going straight to the YARN web app (e.g. http://eat1-qa466.corp.linkedin.com:9999/node/container/container1360690887210003201000002).

Let's go line by line and I'll explain what's happening. The first thing that happens in the logs is a dump of the command that was executed to start the TaskRunner.

/export/apps/jdk/JDK-1_6_0_21/bin/java -Dlog4j.configuration=jar:file:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-core-0.6.1.jar!/log4j.xml -cp /export/content/glu/apps/samsa-yarn-nm/i001/conf:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/activation-1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/aopalliance-1.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/aqapi13-10.2.0.2.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/asm-3.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/aspectjrt-1.6.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/avro-1.4.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/avro-schemas-builder-3.0.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/camel-core-2.5.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/camel-josql-2.5.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/cfg-2.12.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/cfg-api-6.6.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/cfg-impl-6.6.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/cglib-2.2.1-v20090111.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/cglib-nodep-2.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/clover-3.0.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/com.linkedin.customlibrary.j2ee-1.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/com.linkedin.customlibrary.mx4j-3.0.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/com.linkedin.customlibrary.xmlparserv2-10.2.0.2.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/com.linkedin.customlibrary.xmsg-0.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-beanutils-1.7.0.jar:/export/content/data/
samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-beanutils-core-1.8.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-cli-1.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-codec-1.4.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-collections-3.2.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-compress-1.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-configuration-1.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-dbcp-1.2.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-digester-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-el-1.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-httpclient-3.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-io-2.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-lang-2.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-logging-1.1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-logging-api-1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-management-1.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-math-2.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-net-1.4.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/commons-pool-1.5.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/com.noelios.restlet-1.1.10.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/configuration-api-1.4.53.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/configuration-repository-impl-1.4.53.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-eventbus-a
pi-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-http-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-ic-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-ic-finder-factory-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-ic-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-impl-3.2.20.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-jmx-factory-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-rpc-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-rpc-trace-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-rpc-trace-factory-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-rpc-trace-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-service-trace-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-service-trace-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/container-service-trace-servlet-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/core-3.1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-common-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-drc-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-drc-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-dsc-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dal-dsc-impl-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/
data-1.5.12.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/data-avro-1.5.12.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus2-espresso-core-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus3-client-impl-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-client-api-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-client-impl-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-cluster-manager-utils-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-core-container-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-core-fwk-test-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-core-impl-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-core-schemas-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-group-leader-api-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/databus-group-leader-impl-2.1.125.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/data-transform-1.5.12.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/dom4j-1.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/espresso-client-pub-0.5.214.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/espresso-common-impl-0.5.214.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/espresso-schema-impl-0.5.214.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/espresso-store-impl-0.5.214.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/gentlyweb-utils-1.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/gnutar-1.1.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/grizzled-slf4j_2.8.1-0.6.10.jar:/export/content/
data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/guava-r09.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/guice-3.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/guice-servlet-3.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hadoop-annotations-0.23.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hadoop-auth-0.23.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hadoop-common-0.23.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hadoop-yarn-api-0.23.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hadoop-yarn-common-0.23.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/healthcheck-api-3.2.109.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/helix-core-0.5.28.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/hsqldb-1.8.0.7.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/httpclient-4.1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-core-asl-1.8.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-core-asl-1.8.8-sources.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-jaxrs-1.8.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-mapper-asl-1.8.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-mapper-asl-1.8.8-sources.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jackson-xc-1.7.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jasper-compiler-5.5.23.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jasper-runtime-5.5.23.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/javax.inject-1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jaxb-api-2.2.2.jar:/export/content/
data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jaxb-impl-2.2.3-1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jaxen-1.0-FCS.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jdiff-1.0.9.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jdom-1.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jersey-core-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jersey-guice-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jersey-json-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jersey-server-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jersey-test-framework-grizzly2-1.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jets3t-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jettison-1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jetty-6.1.26.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jetty-util-6.1.26.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jmx-impl-1.4.53.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/joda-time-1.6.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jopt-simple-3.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/josql-1.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/json-20070829.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/json-simple-1.1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/json-simple-1.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jsp-api-2.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/jsqlparser-.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/pack
age/lib/kafka-0.8.0_0.8_25d77cc69-SNAPSHOT.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/kfs-0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/lispring-lispring-core-1.4.53.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/lispring-lispring-servlet-1.4.53.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/log4j-1.2.17.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/lucene-core-2.9.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/mail-1.3.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/metrics-amf-0.0.9.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/metrics-core-0.0.9.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/metrics-core-2.9.91.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/mockito-all-1.8.4.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/mx4j-tools-3.0.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/mysql-connector-java-5.1.14.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/netty-3.2.4.Final.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/ojdbc14-10.2.0.2.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/org.restlet-1.1.10.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/oro-2.0.8.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/page-key-groups-aggregator-0.0.55.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/page-key-groups-counter-0.0.55.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/protobuf-java-2.4.0a.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-api-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-bundle-0.5.39.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/p
ackage/lib/samsa-core-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-kafka-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-li-0.5.39.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-serializers-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/samsa-yarn-0.6.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/saxpath-1.0-FCS.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/scala-library-2.8.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/servlet-api-2.5.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/slf4j-api-1.7.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/slf4j-log4j12-1.6.2.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/slf4j-simple-1.6.4.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/snappy-java-1.0.4.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-aop-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-asm-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-aspects-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-beans-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-context-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-context-support-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-core-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-expression-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-jdbc-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-jms-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-orm-3.0.3.jar:/
export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-transaction-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-web-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/spring-web-servlet-3.0.3.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/stax-api-1.0.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/tdgssconfig.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/terajdbc4.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/testng-6.4.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-core-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-core-factory-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-factory-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-factory-servlet-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-i18n-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-jmx-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-log-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-servlet-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-sql-4.0.12.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/util-xmsg-4.1.41.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/xdb-10.2.0.2.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/xml-apis-1.3.04.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/xmlenc-0.52.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/xpp3_min-1.1.4c.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/xstream-1.3.1.jar:/export/content/data/samsa-yarn/usercache/app/appcache/a
pplication_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/zkclient-0.3.0.jar:/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/container_1360690887210_0032_01_000002/package/lib/zookeeper-3.3.4.jar -XX:NewSize=192m -XX:MaxNewSize=192m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:/export/content/glu/apps/samsa-yarn-nm/i001/logs/application_1360690887210_0032/container_1360690887210_0032_01_000002/gc.log -d64 -server -Xmx512M samsa.task.JavaTaskRunner

This gives you the classpath, garbage collector settings, log configuration, heap size, etc. You can see that the class being run is samsa.task.JavaTaskRunner. JavaTaskRunner is just a thin extension of TaskRunner that handles wiring the TaskRunner up and then executing it.
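
Most of these flags trace back to the job configuration. For example, the -Xmx512M at the end of the command matches the yarn.jvm.args entry in the config dump further down, and the container itself was sized with yarn.container.memory.mb:

yarn.jvm.args=-Xmx512M
yarn.container.memory.mb=768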

Once the task runner starts, it begins wiring everything up.

2013-02-20 22:23:29 JavaTaskRunner [INFO] No lifecycle listeners found

The JavaTaskRunner is letting you know that, during wiring, it didn't find any lifecycle listeners. This is expected unless you've defined a lifecycle listener in your configuration. At LinkedIn, the samsa-li package has one lifecycle listener: GeneratorLifecycleListener. This class is used to manage the icFinder (invocation context) and rpcTraceHandler for RestLI. If you don't know what that stuff is, don't worry about it; it's only important when making RestLI service calls.
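
For reference, the GeneratorLifecycleListener's factory is configured with entries like these two (both appear verbatim in the config dump below). Interestingly, this particular run still logged "No lifecycle listeners found", so these entries alone evidently weren't enough to activate the listener here:

task.lifecycle.listener.generator.class=com.linkedin.samsa.task.GeneratorLifecycleListenerFactory
task.lifecycle.listener.generator.fabric=STG-BETA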

After the JavaTaskRunner has wired everything up, it calls run() on the TaskRunner, which is where all of the remaining logs come from. The TaskRunner starts by letting you know which StreamTask class it's going to run, and it gives you a full dump of the configuration it's seeing.

2013-02-20 22:23:29 JavaTaskRunner [INFO] Running task class com.linkedin.samsa.jobs.ingraphs.PageKeyGroupsInGraphAggregator with config {systems.kafka.producer.retry.backoff.ms=750, metrics.reporter.snapshot.class=samsa.metrics.reporter.MetricsSnapshotReporterFactory, streams.page-view-event-by-group.stream=PageViewEventByGroupJson, systems.kafka-checkpoints.producer.request.timeout.ms=5000, systems.kafka-checkpoints.zk.connect=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa, streams.page-view-event-by-group.system=kafka, serializers.registry.avro-schemas.schemas=http://eat1-app110.stg.linkedin.com:10252/schemaRegistry/schemas, metrics.reporter.amf.http=http://eat1-amf-vip-z.corp.linkedin.com/api/v1, systems.kafka-checkpoints.producer.retry.backoff.ms=10000, streams.samsa-metrics.serde=metrics-snapshot, systems.kafka.autocommit.interval.ms=60000, systems.kafka.samsa.consumer.factory=samsa.stream.kafka.KafkaConsumerFactory, systems.kafka.producer.request.timeout.ms=5000, task.lifecycle.listener.generator.fabric=STG-BETA, systems.kafka.samsa.partition.manager=samsa.stream.kafka.KafkaPartitionManager, systems.kafka.producer.num.retries=2, task.class=com.linkedin.samsa.jobs.ingraphs.PageKeyGroupsInGraphAggregator, yarn.package.path=http://artifactory.corp.linkedin.com:8081/artifactory/simple/SNA/com/linkedin/samsa-hello-world/ingraphs-package/0.0.55/ingraphs-package-0.0.55.tgz, systems.kafka.zk.connectiontimeout.ms=1000000, yarn.container.memory.mb=768, job.factory.class=samsa.job.yarn.YarnJobFactory, systems.kafka.zk.connect=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa, job.name=page-key-groups-aggregator-eat1-v3, task.checkpoint.factory=samsa.task.state.KafkaCheckpointManagerFactory, systems.kafka-checkpoints.broker.list=eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251, systems.kafka.producer.type=async, metrics.reporters=snapshot,amf, systems.kafka.autooffset.reset=largest, task.checkpoint.system=kafka-checkpoints, serializers.registry.databus-avro-schemas.class=com.linkedin.samsa.serializers.DatabusEventAvroSerdeFactory, systems.kafka-checkpoints.partitioner.class=samsa.task.state.KafkaCheckpointPartitioner, systems.kafka-checkpoints.producer.type=sync, task.lifecycle.listener.generator.class=com.linkedin.samsa.task.GeneratorLifecycleListenerFactory, serializers.default=avro-schemas, metrics.reporter.amf.suffix=, serializers.registry.json.class=samsa.serializers.JsonSerdeFactory, systems.kafka.samsa.producer.factory=samsa.stream.kafka.KafkaProducerFactory, streams.samsa-metrics.stream=SamsaMetrics, systems.kafka-checkpoints.max.message.size=1300000, streams.page-view-event-by-group.serde=json, metrics.reporter.snapshot.stream=samsa-metrics, systems.kafka.max.message.size=1300000, yarn.jvm.args=-Xmx512M, streams.samsa-metrics.system=kafka, task.inputs=page-view-event-by-group, serializers.registry.metrics-snapshot.class=samsa.serializers.MetricsSnapshotSerdeFactory, systems.kafka-checkpoints.serializer.class=samsa.task.state.KafkaCheckpointEncoder, metrics.reporter.amf.class=com.linkedin.samsa.metrics.reporter.AmfReporterFactory, 
systems.kafka-checkpoints.queue.enqueueTimeout.ms=-1, systems.kafka.rebalance.retries.max=100, systems.kafka.queue.enqueueTimeout.ms=-1, systems.kafka-checkpoints.key.serializer.class=kafka.serializer.NullEncoder, serializers.registry.avro-schemas.class=com.linkedin.samsa.serializers.SchemaRegistrySerdeFactory, systems.kafka.queuedchunks.max=2, systems.kafka-checkpoints.producer.num.retries=3, systems.kafka.fetch.size=1200000, systems.kafka-checkpoints.producer.request.required.acks=-1, systems.kafka.broker.list=eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251, systems.kafka-checkpoints.zk.connectiontimeout.ms=1000000}
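
That dump is dense, so here are the handful of entries that drive the rest of this walkthrough, copied verbatim from the config above:

job.name=page-key-groups-aggregator-eat1-v3
task.class=com.linkedin.samsa.jobs.ingraphs.PageKeyGroupsInGraphAggregator
task.inputs=page-view-event-by-group
streams.page-view-event-by-group.system=kafka
streams.page-view-event-by-group.stream=PageViewEventByGroupJson
streams.page-view-event-by-group.serde=json
task.checkpoint.factory=samsa.task.state.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka-checkpoints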

Next, the TaskRunner is going to try to load the offsets for your job's input stream partitions.

2013-02-20 22:23:29 JavaTaskRunner [INFO] Restore all input offsets for each stream/partition pair

Since we're using the KafkaCheckpointManager, the offsets are pulled from Kafka. The first thing that happens is that the TaskRunner gets the KafkaCheckpointManager from the KafkaCheckpointManagerFactory. The factory instantiates Kafka's VerifiableProperties class, which logs the properties that it sees.

2013-02-20 22:23:29 VerifiableProperties [INFO] Verifying properties
2013-02-20 22:23:29 VerifiableProperties [INFO] Property key.serializer.class is overridden to kafka.serializer.NullEncoder
2013-02-20 22:23:29 VerifiableProperties [WARN] Property zk.connect is not valid
2013-02-20 22:23:29 VerifiableProperties [INFO] Property serializer.class is overridden to samsa.task.state.KafkaCheckpointEncoder
2013-02-20 22:23:29 VerifiableProperties [INFO] Property producer.retry.backoff.ms is overridden to 10000
2013-02-20 22:23:29 VerifiableProperties [INFO] Property producer.request.required.acks is overridden to -1
2013-02-20 22:23:29 VerifiableProperties [INFO] Property broker.list is overridden to eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251
2013-02-20 22:23:29 VerifiableProperties [WARN] Property zk.connectiontimeout.ms is not valid
2013-02-20 22:23:29 VerifiableProperties [INFO] Property max.message.size is overridden to 1300000
2013-02-20 22:23:29 VerifiableProperties [INFO] Property partitioner.class is overridden to samsa.task.state.KafkaCheckpointPartitioner
2013-02-20 22:23:29 VerifiableProperties [INFO] Property producer.type is overridden to sync
2013-02-20 22:23:29 VerifiableProperties [INFO] Property producer.num.retries is overridden to 3
2013-02-20 22:23:29 VerifiableProperties [INFO] Property queue.enqueueTimeout.ms is overridden to -1
2013-02-20 22:23:29 VerifiableProperties [INFO] Property producer.request.timeout.ms is overridden to 5000

Notice the WARNs that the class is complaining about. What's happening is that the configuration defined for the checkpoint Kafka system (the systems.kafka-checkpoints.* properties) is being passed into Kafka's ProducerProperties object inside of the KafkaCheckpointManager. Those system properties define config for both the consumer and the producer, so the WARNs come from consumer-only properties (zk.connect and zk.connectiontimeout.ms, in this case) that the producer doesn't need or know about. This is harmless.
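
Conceptually, the factory strips the system prefix off those properties and hands the remainder to the producer. Here's a minimal sketch of that idea; the subset helper and class name are made up for illustration and this is not the actual Samsa code:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CheckpointProducerPropsSketch {
    // Copy every entry under the given prefix into a Properties object,
    // dropping the prefix. The producer only understands some of the keys;
    // consumer-only leftovers (like zk.connect here) are what trigger the
    // "Property ... is not valid" warnings above.
    static Properties subset(Map<String, String> config, String prefix) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                props.setProperty(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<String, String>();
        config.put("systems.kafka-checkpoints.producer.type", "sync");
        config.put("systems.kafka-checkpoints.producer.num.retries", "3");
        config.put("systems.kafka-checkpoints.zk.connect", "eat1-app309.corp.linkedin.com:12913/kafka-samsa");
        // Prints something like {producer.type=sync, producer.num.retries=3, zk.connect=...}
        System.out.println(subset(config, "systems.kafka-checkpoints."));
    }
}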

After the KafkaCheckpointManagerFactory creates the ProducerProperties (which will be used when writeCheckpoint is called), it uses the KafkaPartitionManager to figure out how many partitions the job has. It does this by taking the max of all input stream partition counts (e.g. if you have one input stream with 4 partitions and another with 8, the job will have 8 partitions). To get the partition information for all input topics, the KafkaPartitionManager uses ZooKeeper to fetch each topic's metadata.
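
In other words, the partition count calculation boils down to taking a max over the input topics' partition counts. A tiny sketch with made-up numbers (the 12 happens to match the "total partitions 12" reported a bit further down):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class JobPartitionCountSketch {
    public static void main(String[] args) {
        // Hypothetical partition counts for a job's input topics; in the real
        // job these come from the topic metadata fetched via ZooKeeper.
        List<Integer> inputStreamPartitionCounts = Arrays.asList(4, 8, 12);
        // The job's partition count is the max across all input streams.
        int jobPartitionCount = Collections.max(inputStreamPartitionCounts);
        System.out.println("Job partition count: " + jobPartitionCount); // 12
    }
}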

2013-02-20 22:23:29 ZkEventThread [INFO] Starting ZkClient event thread.
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:host.name=eat1-qa466.corp.linkedin.com
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.version=1.6.0_21
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.vendor=Sun Microsystems Inc.
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.home=/export/apps/jdk/JDK-1_6_0_21/jre
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.class.path=...
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.library.path=/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64/server:/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64:/export/apps/jdk/JDK-1_6_0_21/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.io.tmpdir=/tmp
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:java.compiler=
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:os.name=Linux
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:os.arch=amd64
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:os.version=2.6.32-220.13.1.el6.x86_64
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:user.name=app
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:user.home=/home/app
2013-02-20 22:23:29 ZooKeeper [INFO] Client environment:user.dir=/export/content/data/samsa-yarn/usercache/app/appcache/application_1360690887210_0032/filecache/-3646196501335668904/ingraphs-package-0.0.55.tgz
2013-02-20 22:23:29 ZooKeeper [INFO] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@72e8a021
2013-02-20 22:23:30 ClientCnxn [INFO] Opening socket connection to server eat1-app312.corp.linkedin.com/172.20.72.74:12913
2013-02-20 22:23:30 ClientCnxn [INFO] Socket connection established to eat1-app312.corp.linkedin.com/172.20.72.74:12913, initiating session
2013-02-20 22:23:30 ClientCnxn [INFO] Session establishment complete on server eat1-app312.corp.linkedin.com/172.20.72.74:12913, sessionid = 0x43afd073e258b06, negotiated timeout = 30000
2013-02-20 22:23:30 ZkClient [INFO] zookeeper state changed (SyncConnected)
2013-02-20 22:23:30 ZkEventThread [INFO] Terminate ZkClient event thread.
2013-02-20 22:23:30 ZooKeeper [INFO] Session: 0x43afd073e258b06 closed
2013-02-20 22:23:30 ClientCnxn [INFO] EventThread shut down

Once that's done, the KafkaCheckpointManagerFactory will create a producer that will be used whenever writeCheckpoint is called on the KafkaCheckpointManager.

2013-02-20 22:23:30 KafkaCheckpointManagerFactory [INFO] Creating checkpoint producer with config: {key.serializer.class=kafka.serializer.NullEncoder, zk.connect=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa, serializer.class=samsa.task.state.KafkaCheckpointEncoder, producer.retry.backoff.ms=10000, producer.request.required.acks=-1, broker.list=eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251, zk.connectiontimeout.ms=1000000, partitioner.class=samsa.task.state.KafkaCheckpointPartitioner, max.message.size=1300000, producer.type=sync, producer.num.retries=3, queue.enqueueTimeout.ms=-1, producer.request.timeout.ms=5000}

After the producer is created, the KafkaCheckpointManager is fully instantiated and lets you know that it's been created.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Creating KafkaCheckpointManager for page-key-groups-aggregator-eat1-v3 with job id 1 and total partitions 12

The TaskRunner then starts calling readCheckpoint for every partition. On the first readCheckpoint call, the KafkaCheckpointManager does some sanity checks to make sure that the checkpoint stream (__samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1, in this case) exists and is properly partitioned. To do this check, it again uses ZooKeeper to fetch topic metadata. This time, instead of fetching metadata about the input streams, it fetches the metadata for the checkpoint topic.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 ZooKeeper [INFO] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5d15126e
2013-02-20 22:23:30 ZkEventThread [INFO] Starting ZkClient event thread.
2013-02-20 22:23:30 ClientCnxn [INFO] Opening socket connection to server eat1-app311.corp.linkedin.com/172.20.72.75:12913
2013-02-20 22:23:30 ClientCnxn [INFO] Socket connection established to eat1-app311.corp.linkedin.com/172.20.72.75:12913, initiating session
2013-02-20 22:23:30 ClientCnxn [INFO] Session establishment complete on server eat1-app311.corp.linkedin.com/172.20.72.75:12913, sessionid = 0x33ae15b0a668770, negotiated timeout = 6000
2013-02-20 22:23:30 ZkClient [INFO] zookeeper state changed (SyncConnected)
2013-02-20 22:23:30 ZkEventThread [INFO] Terminate ZkClient event thread.
2013-02-20 22:23:30 ZooKeeper [INFO] Session: 0x33ae15b0a668770 closed
2013-02-20 22:23:30 ClientCnxn [INFO] EventThread shut down

The topic exists, as expected.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 11.

The KafkaCheckpointManager then verifies that the total number of partitions for the checkpoint topic is equal to the total number of partitions for the job. Since this is the case, no error occurs. After the verification, the KafkaCheckpointManager tries to pick a leader from the topic metadata. The leader is the broker that will be used to read the last checkpoint message for this partition.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 11

Once a leader is picked for this partition, the leader is asked what the offset is for the last message in the topic/partition.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7226 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 11

The KafkaCheckpointManager uses this offset to fetch the last message in the topic.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:11: Checkpoint [offsets={page-view-event-by-group=26692009}]

This message contains all input stream offsets for this partition. For example, if you're reading from topic A and topic B, and the TaskRunner called readLastCheckpoint for partition 3, the checkpoint contains the offsets for topic A's partition 3, and topic B's partition 3.
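
To make the shape of a checkpoint concrete, here is a rough sketch with plain Java maps standing in for the real Samsa checkpoint type; the offset value is the one from the partition 11 log line above:

import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {
    public static void main(String[] args) {
        // A checkpoint for one partition is essentially a map from input stream
        // name to the last committed offset for that stream's partition.
        Map<String, Long> offsets = new HashMap<String, Long>();
        offsets.put("page-view-event-by-group", 26692009L);
        // A second input stream would add its own entry to the same map, e.g.:
        // offsets.put("some-other-topic", 12345L);
        System.out.println("Checkpoint [offsets=" + offsets + "]");
    }
}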

The KafkaCheckpointManager then goes on to do the same offset fetching and checkpoint reading for every partition that it's in charge of.

2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 6.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 6
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7227 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 6
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:6: Checkpoint [offsets={page-view-event-by-group=160927954}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 7.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 7
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 7
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:7: Checkpoint [offsets={page-view-event-by-group=5994131}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 2.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 2
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 2
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:2: Checkpoint [offsets={page-view-event-by-group=491284280}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 10.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa466.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 10
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 10
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:10: Checkpoint [offsets={page-view-event-by-group=648585899}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 1.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 1
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7226 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 1
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:1: Checkpoint [offsets={page-view-event-by-group=102346102}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 3.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 3
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 3
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:3: Checkpoint [offsets={page-view-event-by-group=218117506}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 8.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa466.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 8
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 8
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:8: Checkpoint [offsets={page-view-event-by-group=437775431}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 4.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 4
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7225 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 4
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:4: Checkpoint [offsets={page-view-event-by-group=69782630}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 0.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 0
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7227 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 0
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:0: Checkpoint [offsets={page-view-event-by-group=8578798}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 9.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 9
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7226 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 9
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:9: Checkpoint [offsets={page-view-event-by-group=46825511}]
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Checking whether topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 exists
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Found topic metadata for __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1. Checking for partition 5.
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 5
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got offset 7226 for topic __samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1 and partition 5
2013-02-20 22:23:30 KafkaCheckpointManager [INFO] Got checkpoint state for page-key-groups-aggregator-eat1-v3:1:5: Checkpoint [offsets={page-view-event-by-group=29469558}]

If the KafkaCheckpointManager finds that the state topic doesn't exist, or that a specific partition of the state topic doesn't exist, it uses a null offset, which tells the StreamConsumerFactories that no offset is defined. In the KafkaConsumerFactory's case, it falls back to the autooffset.reset setting, if one is defined. If the setting is not defined, the consumer will fail. (This job sets autooffset.reset=largest, as you can see in the config dump further down, so a missing checkpoint would mean starting from the tail of the topic; autooffset.reset=smallest would start from the head instead.)

Once all checkpoints have been bootstrapped, the TaskRunner will print the results.

2013-02-20 22:23:30 JavaTaskRunner [INFO] Restored offsets: Map(StreamPartition [partition=Partition [partition=8], name=page-view-event-by-group, system=kafka,
stream=PageViewEventByGroupJson, serde=json] -> 437775431, StreamPartition [partition=Partition [partition=11], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 26692009, StreamPartition [partition=Partition [partition=5], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 29469558, StreamPartition [partition=Partition [partition=7], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 5994131, StreamPartition [partition=Partition [partition=0], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 8578798, StreamPartition [partition=Partition [partition=4], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 69782630, StreamPartition [partition=Partition [partition=1], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 102346102, StreamPartition [partition=Partition [partition=9], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 46825511, StreamPartition [partition=Partition [partition=6], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 160927954, StreamPartition [partition=Partition [partition=3], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 218117506, StreamPartition [partition=Partition [partition=2], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 491284280, StreamPartition [partition=Partition [partition=10], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] -> 648585899)

Now, the TaskRunner will use these offsets to create StreamConsumers (one stream consumer for every input stream/partition pair).

2013-02-20 22:23:30 JavaTaskRunner [INFO] Adding a watcher for each input stream partition that adds to envelope queue

The TaskRunner starts by calling getConsumer on each StreamConsumerFactory for each input stream.

2013-02-20 22:23:30 SystemConfig [INFO] Creating new consumer factory samsa.stream.kafka.KafkaConsumerFactory

When getConsumer is called, the KafkaConsumerFactory will make a ZooKeeper connection to fetch topic metadata for the topic and partition that's given to it.

From this point on, the ordering of your log output might look slightly different from what's shown here. That's because each StreamConsumer is instantiated in its own thread, so the logging gets interleaved differently on every run.
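
For the curious, here's roughly what that ZooKeeper lookup amounts to. This is an illustrative sketch, not Samsa's code: it assumes Kafka 0.8's znode layout (/brokers/topics/<topic>/partitions/<partition>/state), uses the same zkclient library you can see in the log below, and the class name and placeholder connect string are mine.

    import java.nio.charset.Charset;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;

    public class LeaderLookup {

        // zkclient's default serializer expects Java-serialized objects, so use a plain UTF-8 one.
        static class StringSerializer implements ZkSerializer {
            private static final Charset UTF8 = Charset.forName("UTF-8");
            public byte[] serialize(Object data) throws ZkMarshallingError { return data.toString().getBytes(UTF8); }
            public Object deserialize(byte[] bytes) throws ZkMarshallingError { return bytes == null ? null : new String(bytes, UTF8); }
        }

        public static void main(String[] args) {
            // Placeholder connect string; the real one (including the /kafka-samsa chroot) is in the log below.
            ZkClient zk = new ZkClient("zookeeper:2181/kafka-samsa", 6000, 6000, new StringSerializer());
            try {
                // Kafka 0.8 stores per-partition leadership in a small JSON blob under this znode.
                String state = zk.readData("/brokers/topics/PageViewEventByGroupJson/partitions/5/state", true);
                Matcher m = Pattern.compile("\"leader\":(-?\\d+)").matcher(state == null ? "" : state);
                if (m.find()) {
                    // The broker's host and port can then be read from /brokers/ids/<leaderId>.
                    System.out.println("leader broker id = " + m.group(1));
                } else {
                    System.out.println("topic/partition not found in ZooKeeper");
                }
            } finally {
                zk.close();
            }
        }
    }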

2013-02-20 22:23:30 ZooKeeper [INFO] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@52cd19d
2013-02-20 22:23:30 ZkEventThread [INFO] Starting ZkClient event thread.
2013-02-20 22:23:30 ClientCnxn [INFO] Opening socket connection to server eat1-app312.corp.linkedin.com/172.20.72.74:12913
2013-02-20 22:23:30 ClientCnxn [INFO] Socket connection established to eat1-app312.corp.linkedin.com/172.20.72.74:12913, initiating session
2013-02-20 22:23:30 ClientCnxn [INFO] Session establishment complete on server eat1-app312.corp.linkedin.com/172.20.72.74:12913, sessionid = 0x43afd073e258b07, negotiated timeout = 6000
2013-02-20 22:23:30 ZkClient [INFO] zookeeper state changed (SyncConnected)
2013-02-20 22:23:30 ZkEventThread [INFO] Terminate ZkClient event thread.
2013-02-20 22:23:30 ZooKeeper [INFO] Session: 0x43afd073e258b07 closed
2013-02-20 22:23:30 ClientCnxn [INFO] EventThread shut down

Each KafkaConsumerFactory will use the topic metadata to find a leader (broker) for the partition.

2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 5
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 6
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 4
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa466.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 10
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 11
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 1
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 2
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=1], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa465.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 3
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=2], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=3], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=11], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa466.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=10], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=4], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=6], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 0
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=5], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 7
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa466.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 8
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Got leader eat1-qa468.corp.linkedin.com:10251 for topic PageViewEventByGroupJson with partition 9
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=7], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=0], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa466.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=8], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connecting to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=9], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=3], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=11], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa466.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=10], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=6], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=4], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].

Once the KafkaConsumerFactory has gotten a leader for a partition, and connected to the broker, it starts validating the offsets that were given to it (the ones that the KafkaCheckpointManager returned). If an offset is valid, the KafkaConsumerFactory will be able to read a message from it. If the offset is not valid, the KafkaConsumerFactory will try to use the autooffset.reset setting to reset to either the head or tail of the queue. If no autooffset.reset is supplied, a SamsaException will be thrown.

2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 69782630 for topic PageViewEventByGroupJson and partition 4.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 648585899 for topic PageViewEventByGroupJson and partition 10.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 160927954 for topic PageViewEventByGroupJson and partition 6.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 26692009 for topic PageViewEventByGroupJson and partition 11.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=7], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 218117506 for topic PageViewEventByGroupJson and partition 3.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 5994131 for topic PageViewEventByGroupJson and partition 7.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=1], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 102346102 for topic PageViewEventByGroupJson and partition 1.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=5], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 29469558 for topic PageViewEventByGroupJson and partition 5.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=0], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 8578798 for topic PageViewEventByGroupJson and partition 0.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=9], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 46825511 for topic PageViewEventByGroupJson and partition 9.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa466.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=8], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 437775431 for topic PageViewEventByGroupJson and partition 8.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for StreamPartition [partition=Partition [partition=2], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json].
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Validating offset 491284280 for topic PageViewEventByGroupJson and partition 2.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 5994131 for topic PageViewEventByGroupJson and partition 7. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 648585899 for topic PageViewEventByGroupJson and partition 10. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 26692009 for topic PageViewEventByGroupJson and partition 11. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 437775431 for topic PageViewEventByGroupJson and partition 8. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=11], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 26692009
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=8], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 437775431
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=7], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 5994131
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=10], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 648585899
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 69782630 for topic PageViewEventByGroupJson and partition 4. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=4], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 69782630
2013-02-20 22:23:30 KafkaConsumerFactory [INFO] Able to successfully read from offset 29469558 for topic PageViewEventByGroupJson and partition 5. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:30 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=5], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 29469558
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 8578798 for topic PageViewEventByGroupJson and partition 0. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=0], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 8578798
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 46825511 for topic PageViewEventByGroupJson and partition 9. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=9], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 46825511
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 160927954 for topic PageViewEventByGroupJson and partition 6. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=6], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 160927954
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 218117506 for topic PageViewEventByGroupJson and partition 3. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=3], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 218117506
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 102346102 for topic PageViewEventByGroupJson and partition 1. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=1], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 102346102
2013-02-20 22:23:31 KafkaConsumerFactory [INFO] Able to successfully read from offset 491284280 for topic PageViewEventByGroupJson and partition 2. Using it to instantiate a Kafka consumer.
2013-02-20 22:23:31 KafkaConsumer [INFO] Creating KafkaConsumer for StreamPartition [partition=Partition [partition=2], name=page-view-event-by-group, system=kafka, stream=PageViewEventByGroupJson, serde=json] with start offset: 491284280

As you can see above, after the KafkaConsumerFactory validates the offsets successfully, it creates a KafkaConsumer that can be used to read messages from the topic/partition. The TaskRunner uses these consumers to start reading messages.
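
To make the validation step concrete, here's a sketch of one way to check an offset against a partition's available range using Kafka 0.8's SimpleConsumer API. It's illustrative only, not Samsa's actual code: Samsa validates by attempting a read, the class name and clientId here are made up, and the host, topic, and offset are simply copied from the log above.

    import java.util.Collections;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetCheck {

        /** Asks the broker for the earliest available offset, or the next offset to be written (latest). */
        static long offsetBefore(SimpleConsumer consumer, String topic, int partition, long time) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    Collections.singletonMap(tp, new PartitionOffsetRequestInfo(time, 1)),
                    kafka.api.OffsetRequest.CurrentVersion(), "offset-check");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            return response.offsets(topic, partition)[0];
        }

        public static void main(String[] args) {
            // Host and port come from the "Got leader" lines above; the clientId is made up.
            SimpleConsumer consumer = new SimpleConsumer("eat1-qa468.corp.linkedin.com", 10251, 30000, 64 * 1024, "offset-check");
            try {
                String topic = "PageViewEventByGroupJson";
                int partition = 5;
                long checkpointed = 29469558L; // the offset the KafkaCheckpointManager restored above

                long earliest = offsetBefore(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime());
                long latest = offsetBefore(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime());

                // A rough approximation of "can I read from this offset?"
                if (checkpointed >= earliest && checkpointed <= latest) {
                    System.out.println("offset looks readable; start the consumer here");
                } else {
                    // This is where autooffset.reset comes in: largest -> latest, smallest -> earliest.
                    // With no autooffset.reset configured, Samsa gives up and throws instead.
                    System.out.println("offset out of range; would reset to " + latest + " or " + earliest);
                }
            } finally {
                consumer.close();
            }
        }
    }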

Next, the TaskRunner sets up the metrics reporters.

2013-02-20 22:23:30 JavaTaskRunner [INFO] Setting up metrics
2013-02-20 22:23:30 AmfReporterFactory [INFO] creating new amf metrics reporter
2013-02-20 22:23:30 AmfClient [INFO] Creating connection manager for URL: http://eat1-amf-vip-z.corp.linkedin.com/api/v1
2013-02-20 22:23:30 AmfClient [INFO] Created connection manager for URL: http://eat1-amf-vip-z.corp.linkedin.com/api/v1
2013-02-20 22:23:30 AmfReporterFactory [INFO] got amf reporter properties [jobName: page-key-groups-aggregator-eat1-v3, taskName: container_1360690887210_0032_01_000002, jobId: 1, host: eat1-qa466.corp.linkedin.com]
2013-02-20 22:23:30 MetricsSnapshotReporterFactory [INFO] creating new metrics snapshot reporter
2013-02-20 22:23:30 MetricsSnapshotReporterFactory [WARN] unable to find implementation version in jar's meta info. defaulting to 0.0.1
2013-02-20 22:23:30 MetricsSnapshotReporterFactory [WARN] unable to find implementation samsa version in jar's meta info. defaulting to 0.0.1
2013-02-20 22:23:30 MetricsSnapshotReporter [INFO] got metrics snapshot reporter properties [job name: page-key-groups-aggregator-eat1-v3, job id: 1, taskName: container_1360690887210_0032_01_000002, version: 0.0.1, host: eat1-qa466.corp.linkedin.com]

At LinkedIn, we have InGraphs. Samsa uses AMF to send metrics to InGraphs, and you can see above that we're setting up the AMF (InGraphs) reporter. The second reporter, MetricsSnapshotReporter, is Samsa's own internal metrics reporter; it's what the Samsa dashboard uses to get metrics about your job. Both report the same metrics, just to different places.

Like the StreamConsumers, the metrics reporters run on their own threads. This lets metrics keep flowing even if your StreamTask blocks the TaskRunner's main event loop for a long period of time.

Once the metrics reporters have been set up, the TaskRunner sets up "committing", which is responsible for calling writeCheckpoint periodically.

2013-02-20 22:23:30 JavaTaskRunner [INFO] Setting up committing
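
There isn't much to see in the log here, but mechanically committing is just a timer thread that calls writeCheckpoint on the checkpoint manager at a fixed interval (60 seconds for this job, which is why the checkpoint traffic shows up a minute later in the log). A minimal sketch, with a hypothetical CheckpointManager interface standing in for the real KafkaCheckpointManager:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class CommitLoop {

        // Hypothetical interface; the real KafkaCheckpointManager's method signature may differ.
        interface CheckpointManager {
            void writeCheckpoint();
        }

        static ScheduledExecutorService startCommitting(final CheckpointManager checkpointManager, long intervalMs) {
            ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
            committer.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    // Flush the latest read offsets for every partition this TaskRunner owns.
                    checkpointManager.writeCheckpoint();
                }
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            return committer;
        }
    }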

After committing, the TaskRunner sets up counters for windowing, if your task uses it. Recall that windowing is the ability to have the TaskRunner call window() on your task every N milliseconds (defined in configuration), if your task implements WindowableTask.

2013-02-20 22:23:30 JavaTaskRunner [INFO] Setting up windowing
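
As an example, a windowed task usually accumulates state between window() calls and flushes it on each one. The sketch below is illustrative only: the interface shape and the no-argument window() signature are my assumptions, not Samsa's exact API, and the task class is made up.

    import java.util.HashMap;
    import java.util.Map;

    public class WindowingExample {

        // Hypothetical shape of the interface; the real WindowableTask's signature may take collaborators.
        interface WindowableTask {
            void window();
        }

        static class PageKeyGroupCounter implements WindowableTask {
            private final Map<String, Long> countsByGroup = new HashMap<String, Long>();

            // Called by the TaskRunner every N milliseconds (N comes from the job's configuration).
            public void window() {
                // Emit whatever was aggregated since the last window, then reset.
                System.out.println("flushing " + countsByGroup.size() + " groups");
                countsByGroup.clear();
            }
        }
    }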

The TaskRunner then creates an instance of your StreamTask for every partition that it's in charge of. For example, if the TaskRunner is responsible for three partitions, it will create three instances of your StreamTask (one for every partition).

2013-02-20 22:23:30 JavaTaskRunner [INFO] Initializing tasks

Finally, the "process" loop is started. This is the event loop that's responsible for taking messages from the incoming message queue, and feeding them to the appropriate StreamTask (based on the partition that the message came from).

2013-02-20 22:23:30 JavaTaskRunner [INFO] Starting process loop
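
Conceptually, the process loop is a single thread draining a queue and routing each message to the task instance that owns its partition (which is why one StreamTask is created per partition). The sketch below just shows the shape of it; the Envelope and StreamTask types here are hypothetical stand-ins, not Samsa's real classes.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProcessLoop {

        // Hypothetical stand-ins for Samsa's envelope and task types.
        static class Envelope {
            final int partition;
            final Object message;
            Envelope(int partition, Object message) { this.partition = partition; this.message = message; }
        }

        interface StreamTask {
            void process(Envelope envelope);
        }

        private final BlockingQueue<Envelope> envelopeQueue = new LinkedBlockingQueue<Envelope>();
        // One StreamTask instance per partition this TaskRunner owns (see "Initializing tasks" above).
        private final Map<Integer, StreamTask> tasksByPartition = new HashMap<Integer, StreamTask>();

        void run() throws InterruptedException {
            while (true) {
                // Filled by the per-partition consumer threads; only partitions this TaskRunner owns show up here.
                Envelope envelope = envelopeQueue.take();
                tasksByPartition.get(envelope.partition).process(envelope); // route by originating partition
            }
        }
    }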

The first time a stream is written to, the TaskRunner's ProducerMultiplexer class will create a new StreamProducer. In this case, the MetricsSnapshotReporter writes to a stream named samsa-metrics.

2013-02-20 22:23:30 ProducerMultiplexer [INFO] Creating a new producer multiplexer stream for output stream Stream [name=samsa-metrics, system=kafka, stream=SamsaMetrics, serde=metrics-snapshot].
2013-02-20 22:23:30 SystemConfig [INFO] Creating new producer factory samsa.stream.kafka.KafkaProducerFactory
2013-02-20 22:23:30 ProducerMultiplexerStream [INFO] Creating a new producer for task partition Partition [partition=11] and stream Stream [name=samsa-metrics, system=kafka, stream=SamsaMetrics, serde=metrics-snapshot].
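
The "creating a new producer" lines above are just lazy initialization: the multiplexer keeps a map of producers keyed by output stream and creates one on first use. A rough sketch, with a hypothetical StreamProducer interface and a stubbed-out factory call:

    import java.util.HashMap;
    import java.util.Map;

    public class ProducerMultiplexerSketch {

        // Hypothetical stand-in for Samsa's StreamProducer.
        interface StreamProducer {
            void send(Object message);
        }

        private final Map<String, StreamProducer> producersByStream = new HashMap<String, StreamProducer>();

        // The first write to a stream creates its producer; later writes reuse it.
        synchronized void send(String streamName, Object message) {
            StreamProducer producer = producersByStream.get(streamName);
            if (producer == null) {
                producer = createProducer(streamName); // e.g. via the configured producer factory
                producersByStream.put(streamName, producer);
            }
            producer.send(message);
        }

        private StreamProducer createProducer(String streamName) {
            throw new UnsupportedOperationException("stand-in for the real producer factory");
        }
    }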

As with the KafkaCheckpointManagerFactory, the KafkaProducerFactory builds a Kafka producer configuration, which Kafka's VerifiableProperties then verifies. Again, the WARNs can be disregarded; they just flag entries in the job config that aren't valid producer settings (Samsa's own properties and consumer-side settings passed along in the same config).

2013-02-20 22:23:30 KafkaProducerFactory [INFO] Creating producer for Stream [name=samsa-metrics, system=kafka, stream=SamsaMetrics, serde=metrics-snapshot] with config {rebalance.retries.max=100, samsa.consumer.factory=samsa.stream.kafka.KafkaConsumerFactory, zk.connect=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa, queuedchunks.max=2, producer.retry.backoff.ms=750, zk.connectiontimeout.ms=1000000, broker.list=eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251, fetch.size=1200000, samsa.partition.manager=samsa.stream.kafka.KafkaPartitionManager, autocommit.interval.ms=60000, max.message.size=1300000, samsa.producer.factory=samsa.stream.kafka.KafkaProducerFactory, producer.type=async, autooffset.reset=largest, producer.num.retries=2, queue.enqueueTimeout.ms=-1, producer.request.timeout.ms=5000}
2013-02-20 22:23:30 VerifiableProperties [INFO] Verifying properties
2013-02-20 22:23:30 VerifiableProperties [WARN] Property rebalance.retries.max is not valid
2013-02-20 22:23:30 VerifiableProperties [WARN] Property samsa.consumer.factory is not valid
2013-02-20 22:23:30 VerifiableProperties [WARN] Property zk.connect is not valid
2013-02-20 22:23:30 VerifiableProperties [WARN] Property queuedchunks.max is not valid
2013-02-20 22:23:30 VerifiableProperties [INFO] Property producer.retry.backoff.ms is overridden to 750
2013-02-20 22:23:30 VerifiableProperties [WARN] Property zk.connectiontimeout.ms is not valid
2013-02-20 22:23:30 VerifiableProperties [INFO] Property broker.list is overridden to eat1-qa464.corp.linkedin.com:10251,eat1-qa465.corp.linkedin.com:10251,eat1-qa466.corp.linkedin.com:10251,eat1-qa467.corp.linkedin.com:10251,eat1-qa468.corp.linkedin.com:10251
2013-02-20 22:23:30 VerifiableProperties [WARN] Property fetch.size is not valid
2013-02-20 22:23:30 VerifiableProperties [INFO] Property max.message.size is overridden to 1300000
2013-02-20 22:23:30 VerifiableProperties [WARN] Property autocommit.interval.ms is not valid
2013-02-20 22:23:30 VerifiableProperties [WARN] Property samsa.partition.manager is not valid
2013-02-20 22:23:30 VerifiableProperties [WARN] Property samsa.producer.factory is not valid
2013-02-20 22:23:30 VerifiableProperties [INFO] Property producer.type is overridden to async
2013-02-20 22:23:30 VerifiableProperties [INFO] Property producer.num.retries is overridden to 2
2013-02-20 22:23:30 VerifiableProperties [WARN] Property autooffset.reset is not valid
2013-02-20 22:23:30 VerifiableProperties [INFO] Property queue.enqueueTimeout.ms is overridden to -1
2013-02-20 22:23:30 VerifiableProperties [INFO] Property producer.request.timeout.ms is overridden to 5000

The KafkaProducer will fetch the topic metadata for the SamsaMetrics topic, so it knows which brokers to produce to. Unlike the previous topic metadata fetches, it uses Kafka's FetchTopicMetadataRequest (the broker protocol) instead of going through ZooKeeper.

2013-02-20 22:23:35 ClientUtils$ [INFO] Fetching metadata for topic Set(SamsaMetrics)
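
If you want to reproduce that metadata fetch by hand, Kafka 0.8's Java API lets you send a topic metadata request to any broker in broker.list. The snippet below is for illustration only (the clientId is made up; the host, port, and topic are copied from this log), and it also shows where the leader broker for each partition comes from:

    import java.util.Collections;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class MetadataLookup {
        public static void main(String[] args) {
            // Any broker from broker.list will do; this one is taken from the config dump above.
            SimpleConsumer consumer = new SimpleConsumer("eat1-qa464.corp.linkedin.com", 10251, 30000, 64 * 1024, "metadata-lookup");
            try {
                TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList("SamsaMetrics"));
                TopicMetadataResponse response = consumer.send(request);
                for (TopicMetadata topic : response.topicsMetadata()) {
                    for (PartitionMetadata partition : topic.partitionsMetadata()) {
                        // leader() is the broker that partition's messages should be produced to.
                        System.out.println(topic.topic() + "/" + partition.partitionId() + " -> " + partition.leader());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }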

After that, the producers connect to their brokers.

2013-02-20 22:23:35 SyncProducer [INFO] Connected to eat1-qa464.corp.linkedin.com:10251 for producing
2013-02-20 22:23:36 SyncProducer [INFO] Disconnecting from eat1-qa464.corp.linkedin.com:10251
2013-02-20 22:23:36 SyncProducer [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for producing
2013-02-20 22:23:36 SyncProducer [INFO] Connected to eat1-qa466.corp.linkedin.com:10251 for producing
2013-02-20 22:23:36 SyncProducer [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for producing

60 seconds pass. It's now time to commit our checkpoints. The first time writeCheckpoint is called on the KafkaCheckpointManager, it does some quick validation to make sure the topic and partition exist, as expected. If they don't, an exception is thrown.

2013-02-20 22:24:30 ZooKeeper [INFO] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@9856ec1
2013-02-20 22:24:30 ZkEventThread [INFO] Starting ZkClient event thread.
2013-02-20 22:24:30 ClientCnxn [INFO] Opening socket connection to server eat1-app311.corp.linkedin.com/172.20.72.75:12913
2013-02-20 22:24:30 ClientCnxn [INFO] Socket connection established to eat1-app311.corp.linkedin.com/172.20.72.75:12913, initiating session
2013-02-20 22:24:30 ClientCnxn [INFO] Session establishment complete on server eat1-app311.corp.linkedin.com/172.20.72.75:12913, sessionid = 0x33ae15b0a668771, negotiated timeout = 6000
2013-02-20 22:24:30 ZkClient [INFO] zookeeper state changed (SyncConnected)
2013-02-20 22:24:30 ZkEventThread [INFO] Terminate ZkClient event thread.
2013-02-20 22:24:30 ZooKeeper [INFO] Session: 0x33ae15b0a668771 closed
2013-02-20 22:24:30 ClientCnxn [INFO] EventThread shut down

Once the KafkaCheckpointManager is convinced that the checkpoint topic and partition exist for each partition that the TaskRunner is responsible for, it starts up the producers that send the checkpoints.

2013-02-20 22:24:30 ClientUtils$ [INFO] Fetching metadata for topic Set(__samsa_checkpoint_page-key-groups-aggregator-eat1-v3_1)
2013-02-20 22:24:30 SyncProducer [INFO] Connected to eat1-qa464.corp.linkedin.com:10251 for producing
2013-02-20 22:24:31 SyncProducer [INFO] Disconnecting from eat1-qa464.corp.linkedin.com:10251
2013-02-20 22:24:31 SyncProducer [INFO] Connected to eat1-qa465.corp.linkedin.com:10251 for producing
2013-02-20 22:24:31 SyncProducer [INFO] Connected to eat1-qa466.corp.linkedin.com:10251 for producing
2013-02-20 22:24:31 SyncProducer [INFO] Connected to eat1-qa468.corp.linkedin.com:10251 for producing

From here on out, the logs should be quiet. If a Kafka broker fails at some point, you'll probably see an exception logged, but there's no need to worry. Samsa will wait for a while, and then retry the read or write that failed. It'll do this until the operation succeeds.