Minimizing Network Utilization With Co-located Processing
Apache Ignite® supports a co-located processing technique for compute-intensive and data-intensive calculations, as well as machine learning algorithms. This technique increases performance by eliminating the impact of network latency.
In traditional disk-based systems, such as relational or NoSQL databases, client applications usually bring data from servers, use the records for local calculations, and discard the data as soon as the business task is complete. This approach does not scale well if a significant volume of data gets transferred over the network.
To overcome this issue, Apache Ignite supports a co-located processing technique. The primary aim of the technique is to increase the performance of your complex calculations or SQL with JOINs by running them straight on Ignite cluster nodes. In co-located processing, calculations are done on local data sets of the cluster nodes. This avoids records shuffling over the network and eliminates the impact of network latency on the performance of your applications.
Data Co-location
To use co-located processing in practice, first, you need to co-locate data sets by storing related records on the same cluster node. This process is also known as affinity co-location in Ignite.
For example, let's introduce Country and City tables and co-locate all City records that have the same Country identifier on a single node. To achieve this, you need to set CountryCode as an affinityKey in the City table:
CREATE TABLE Country (
  Code CHAR(3),
  Name CHAR(52),
  Continent CHAR(50),
  Region CHAR(26),
  SurfaceArea DECIMAL(10,2),
  Population INT(11),
  Capital INT(11),
  PRIMARY KEY (Code)
) WITH "template=partitioned, backups=1";

CREATE TABLE City (
  ID INT(11),
  Name CHAR(35),
  CountryCode CHAR(3),
  District CHAR(20),
  Population INT(11),
  PRIMARY KEY (ID, CountryCode)
) WITH "template=partitioned, backups=1, affinityKey=CountryCode";
This way, you instruct Ignite to store all the City records with the same CountryCode on a single cluster node. As soon as the data is co-located, Ignite can execute compute- and data-intensive logic, as well as SQL with JOINs, straight on the cluster nodes, minimizing or even eliminating network utilization.
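To see why the affinity key matters, here is a toy sketch in plain Java. It is not Ignite's actual affinity function (Ignite uses a rendezvous-based function over a configurable partition count); it only models the core idea: when the partition is derived from the affinity key alone, every record sharing that key lands in the same partition, while hashing the full composite primary key would scatter a country's cities across the cluster.

```java
import java.util.Arrays;

// Toy model of affinity-based placement (NOT Ignite's real implementation).
public class AffinitySketch {
    static final int PARTITIONS = 1024; // assumed partition count for the sketch

    // Same key -> same partition -> same primary node.
    static int partitionFor(Object affinityKey) {
        return Math.abs(affinityKey.hashCode() % PARTITIONS);
    }

    public static void main(String[] args) {
        // Two City records with the same CountryCode, partitioned by the
        // affinity key only, always map to the same partition.
        int p1 = partitionFor("USA"); // key of (ID=1, CountryCode=USA)
        int p2 = partitionFor("USA"); // key of (ID=2, CountryCode=USA)
        System.out.println(p1 == p2); // true

        // Hashing the full composite primary key instead would spread the
        // cities of one country across different partitions.
        int q1 = partitionFor(Arrays.asList(1, "USA"));
        int q2 = partitionFor(Arrays.asList(2, "USA"));
        System.out.println(q1 == q2);
    }
}
```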
SQL and Distributed JOINs
The Ignite SQL engine performs much faster if a query is executed against co-located records. This is especially crucial for SQL with JOINs that can span many cluster nodes.
Using the previous example with the Country and City tables, let's join the two tables, returning the most populated cities in the given countries:
SELECT country.name, city.name, MAX(city.population) as max_pop
FROM country
JOIN city ON city.countrycode = country.code
WHERE country.code IN ('USA','RUS','CHN')
GROUP BY country.name, city.name
ORDER BY max_pop DESC;
This query is executed only on the nodes that store the records of China, Russia, and the USA. Also, during the JOIN, the records are not shuffled between the nodes, since all the City records with the same city.countrycode are stored on a single node.
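The shuffle-free JOIN can be modeled with a small Java sketch. This is a conceptual toy, not Ignite code: each "node" is represented by a map holding one country's rows plus all of its city rows, each node computes its piece of the JOIN against local data only, and only the small per-node results are merged. The `City` record and the population figures are illustrative assumptions.

```java
import java.util.*;
import java.util.stream.*;

// Toy model of a co-located JOIN (NOT Ignite code): every node owns a
// Country row and all of its City rows, so the join never crosses nodes.
public class ColocatedJoinSketch {
    record City(String name, String countryCode, int population) {}

    // The most populated city per country, computed against local rows only.
    static List<City> mostPopulated(Map<String, List<City>> partition) {
        return partition.values().stream()
            .map(cities -> cities.stream()
                .max(Comparator.comparingInt(City::population)).orElseThrow())
            .toList();
    }

    public static void main(String[] args) {
        // Node 1's local partition: the USA record plus all USA cities.
        Map<String, List<City>> node1 = Map.of("USA", List.of(
            new City("New York", "USA", 8_008_278),
            new City("Los Angeles", "USA", 3_694_820)));

        // Node 2's local partition: the CHN record plus all CHN cities.
        Map<String, List<City>> node2 = Map.of("CHN", List.of(
            new City("Shanghai", "CHN", 9_696_300)));

        // Each node joins locally; only the tiny results travel to the reducer.
        List<City> merged = Stream.concat(
                mostPopulated(node1).stream(), mostPopulated(node2).stream())
            .sorted(Comparator.comparingInt(City::population).reversed())
            .toList();

        merged.forEach(c -> System.out.println(c.name() + " " + c.population()));
    }
}
```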
Distributed Co-located Computations
Apache Ignite compute and machine learning APIs allow you to perform computations and execute machine learning algorithms in parallel to achieve high performance, low latency, and linear scalability. Furthermore, both components work best with co-located data sets.
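What "routing a computation to the data" means can be modeled in plain Java before looking at the real API. The sketch below is an assumption-laden toy, not Ignite: each "node" is a single-threaded worker owning one partition of the data, a trivial modulo function plays the role of the affinity function, and a task submitted for a key runs on the worker that owns that key's partition, touching local records only.

```java
import java.util.*;
import java.util.concurrent.*;

// Toy model of affinity-routed computation (NOT the Ignite API).
public class AffinityRunSketch {
    static final int NODES = 2;

    // One single-threaded worker per simulated "node".
    final ExecutorService[] nodes = new ExecutorService[NODES];
    // Each node's local data: resident names keyed by city id.
    final List<Map<Long, List<String>>> localData = new ArrayList<>();

    AffinityRunSketch() {
        for (int i = 0; i < NODES; i++) {
            nodes[i] = Executors.newSingleThreadExecutor();
            localData.add(new ConcurrentHashMap<>());
        }
    }

    // Stand-in for the affinity function: key -> owning node.
    int nodeFor(long cityId) {
        return (int) (cityId % NODES);
    }

    void put(long cityId, String resident) {
        localData.get(nodeFor(cityId))
                 .computeIfAbsent(cityId, k -> new ArrayList<>()).add(resident);
    }

    // Run the task on the node that owns cityId's partition.
    Future<Integer> affinityRun(long cityId) {
        int node = nodeFor(cityId);
        return nodes[node].submit(() -> {
            // Only local records are touched; nothing crosses the "network".
            List<String> residents = localData.get(node).getOrDefault(cityId, List.of());
            // e.g. send a storm warning to each resident here.
            return residents.size();
        });
    }

    public static void main(String[] args) throws Exception {
        AffinityRunSketch cluster = new AffinityRunSketch();
        cluster.put(2, "Alice");
        cluster.put(2, "Bob");
        cluster.put(3, "Carol");

        System.out.println(cluster.affinityRun(2).get()); // prints 2

        for (ExecutorService n : cluster.nodes)
            n.shutdown();
    }
}
```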
Let's take another example: imagine that a winter storm is about to hit a highly populated city. As a telecommunication company, you have to send a text message to 20 million residents notifying them about the blizzard. With the client-server approach, the company would read all 20 million records from a database into an application, which would then execute some logic and eventually send a message to each resident.
A much more efficient approach is to run the logic on the cluster nodes that store the residents' records and send the text messages directly from those nodes. With this technique, instead of pulling 20 million records over the network, you execute the logic in place and eliminate the network's impact on the performance of the calculation.
Here is an example of how this logic might look:
Ignite ignite = ...

// NewYork ID.
long newYorkId = 2;

// Send the logic to the cluster node that stores NewYork and all its inhabitants.
ignite.compute().affinityRun("City", newYorkId, new IgniteRunnable() {
    @IgniteInstanceResource
    Ignite ignite;

    @Override public void run() {
        // Get access to the Person cache.
        IgniteCache<BinaryObject, BinaryObject> people =
            ignite.cache("Person").withKeepBinary();

        ScanQuery<BinaryObject, BinaryObject> query =
            new ScanQuery<BinaryObject, BinaryObject>();

        try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = people.query(query)) {
            // Iteration over the local cluster node data using the scan query.
            for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
                BinaryObject personKey = entry.getKey();

                // Pick NewYorkers only.
                if (personKey.<Long>field("CITY_ID") == newYorkId) {
                    BinaryObject person = entry.getValue();

                    // Send the warning message to the person.
                }
            }
        }
    }
});