Exploring how events are dispatched to, from and within S4 nodes
Events are dispatched according to their key.
The key is identified in an Event
through a KeyFinder
.
Dispatch can be configured for:
A stream can be defined with a KeyFinder, as :
Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
@Override
public List<String> get(final TopicEvent arg0) {
return ImmutableList.of(arg0.getTopic());
}
}, topicCountAndReportPE);
When an event is sent to the “TopicSeen” stream, its key will be identified through the KeyFinder implementation, hashed and dispatched to the matching partition.
The same logic applies when defining output streams.
If we use an AdapterApp subclass, the remoteStreamKeyFinder
should be defined in the onInit()
method, before calling super.onInit()
:
@Override
protected void onInit() {
...
remoteStreamKeyFinder = new KeyFinder<Event>() {
@Override
public List<String> get(Event event) {
return ImmutableList.of(event.get("theKeyField"));
}
};
super.onInit()
...
If we use a standard App, we use the createOutputStream(String name, KeyFinder<Event> keyFinder)
method.
If the KeyFinder is not defined for the output streams, events are sent to partitions of the connected cluster in a round robin fashion.
When receiving events from a remote application, we must define how external events are dispatched internally, to which PEs and based on which keys. For that purpose, we simply define and input stream with the corresponding KeyFinder:
createInputStream("names", new KeyFinder<Event>() {
@Override
public List<String> get(Event event) {
return Arrays.asList(new String[] { event.get("name") });
}
}, helloPE);
In this case, a name is extracted from each event, the PE instance with this key is retrieved or created, and the event sent to that instance.
Alternatively, we can use a unique PE instance for processing events in a given node. For that we simply define the input stream without a KeyFinder, and use a singleton PE:
HelloPE helloPE = createPE(HelloPE.class);
helloPE.setSingleton(true);
createInputStream("names", helloPE);
In this case, all events will be dispatched to the only HelloPE instance in this partition, regardless of the content of the event.
S4 follows a staged event driven architecture and uses a pipeline of executors to process messages.
An executor is an object that executes tasks. It usually keeps a bounded queue of task items and schedules their execution through a pool of threads.
When processing queues are full, executors may adopt various possible behaviours, in particular, in S4:
S4 provides various default implementations of these behaviours and you can also define your own custom executors as appropriate.
The following picture illustrates the pipeline of executors.
DefaultCommModule
and DefaultCodeModule
)setParallelism()
method of Stream
default.s4.comm.properties