public class TopologyBuilder extends Object

The template for creating and submitting a topology looks something like:

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("1", new TestWordSpout(true), 5);
    builder.setSpout("2", new TestWordSpout(true), 3);
    builder.setBolt("3", new TestWordCounter(), 3)
            .fieldsGrouping("1", new Fields("word"))
            .fieldsGrouping("2", new Fields("word"));
    builder.setBolt("4", new TestGlobalCount())
            .globalGrouping("1");

    Map conf = new HashMap();
    conf.put(Config.TOPOLOGY_WORKERS, 4);

    StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());

Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the topology run for 10 seconds before shutting down the local cluster.

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("1", new TestWordSpout(true), 5);
    builder.setSpout("2", new TestWordSpout(true), 3);
    builder.setBolt("3", new TestWordCounter(), 3)
            .fieldsGrouping("1", new Fields("word"))
            .fieldsGrouping("2", new Fields("word"));
    builder.setBolt("4", new TestGlobalCount())
            .globalGrouping("1");

    Map conf = new HashMap();
    conf.put(Config.TOPOLOGY_WORKERS, 4);
    conf.put(Config.TOPOLOGY_DEBUG, true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("mytopology", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.shutdown();

The pattern for TopologyBuilder is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used to declare the inputs for that component.
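
The pattern described above (register a component under an id, then declare inputs on the object that comes back) can be pictured with a small stand-in. This is only a sketch and not Storm's implementation: `DeclarerPatternSketch`, `MiniBuilder`, `InputDeclarer`, `subscribe` and `wiring` are hypothetical names invented for the illustration, and groupings, parallelism and the components themselves are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, NOT Storm's code: it mirrors only the
// "register by id, then declare inputs on the returned object" pattern.
final class DeclarerPatternSketch {

    /** Returned by setBolt; records which upstream ids the bolt reads from. */
    static final class InputDeclarer {
        private final List<String> inputs;

        InputDeclarer(List<String> inputs) {
            this.inputs = inputs;
        }

        /** Subscribe to the component registered under sourceId. */
        InputDeclarer subscribe(String sourceId) {
            inputs.add(sourceId);
            return this; // returning this allows chained declarations
        }
    }

    static final class MiniBuilder {
        // component id -> ids of the components it consumes from
        private final Map<String, List<String>> inputsById = new LinkedHashMap<>();

        /** Spouts are sources, so there is nothing to declare afterwards. */
        void setSpout(String id) {
            register(id);
        }

        /** Bolts hand back a declarer so the caller can wire up inputs. */
        InputDeclarer setBolt(String id) {
            return new InputDeclarer(register(id));
        }

        // The sketch rejects a reused id so each id names one component.
        private List<String> register(String id) {
            if (inputsById.containsKey(id)) {
                throw new IllegalArgumentException("duplicate component id: " + id);
            }
            List<String> inputs = new ArrayList<>();
            inputsById.put(id, inputs);
            return inputs;
        }

        Map<String, List<String>> wiring() {
            return inputsById;
        }
    }

    public static void main(String[] args) {
        MiniBuilder builder = new MiniBuilder();
        builder.setSpout("1");
        builder.setSpout("2");
        builder.setBolt("3").subscribe("1").subscribe("2");
        builder.setBolt("4").subscribe("1");
        System.out.println(builder.wiring()); // prints {1=[], 2=[], 3=[1, 2], 4=[1]}
    }
}
```

The ids and wiring in `main` follow the class example: bolt "3" reads from both spouts, bolt "4" reads from spout "1" only.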

Modifier and Type | Class and Description |
---|---|
protected class | TopologyBuilder.BoltGetter |
protected class | TopologyBuilder.ConfigGetter<T extends ComponentConfigurationDeclarer> |
protected class | TopologyBuilder.SpoutGetter |

Constructor and Description |
---|
TopologyBuilder() |

Modifier and Type | Method and Description |
---|---|
StormTopology | createTopology() |
BoltDeclarer | setBolt(String id, IBasicBolt bolt): Define a new bolt in this topology. |
BoltDeclarer | setBolt(String id, IBasicBolt bolt, Number parallelism_hint): Define a new bolt in this topology. |
BoltDeclarer | setBolt(String id, IRichBolt bolt): Define a new bolt in this topology with parallelism of just one thread. |
BoltDeclarer | setBolt(String id, IRichBolt bolt, Number parallelism_hint): Define a new bolt in this topology with the specified amount of parallelism. |
SpoutDeclarer | setSpout(String id, IRichSpout spout): Define a new spout in this topology. |
SpoutDeclarer | setSpout(String id, IRichSpout spout, Number parallelism_hint): Define a new spout in this topology with the specified parallelism. |
void | setStateSpout(String id, IRichStateSpout stateSpout) |
void | setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) |

public StormTopology createTopology()

public BoltDeclarer setBolt(String id, IRichBolt bolt)

- id: the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
- bolt: the bolt

public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint)

- id: the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
- bolt: the bolt
- parallelism_hint: the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.

public BoltDeclarer setBolt(String id, IBasicBolt bolt)

- id: the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
- bolt: the basic bolt

public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint)

- id: the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
- bolt: the basic bolt
- parallelism_hint: the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.

public SpoutDeclarer setSpout(String id, IRichSpout spout)

- id: the id of this component. This id is referenced by other components that want to consume this spout's outputs.
- spout: the spout

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint)

- id: the id of this component. This id is referenced by other components that want to consume this spout's outputs.
- spout: the spout
- parallelism_hint: the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.

public void setStateSpout(String id, IRichStateSpout stateSpout)

public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint)

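
In the class example, bolt "3" is declared with a parallelism hint of 3 and a fields grouping on "word". One way to picture what that combination is meant to achieve is that tuples carrying the same word always reach the same task. The sketch below assumes a plain hash-modulo rule to show the idea; `TaskPartitionSketch` and `taskFor` are hypothetical names, the sample words are made up, and the routing Storm actually performs may differ.

```java
import java.util.Objects;

// Illustration only: a hash-modulo rule is an ASSUMPTION made for this
// sketch, not a description of Storm's internal routing.
final class TaskPartitionSketch {

    /** Hypothetical helper: map a field value to a task index in [0, numTasks). */
    static int taskFor(String fieldValue, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(Objects.hashCode(fieldValue), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // mirrors the parallelism hint given to bolt "3"
        String[] words = {"apple", "pear", "apple", "plum"};
        for (String word : words) {
            // Equal words always yield the same index, so one task sees
            // every occurrence of a given word and can count it locally.
            System.out.println(word + " -> task " + taskFor(word, numTasks));
        }
    }
}
```

Because the index depends only on the field value and the task count, repeated words land on the same task no matter which spout emitted them.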
Copyright © 2016 The Apache Software Foundation. All Rights Reserved.