Data Ingestion and Streaming
Ignite streaming capabilities allows ingesting never-ending streams of data in a scalable and fault-tolerant fashion. The rates at which data can be injected into Ignite can be very high and easily exceed millions of events per second on a moderately sized cluster.
- Clients inject streams of data into Ignite.
- Data is automatically partitioned between Ignite data nodes.
- Data is concurrently processed across all cluster nodes.
- Clients perform concurrent
SQL queries
on the streamed data. - Clients subscribe to
continuous queries
as data changes.
// Get the data streamer reference and stream data. try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) { // Stream entries. for (int i = 0; i < 100000; i++) stmr.addData(i, Integer.toString(i)); }
CacheConfiguration cfg = new CacheConfiguration("wordCountCache"); IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg); try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) { // Allow data updates. stmr.allowOverwrite(true); // Configure data transformation to count instances of the same word. stmr.receiver(StreamTransformer.from((e, arg) -> { // Get current count. Long val = e.getValue(); // Increment count by 1. e.setValue(val == null ? 1L : val + 1); return null; })); // Stream words into the streamer cache. for (String word : text) stmr.addData(word, 1L); }
CacheConfiguration<String, Instrument> cfg = new CacheConfiguration<>("instCache"); // LRU window holding 1,000,000 entries. cfg.setEvictionPolicyFactory(new LruEvictionPolicy(1_000_000)); // Index some fields for querying portfolio positions. cfg.setIndexedTypes(String.class, Instrument.class); // Get a handle on the cache (create it if necessary). IgniteCache<String, Instrument< instCache = ignite.getOrCreateCache(cfg); // Select top 3 best performing instruments from the sliding window. SqlFieldsQuery top3qry = new SqlFieldsQuery( "select symbol, (latestPrice - openPrice) as change " + "from Instrument " + "order by change " + "desc limit 3" ); // List of rows. Every row is represented as a List as well. List<List<?>> top3 = instCache.query(top3qry).getAll();
Also see continuous queries, word count, and other streaming examples available on GitHub.
Streaming Features
Feature | Description |
---|---|
Data Streamers |
Data streamers are defined by |
Collocated Processing |
For cases when you need to execute some custom logic instead of just adding new data,
you can take advantage of Stream receivers allow you to react to the streamed data in collocated fashion, directly on the nodes where it will be cached. You can change the data or add any custom pre-processing logic to it, before putting the data into cache. |
Continuous Queries |
Continuous queries are useful for cases when you want to execute a query and then continue to get notified about the data changes that fall into your query filter. |
JMS Data Streamer |
Ignite JMS Data Streamer consumes messages from JMS brokers and inserts them into Ignite caches. |
Apache Flume Sink |
IgniteSink is a Flume sink that extracts events from an associated Flume channel and injects into an Ignite cache. |
MQTT Streamer |
Ignite MQTT Streamer consumes messages from a MQTT topic and feeds transformed key-value pairs into an Ignite cache. |
Twitter Streamer |
Ignite Twitter Streamer consumes messages from a Twitter Streaming API and inserts them into an Ignite cache. |
Apache Kafka Streamer |
Ignite Kafka Data Streamer consumes messages for a given Kafka Topic from Kafka Broker and inserts them into an Ignite cache. |
Apache Camel streamer |
Ignite Camel streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite cache. |
Apache Storm Streamer |
Ignite Storm Streamer consumes messages from an Apache Storm consumer endpoint and feeds them into an Ignite cache. |