Collocated Processing
Disk-centric systems, such as an RDBMS or a NoSQL database, generally use the classic client-server approach: data is brought from the server to the client, processed there, and then usually discarded. This approach does not scale well because moving data over the network is the most expensive operation in a distributed system.
A much more scalable approach is collocated processing, which reverses the flow by bringing the computations to the servers where the data actually resides. This approach allows you to execute advanced logic or distributed SQL with JOINs exactly where the data is stored, avoiding expensive serialization and network trips.
To benefit from collocated processing, we first need to ensure that the data is properly collocated. If the business logic needs to access more than one entry, it is usually best to collocate dependent entries on a single cluster node. This technique is also known as affinity collocation of the data.
In this example, we have Country and City tables and want to collocate City entries with their corresponding Country entries. To achieve this, we use the WITH clause and specify affinityKey=CountryCode as shown below:
    CREATE TABLE Country (
      Code CHAR(3),
      Name CHAR(52),
      Continent CHAR(50),
      Region CHAR(26),
      SurfaceArea DECIMAL(10,2),
      Population INT(11),
      Capital INT(11),
      PRIMARY KEY (Code)
    ) WITH "template=partitioned, backups=1";

    CREATE TABLE City (
      ID INT(11),
      Name CHAR(35),
      CountryCode CHAR(3),
      District CHAR(20),
      Population INT(11),
      PRIMARY KEY (ID, CountryCode)
    ) WITH "template=partitioned, backups=1, affinityKey=CountryCode";
By collocating the tables, we ensure that all entries with the same affinityKey value are stored on the same cluster node, which avoids costly network trips to fetch data from remote nodes.
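The effect of the affinity key can be illustrated with a minimal plain-Java sketch. It is a simplified model, not Ignite's actual affinity function: the class `AffinitySketch`, the hash-modulo mapping, the partition count, and the sample city IDs are all assumptions made for illustration. The point it shows is that a City row is placed by its CountryCode rather than by its ID, so it lands on the same node as the matching Country row.

```java
/** Simplified model of affinity collocation; not Ignite's real affinity function. */
final class AffinitySketch {
    /** Hypothetical number of partitions the key space is split into. */
    static final int PARTITIONS = 1024;

    /** Maps an affinity key to a partition (hash-modulo, for illustration only). */
    static int partitionFor(Object affinityKey) {
        return Math.floorMod(affinityKey.hashCode(), PARTITIONS);
    }

    /** Maps a partition to one of nodeCount nodes (round-robin, for illustration only). */
    static int nodeFor(int partition, int nodeCount) {
        return partition % nodeCount;
    }

    /** Country rows are placed by their primary key, Code. */
    static int nodeForCountry(String code, int nodeCount) {
        return nodeFor(partitionFor(code), nodeCount);
    }

    /** City rows are placed by CountryCode (the affinity key); the ID plays no role. */
    static int nodeForCity(int id, String countryCode, int nodeCount) {
        return nodeFor(partitionFor(countryCode), nodeCount);
    }

    public static void main(String[] args) {
        int nodes = 4;
        // Sample IDs are made up; all three lines print the same node number.
        System.out.println("Country USA      -> node " + nodeForCountry("USA", nodes));
        System.out.println("City 101 (USA)   -> node " + nodeForCity(101, "USA", nodes));
        System.out.println("City 102 (USA)   -> node " + nodeForCity(102, "USA", nodes));
    }
}
```

Because placement depends only on the affinity key, adding more cities for the same country never spreads them over additional nodes in this model.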
The Apache Ignite SQL engine performs much more efficiently when a query runs against collocated data. This is especially important for distributed JOINs executed within the cluster.
Taking the two tables created above, let's get the most populated cities across China, Russia and the USA by joining the data stored in the Country and City tables, as follows:
    SELECT country.name, city.name, MAX(city.population) AS max_pop
    FROM country
      JOIN city ON city.countrycode = country.code
    WHERE country.code IN ('USA','RUS','CHN')
    GROUP BY country.name, city.name
    ORDER BY max_pop DESC;
Since all the cities are collocated with their countries, the JOIN executes only on the nodes that store the China, Russia and USA entries. This avoids expensive data movement across the network, so the query scales better and runs faster.
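To see why a collocated JOIN needs no data exchange between nodes, here is a toy plain-Java model. It is not Ignite's SQL engine: the class `CollocatedJoinSketch`, the hash-based placement rule, and the sample rows are assumptions for illustration. Each node joins only the rows it holds, and the partial results are simply concatenated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a collocated JOIN; not Ignite's SQL engine. */
final class CollocatedJoinSketch {
    /** One cluster node holding its share of both tables. */
    static final class Node {
        final Map<String, String> countries = new HashMap<>(); // Code -> Name
        final List<String[]> cities = new ArrayList<>();       // {Name, CountryCode}

        /** Joins local City rows with local Country rows; never looks at other nodes. */
        List<String> localJoin() {
            List<String> rows = new ArrayList<>();
            for (String[] city : cities) {
                String countryName = countries.get(city[1]);
                if (countryName != null)
                    rows.add(countryName + " / " + city[0]);
            }
            return rows;
        }
    }

    /** Hypothetical placement rule shared by both tables: hash of the country code. */
    static int nodeFor(String countryCode, int nodeCount) {
        return Math.floorMod(countryCode.hashCode(), nodeCount);
    }

    /** Runs the join on every node and concatenates the partial results. */
    static List<String> joinAcross(List<Node> cluster) {
        List<String> result = new ArrayList<>();
        for (Node node : cluster)
            result.addAll(node.localJoin());
        return result;
    }

    /** Loads a few made-up sample rows into a three-node cluster and joins them. */
    static List<String> demo() {
        int nodeCount = 3;
        List<Node> cluster = new ArrayList<>();
        for (int i = 0; i < nodeCount; i++)
            cluster.add(new Node());

        String[][] countries = {{"USA", "United States"}, {"RUS", "Russia"}, {"CHN", "China"}};
        String[][] cities = {{"New York", "USA"}, {"Moscow", "RUS"},
                             {"Shanghai", "CHN"}, {"Los Angeles", "USA"}};

        // Both tables are placed by country code, so matching rows share a node.
        for (String[] c : countries)
            cluster.get(nodeFor(c[0], nodeCount)).countries.put(c[0], c[1]);
        for (String[] c : cities)
            cluster.get(nodeFor(c[1], nodeCount)).cities.add(c);

        return joinAcross(cluster);
    }
}
```

If the cities were instead placed by their own ID, some of them would sit on a node without their country, and the local join would silently miss those rows unless data were moved between nodes.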
The Apache Ignite compute grid and machine learning components let you perform computations and execute machine learning algorithms in parallel to achieve high performance, low latency, and linear scalability. Both components work best with collocated data and collocated processing in general.
For instance, let's assume that a blizzard is approaching New York. As a telecommunication company, you have to send a warning text message to 8 million New Yorkers. With the client-server approach, the company has to move all 8 million records from the servers to the client application that sends the messages.
A much more efficient approach is to send the text-messaging logic to the cluster node that stores the New York residents. This moves a single computation across the network instead of 8 million records, and performs far better.
Here is an example of what this logic might look like:
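    import javax.cache.Cache;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.binary.BinaryObject;
    import org.apache.ignite.cache.query.QueryCursor;
    import org.apache.ignite.cache.query.ScanQuery;
    import org.apache.ignite.lang.IgniteRunnable;
    import org.apache.ignite.resources.IgniteInstanceResource;

    Ignite ignite = ...

    // NewYork ID.
    long newYorkId = 2;

    // Sending the logic to a cluster node that stores NewYork and all its inhabitants.
    ignite.compute().affinityRun("City", newYorkId, new IgniteRunnable() {
        // Injected with the Ignite instance of the node that executes the task.
        @IgniteInstanceResource
        Ignite ignite;

        @Override
        public void run() {
            // Getting access to the Person cache.
            IgniteCache<BinaryObject, BinaryObject> people = ignite.cache("Person").withKeepBinary();

            ScanQuery<BinaryObject, BinaryObject> query = new ScanQuery<>();

            // Restricting the scan to the data stored on this node.
            query.setLocal(true);

            try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = people.query(query)) {
                // Iterating over the local cluster node data using the scan query.
                for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
                    BinaryObject personKey = entry.getKey();

                    // Picking New Yorkers only.
                    if (personKey.<Long>field("CITY_ID") == newYorkId) {
                        BinaryObject person = entry.getValue();

                        // Sending the warning message to the person.
                    }
                }
            }
        }
    });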
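The difference between the two approaches can be made concrete with a toy plain-Java model that counts what crosses the network. It does not use the Ignite API: the class `AffinityRunSketch`, its placement rule, and the generated phone numbers are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy cluster that counts what crosses the network; not the Ignite API. */
final class AffinityRunSketch {
    private final List<Map<Long, List<String>>> nodes = new ArrayList<>(); // per node: cityId -> residents
    long recordsMoved; // records shipped from the servers to the client
    long tasksMoved;   // computations shipped from the client to a server

    AffinityRunSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++)
            nodes.add(new HashMap<>());
    }

    /** Hypothetical placement rule: residents live on the node that owns their city ID. */
    int ownerOf(long cityId) {
        return (int) Math.floorMod(cityId, (long) nodes.size());
    }

    void addResident(long cityId, String phone) {
        nodes.get(ownerOf(cityId)).computeIfAbsent(cityId, k -> new ArrayList<>()).add(phone);
    }

    private List<String> residentsOn(int node, long cityId) {
        return nodes.get(node).getOrDefault(cityId, new ArrayList<>());
    }

    /** Client-server style: copy every resident record to the client, then message from there. */
    int warnFromClient(long cityId) {
        List<String> fetched = new ArrayList<>(residentsOn(ownerOf(cityId), cityId));
        recordsMoved += fetched.size();
        return fetched.size(); // one message per fetched record
    }

    /** Collocated style: ship one task to the owner node and iterate over its local data. */
    int warnOnOwnerNode(long cityId) {
        tasksMoved++;
        int sent = 0;
        for (String phone : residentsOn(ownerOf(cityId), cityId))
            sent++; // the message would be sent from the server node here
        return sent;
    }

    /** Returns {records moved by the client approach, records moved by the collocated approach, tasks moved}. */
    static long[] demo(int residents) {
        AffinityRunSketch cluster = new AffinityRunSketch(4);
        long newYorkId = 2;
        for (int i = 0; i < residents; i++)
            cluster.addResident(newYorkId, "+1-555-" + i);

        cluster.warnFromClient(newYorkId);
        long movedByClientApproach = cluster.recordsMoved;

        cluster.warnOnOwnerNode(newYorkId);
        long movedByCollocatedApproach = cluster.recordsMoved - movedByClientApproach;

        return new long[] {movedByClientApproach, movedByCollocatedApproach, cluster.tasksMoved};
    }
}
```

In this model the client-server path moves one record per resident, while the collocated path moves one task regardless of how many residents the city has.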
Feature | Description |
---|---|
Affinity Collocation | If the business logic needs to access more than one entry, it can be reasonable to collocate dependent entries by storing them on a single cluster node. |
Collocated Computations | It is also possible to route computations to the nodes where the data is stored. |
Compute Grid | Distributed computations are performed in parallel to gain high performance, low latency, and linear scalability. |
Distributed JOINs | Ignite supports collocated and non-collocated distributed SQL joins. |
Machine Learning | The Ignite machine learning component lets users run ML/DL training and inference directly on the data stored in an Ignite cluster, and provides ML and DL algorithms. |