Running Apache Streams


Running Apache Streams Embedded


Streams can be embedded in a stand-alone Java application using the streams-runtime-local module.

First, add the following dependency to your application's build definition:

org.apache.streams:streams-runtime-local:0.1

Next, in the class you intend to run, create a LocalStreamBuilder:

StreamBuilder builder = new LocalStreamBuilder(100);

Add data providers for your application by importing additional provider modules, for example:

org.apache.streams:streams-provider-twitter:0.1

and adding providers to the builder:

builder.newPerpetualStream("provider", new TwitterStreamProvider());

The provided documents can be transformed by your application with processors:

builder.addStreamsProcessor("converter", new TwitterTypeConverter(ObjectNode.class, Activity.class), 2, "provider");

Persistence modules for common databases are also available, for example:

org.apache.streams:streams-persist-hdfs:0.1

Your stream can write documents to databases with a PersistWriter:

builder.addStreamsPersistWriter("hdfs", new WebHdfsPersistWriter(), 1, "converter");
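The string ids tie the components together: each processor or writer names the ids it reads from, and the number before those ids appears to be how many parallel tasks run that component. The stand-alone sketch below models only that wiring. StreamGraphSketch, add, inputsOf and parallelismOf are hypothetical names, not the Apache Streams API, and rejecting an unknown upstream id is an assumption of the sketch. It uses a Java 16+ record.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of id-based wiring; NOT the Apache Streams API.
class StreamGraphSketch {

    // A registered component: its id, task count, and the upstream ids it reads from.
    record Node(String id, int parallelism, List<String> inputs) {}

    private final Map<String, Node> nodes = new LinkedHashMap<>();

    // Registers a component. Rejecting unknown upstream ids is an assumption of this sketch.
    StreamGraphSketch add(String id, int parallelism, String... inputs) {
        if (nodes.containsKey(id)) {
            throw new IllegalArgumentException("duplicate id: " + id);
        }
        for (String input : inputs) {
            if (!nodes.containsKey(input)) {
                throw new IllegalArgumentException("unknown upstream id: " + input);
            }
        }
        nodes.put(id, new Node(id, parallelism, List.of(inputs)));
        return this;
    }

    List<String> inputsOf(String id) {
        return nodes.get(id).inputs();
    }

    int parallelismOf(String id) {
        return nodes.get(id).parallelism();
    }

    public static void main(String[] args) {
        StreamGraphSketch graph = new StreamGraphSketch()
                .add("provider", 1)              // mirrors newPerpetualStream("provider", ...)
                .add("converter", 2, "provider") // mirrors addStreamsProcessor("converter", ..., 2, "provider")
                .add("hdfs", 1, "converter");    // mirrors addStreamsPersistWriter("hdfs", ..., 1, "converter")
        System.out.println(graph.inputsOf("hdfs"));           // prints [converter]
        System.out.println(graph.parallelismOf("converter")); // prints 2
    }
}
```

Reading the ids this way, documents flow provider, then converter (two tasks), then hdfs.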

Finally, your program should call

builder.start();

The stream will then initialize, begin execution, and run until all providers have completed.
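To make the run-until-completion behaviour concrete, here is a minimal, self-contained sketch of a finite provider feeding a processor and a writer through bounded queues. It is an illustration under assumptions, not how LocalStreamBuilder is implemented: LocalPipelineSketch and run are hypothetical, an end-of-stream marker stands in for "the provider has completed", and queueCapacity only mirrors the integer passed to LocalStreamBuilder, which we assume is a maximum queue size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical provider -> processor -> writer pipeline; NOT LocalStreamBuilder's implementation.
class LocalPipelineSketch {

    // End-of-stream marker, compared by identity so no real document can collide with it.
    private static final String END = new String("END");

    static List<String> run(List<String> providerDocs, Function<String, String> processor, int queueCapacity) {
        BlockingQueue<String> toProcessor = new ArrayBlockingQueue<>(queueCapacity);
        BlockingQueue<String> toWriter = new ArrayBlockingQueue<>(queueCapacity);
        List<String> written = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Provider: emits a finite dataset, then signals that it has completed.
            pool.submit(() -> {
                for (String doc : providerDocs) {
                    toProcessor.put(doc); // blocks while the queue is full
                }
                toProcessor.put(END);
                return null;
            });
            // Processor: transforms each document and forwards the end marker.
            pool.submit(() -> {
                for (String doc = toProcessor.take(); doc != END; doc = toProcessor.take()) {
                    toWriter.put(processor.apply(doc));
                }
                toWriter.put(END);
                return null;
            });
            // Writer: "persists" documents by collecting them until the end marker arrives.
            Future<?> writer = pool.submit(() -> {
                for (String doc = toWriter.take(); doc != END; doc = toWriter.take()) {
                    written.add(doc);
                }
                return null;
            });
            writer.get(); // the stream is done once everything has reached the writer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stream", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream component failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"), String::toUpperCase, 100)); // prints [A, B, C]
    }
}
```

A perpetual provider would simply never emit the end marker, which is why such a stream needs a time-out or an external halt to stop.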

Perpetual stream providers can run indefinitely, while other provider modes typically complete after retrieving a finite dataset.

Streams may be configured to time out and terminate if no new documents are provided within a specified amount of time:

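Map<String, Object> streamConfig = Maps.newHashMap(); // Guava's com.google.common.collect.Maps
// Stop the stream after 20 minutes (20 * 60 * 1000 ms) without new documents
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000);
StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);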
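The time-out behaves like an idle timer. The sketch below shows the general idea with a BlockingQueue: the consumer keeps polling and stops the first time nothing arrives within timeoutMillis. IdleTimeoutSketch and consumeUntilIdle are hypothetical names, and whether LocalStreamBuilder measures idleness exactly this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical idle time-out; mirrors the idea behind TIMEOUT_KEY, not its implementation.
class IdleTimeoutSketch {

    // Consumes documents until none arrives within timeoutMillis, then returns what was received.
    static List<String> consumeUntilIdle(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                // poll() waits up to the time-out and returns null if nothing arrived.
                String doc = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (doc == null) {
                    return received; // idle for too long: terminate the stream
                }
                received.add(doc);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat an interrupt as a request to stop
            return received;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("doc1", "doc2"));
        // The configuration above uses 20 * 60 * 1000 ms (20 minutes); 200 ms keeps this demo short.
        System.out.println(consumeUntilIdle(queue, 200)); // prints [doc1, doc2]
    }
}
```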

The shell running your application can also terminate the stream.

Upon receipt of a halt signal, the stream will attempt to process and write any documents in flight before shutting down.
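One common way for a JVM program to react to a halt signal is a shutdown hook, which the JVM runs on normal exit and on SIGINT or SIGTERM (for example Ctrl+C in the shell), but not on SIGKILL. The sketch below uses Runtime.addShutdownHook to flush documents still waiting in a queue. GracefulShutdownSketch and drainInFlight are hypothetical, and we do not claim this is how Streams implements its own shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical graceful shutdown; NOT the Streams shutdown code.
class GracefulShutdownSketch {

    // Hands every document still queued to the writer; returns how many were flushed.
    static int drainInFlight(BlockingQueue<String> inFlight, Consumer<String> writer) {
        List<String> remaining = new ArrayList<>();
        inFlight.drainTo(remaining);
        remaining.forEach(writer);
        return remaining.size();
    }

    public static void main(String[] args) {
        BlockingQueue<String> inFlight = new LinkedBlockingQueue<>(List.of("doc1", "doc2"));
        // Shutdown hooks run on normal exit and on SIGINT/SIGTERM, not on SIGKILL.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int flushed = drainInFlight(inFlight, doc -> System.out.println("writing " + doc));
            System.out.println("flushed " + flushed + " in-flight documents");
        }));
        System.out.println("stream running; the hook flushes the queue when the JVM exits");
    }
}
```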


Running Apache Streams Server


Before starting, you will need the following software:

ServiceMix

Cassandra

Tomcat

Streams Master

Streams Source

Streams is designed for deployment in Apache Tomcat; Apache ServiceMix provides a JMS connection, and Apache Cassandra acts as the persistence layer.

To begin, run mvn install from within the streams-master directory and then from within the streams-source directory:

 mvn install

This will generate a WAR file in the streams-web/target directory. Before starting Tomcat, you will need to start Cassandra. To do this, run:

 ./bin/cassandra -f

from within your Cassandra installation directory.

Copy streams-web.war to the webapps directory of your Tomcat installation and start Tomcat:

 ./bin/startup.sh

from within your Tomcat installation directory.

Start ServiceMix:

 ./bin/servicemix

from within your ServiceMix installation directory.

Testing Throughput

You can use any utility capable of sending HTTP requests, but Chrome's Advanced Rest Client browser plugin is simple and easy to use.

With your HTTP utility of choice, set up a publisher by POSTing JSON like the following to http://localhost:8080/streams-web/publisher/register

 {
     "authToken": "token",
     "@class":"org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer",    
     "input_type": "http",
     "method": "post",
     "src": "http.example.com:8888",
     "delivery_frequency": "60",            
     "auth_type": "none",
     "username": "username",
     "password": "password"        
 }

The service will return a URL to which you can POST ActivityStrea.ms-formatted JSON.
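If you prefer code to a browser plugin, the JDK's built-in HTTP client (Java 11+) can send the same registration. This sketch assumes the service accepts a Content-Type of application/json and returns the publisher URL in the response body; RegisterPublisher and jsonPost are hypothetical names. The text block requires Java 15 or later.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical publisher registration using the JDK HTTP client.
class RegisterPublisher {

    static final String REGISTER_URL = "http://localhost:8080/streams-web/publisher/register";

    // Same payload as the example above.
    static final String PUBLISHER_JSON = """
            {
              "authToken": "token",
              "@class": "org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer",
              "input_type": "http",
              "method": "post",
              "src": "http.example.com:8888",
              "delivery_frequency": "60",
              "auth_type": "none",
              "username": "username",
              "password": "password"
            }
            """;

    // Builds (but does not send) a JSON POST request; the content type is an assumption.
    static HttpRequest jsonPost(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(jsonPost(REGISTER_URL, PUBLISHER_JSON), HttpResponse.BodyHandlers.ofString());
        // Expected (per the text above) to contain the URL to POST activities to.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```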

Next, set up a subscriber by POSTing JSON like the following to http://localhost:8080/streams-web/subscriber/register

 {
     "authToken": "token",
     "@class":"org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl",
     "filters": [
         "tags"
     ]
 }

This will return a unique URL that accepts GET and POST requests. To reconfigure the endpoint, POST JSON like the above to it; a GET request returns Activity Streams JSON.
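The subscriber side follows the same pattern: one POST to register (or later to reconfigure), then GET requests against the returned URL. As with the publisher, this sketch assumes the registration response body is the bare subscriber URL and that JSON is sent as application/json; SubscriberClient, post and get are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical subscriber client using the JDK HTTP client (Java 15+ for the text block).
class SubscriberClient {

    static final String REGISTER_URL = "http://localhost:8080/streams-web/subscriber/register";

    // Same payload as the example above.
    static final String SUBSCRIBER_JSON = """
            {
              "authToken": "token",
              "@class": "org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl",
              "filters": ["tags"]
            }
            """;

    // POST: registers a subscriber, or reconfigures an existing subscriber URL.
    static HttpRequest post(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // GET: fetches the Activity Streams JSON collected for this subscriber.
    static HttpRequest get(String subscriberUrl) {
        return HttpRequest.newBuilder(URI.create(subscriberUrl)).GET().build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Assumes the response body is just the subscriber URL.
        String subscriberUrl = client.send(post(REGISTER_URL, SUBSCRIBER_JSON),
                HttpResponse.BodyHandlers.ofString()).body().trim();
        String activities = client.send(get(subscriberUrl), HttpResponse.BodyHandlers.ofString()).body();
        System.out.println(activities);
    }
}
```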

Now, POST some ActivityStrea.ms JSON to the URL you received for the publisher, and then GET the URL you received for the subscriber. The subscriber will return activities whose tags match the subscriber's filters. The following is an example of an activity that would be returned to the example subscriber above:

 {
     "id": "id",
     "verb": "verb",
     "tags": "tags",
     "provider": {
         "url": "www.providerexample.com"
     },
     "actor": {
         "id": "actorid",
         "objectType": "actorobject",
         "displayName": "actorname",
         "url": "www.actorexampleurl.com"
     },
     "target": {
         "id": "targetid",
         "displayName": "targetname",
         "url": "www.targeturl.com"
     },
     "object": {
         "id": "objectid",
         "displayName": "objectname",
         "objectType": "object",
         "url": "www.objecturl.org"
     }
 }
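The filtering rule can be modelled in a few lines. This is a hypothetical illustration of "activities whose tags match the subscriber's filters", assuming an activity is delivered when at least one of its tags equals one of the filters; the server's actual matching logic may differ. Activity here is a simplified Java 16+ record rather than a Streams class, and it treats tags as a list even though the example above uses a single string.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of subscriber tag filtering; the service's real rule may differ.
class TagFilterSketch {

    // Simplified activity: only the fields the filter needs.
    record Activity(String id, String verb, List<String> tags) {}

    // An activity matches when at least one of its tags is among the subscriber's filters.
    static boolean matches(Activity activity, Set<String> filters) {
        return activity.tags().stream().anyMatch(filters::contains);
    }

    // Returns the published activities a subscriber with these filters would receive.
    static List<Activity> deliver(List<Activity> published, Set<String> filters) {
        return published.stream().filter(a -> matches(a, filters)).toList();
    }

    public static void main(String[] args) {
        Activity tagged = new Activity("id", "verb", List.of("tags"));
        Activity untagged = new Activity("id2", "verb", List.of("other"));
        List<Activity> delivered = deliver(List.of(tagged, untagged), Set.of("tags"));
        System.out.println(delivered.size()); // prints 1
    }
}
```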

Apache Streams Web Service Setup


Before starting, you will need the following software:

Cassandra

Tomcat

Streams Master

Streams Source

Streams is designed for deployment in Apache Tomcat. Apache Cassandra acts as a persistence layer.

To begin, run mvn install from within the streams-master directory and then from within the streams-source directory:

 mvn install

This will generate a WAR file in the streams-web/target directory. Before starting Tomcat, you will need to start Cassandra. To do this, run:

 ./bin/cassandra -f

from within your Cassandra installation directory. The -f flag starts Cassandra in the foreground; omitting -f starts it in the background.

Copy streams-web.war to the webapps directory of your Tomcat installation and start Tomcat:

 ./bin/startup.sh

from within your Tomcat installation directory.

Testing Throughput

You can use any utility capable of sending HTTP requests, but Chrome's Advanced Rest Client browser plugin is simple and easy to use.

With your HTTP utility of choice, set up a publisher by POSTing JSON like the following to http://localhost:8080/streams-web/app/publisherRegister

 {
     "src": "www.example.com"      
 }

The service will return a URL to which you can POST ActivityStrea.ms-formatted JSON.

Next, set up a subscriber by POSTing JSON like the following to http://localhost:8080/streams-web/app/subscriberRegister

 {
     "filters": [
         "tags"
     ]
 }

This will return a unique URL that accepts GET and POST requests. To reconfigure the endpoint, POST JSON like the above to it; a GET request returns Activity Streams JSON.

Now, POST some ActivityStrea.ms JSON to the URL you received for the publisher, and then GET the URL you received for the subscriber. Note that the publisher's src URL must match the activity's provider URL. The subscriber will return activities whose tags match the subscriber's filters. The following is an example of a valid activity a publisher can POST:

 {
     "verb": "verb",
     "tags": "tags",
     "provider": {
         "url": "www.example.com"
     },
     "actor": {
         "id": "actorid",
         "objectType": "actorobject",
         "displayName": "actorname",
         "url": "www.actorexampleurl.com"
     },
     "target": {
         "id": "targetid",
         "displayName": "targetname",
         "url": "www.targeturl.com"
     },
     "object": {
         "id": "objectid",
         "displayName": "objectname",
         "objectType": "object",
         "url": "www.objecturl.org"
     }
 }
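Putting the web service steps together, the sketch below registers a publisher and a subscriber, POSTs an activity, and GETs the subscriber URL. To satisfy the requirement that the publisher's src matches the activity's provider URL, both are derived from one constant. WebServiceRoundTrip and its helpers are hypothetical; the sketch assumes each registration response body is just the URL to use next and that JSON is sent as application/json. It needs Java 15 or later.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /app endpoints of the web service.
class WebServiceRoundTrip {

    static final String BASE = "http://localhost:8080/streams-web/app/";

    // One constant feeds both the publisher's src and the activity's provider.url,
    // so the two always match.
    static final String SRC = "www.example.com";

    static String publisherJson(String src) {
        return "{\"src\": \"" + src + "\"}";
    }

    // The example activity above, with provider.url filled in from src.
    static String activityJson(String src) {
        return """
                {
                  "verb": "verb",
                  "tags": "tags",
                  "provider": { "url": "%s" },
                  "actor": { "id": "actorid", "objectType": "actorobject", "displayName": "actorname", "url": "www.actorexampleurl.com" },
                  "target": { "id": "targetid", "displayName": "targetname", "url": "www.targeturl.com" },
                  "object": { "id": "objectid", "displayName": "objectname", "objectType": "object", "url": "www.objecturl.org" }
                }
                """.formatted(src);
    }

    static HttpRequest postJson(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends a request and returns the trimmed response body.
    static String sendForBody(HttpClient client, HttpRequest request) throws Exception {
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body().trim();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String publisherUrl = sendForBody(client, postJson(BASE + "publisherRegister", publisherJson(SRC)));
        String subscriberUrl = sendForBody(client, postJson(BASE + "subscriberRegister", "{\"filters\": [\"tags\"]}"));
        sendForBody(client, postJson(publisherUrl, activityJson(SRC)));
        // GET returns the activities whose tags match the subscriber's filters.
        HttpRequest poll = HttpRequest.newBuilder(URI.create(subscriberUrl)).GET().build();
        System.out.println(sendForBody(client, poll));
    }
}
```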