Version: 0.10.2

Message Passing Implementation

(Note: this walkthrough is out of date as of 0.8.0. 0.8.0 revamped the message passing infrastructure to be based on the Disruptor)

This page walks through how emitting and transferring tuples works in Storm.

  • Worker is responsible for message transfer
    • refresh-connections is called every "task.refresh.poll.secs" or whenever assignment in ZK changes. It manages connections to other workers and maintains a mapping from task -> worker code
    • Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. code
    • The serializer is thread-safe code
    • The worker has a single thread which drains the transfer queue and sends the messages to other workers code
    • Message sending happens through this protocol: code
    • The implementation for distributed mode uses ZeroMQ code
    • The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing to get ZeroMQ installed) code
  • Receiving messages in tasks works differently in local mode and distributed mode
    • In local mode, the tuple is sent directly to an in-memory queue for the receiving task code
    • In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port is called a "virtual port", because it receives [task id, message] and then routes it to the actual task. code
      • The virtual port implementation is here: code
      • Tasks listen on an in-memory ZeroMQ port for messages from the virtual port code
      • Bolts listen here: code
      • Spouts listen here: code
  • Tasks are responsible for message routing. A tuple is emitted either to a direct stream (where the task id is specified) or a regular stream. In direct streams, the message is only sent if that bolt subscribes to that direct stream. In regular streams, the stream grouping functions are used to determine the task ids to send the tuple to.
    • Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} code
    • The "tasks-fn" returns the task ids to send the tuples to for either regular stream emit or direct stream emit code
    • After getting the output task ids, bolts and spouts use the transfer-fn provided by the worker to actually transfer the tuples
      • Bolt transfer code here: code
      • Spout transfer code here: code