What is Collocated Processing?

Disk-centric systems, such as an RDBMS or a NoSQL database, generally use the classic client-server approach: data is brought from the server to the client, processed there, and then usually discarded. This approach does not scale well because moving data over the network is the most expensive operation in a distributed system.

A much more scalable approach is collocated processing, which reverses the flow by bringing the computation to the servers where the data resides. It lets you execute advanced logic or distributed SQL with JOINs exactly where the data is stored, avoiding expensive serialization and network trips.
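The difference between the two flows can be illustrated with a toy, single-process model. The sketch below is not Ignite code: `ToyNode`, `fetchAll`, and `execute` are hypothetical names invented for this illustration, and "network traffic" is simulated by a simple counter. It only demonstrates that pulling data to the client moves every record, while shipping the computation moves just the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy stand-in for a server node; hypothetical, not part of any real library.
class ToyNode {
    private final List<String> records = new ArrayList<>();

    // Counts how many values leave the node (simulated network traffic).
    private long valuesSent = 0;

    void store(String record) {
        records.add(record);
    }

    // Client-server style: every record is copied to the caller.
    List<String> fetchAll() {
        valuesSent += records.size();
        return new ArrayList<>(records);
    }

    // Collocated style: the job runs next to the data; only its result leaves.
    <R> R execute(Function<List<String>, R> job) {
        R result = job.apply(records);
        valuesSent += 1;
        return result;
    }

    long valuesSent() {
        return valuesSent;
    }
}

class CollocatedFlowSketch {
    // Returns {values moved with client-server, values moved with collocated processing}.
    static long[] compare(int recordCount) {
        ToyNode clientServer = new ToyNode();
        ToyNode collocated = new ToyNode();
        for (int i = 0; i < recordCount; i++) {
            clientServer.store("person-" + i);
            collocated.store("person-" + i);
        }

        // Client-server: pull everything, then count on the client.
        int countOnClient = clientServer.fetchAll().size();

        // Collocated: count on the node and return a single number.
        int countOnNode = collocated.execute(List::size);

        if (countOnClient != countOnNode) {
            throw new IllegalStateException("Both flows should compute the same result");
        }
        return new long[] { clientServer.valuesSent(), collocated.valuesSent() };
    }

    public static void main(String[] args) {
        long[] moved = compare(1_000);
        System.out.println("client-server moved " + moved[0]
            + " values, collocated moved " + moved[1]);
    }
}
```

Both flows compute the same answer; what changes is how much data has to cross the simulated network to get it.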

Usage Example

Let's assume that a blizzard is approaching New York City. You, as a telecommunications company, have to warn everyone in the city by sending a text message with precise instructions on how to behave in such weather conditions. There are around 8 million New Yorkers in your database who have to receive the message.

With the client-server approach, the company has to connect to the database and move all 8 million (!) records to a client application that texts everyone. This is highly inefficient and wastes the network and computational resources of the company's IT infrastructure.

However, if the company collocates each city it covers with the people who live there up front, it can send a single computation (!) to the cluster node that stores the information about all New Yorkers and send the text messages from there. This approach avoids moving 8 million records over the network and puts the cluster's own resources to work on the computation. That's collocated processing in action!
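Collocating people with their city comes down to placing both kinds of records by the same value, the city ID. The following is a simplified sketch of that idea, assuming a plain hash-modulo placement rule; a real affinity function is more elaborate, and `PersonKey`, `nodeFor`, and the node count are hypothetical names chosen for illustration only.

```java
// Hypothetical composite key: a person is identified by personId but placed by cityId.
final class PersonKey {
    final long personId;
    final long cityId; // plays the role of the affinity key

    PersonKey(long personId, long cityId) {
        this.personId = personId;
        this.cityId = cityId;
    }
}

class AffinitySketch {
    // Simplified placement rule: the node index depends on the affinity key only.
    static int nodeFor(long affinityKey, int nodeCount) {
        return Math.floorMod(Long.hashCode(affinityKey), nodeCount);
    }

    // A city is placed by its own ID.
    static int nodeForCity(long cityId, int nodeCount) {
        return nodeFor(cityId, nodeCount);
    }

    // A person is placed by the ID of the city they live in, not by personId.
    static int nodeForPerson(PersonKey key, int nodeCount) {
        return nodeFor(key.cityId, nodeCount);
    }

    public static void main(String[] args) {
        int nodes = 4;
        long newYorkId = 2;

        PersonKey alice = new PersonKey(101, newYorkId);
        PersonKey bob = new PersonKey(202, newYorkId);

        System.out.println("City node:  " + nodeForCity(newYorkId, nodes));
        System.out.println("Alice node: " + nodeForPerson(alice, nodes));
        System.out.println("Bob node:   " + nodeForPerson(bob, nodes));
    }
}
```

Because every `PersonKey` with the same `cityId` maps to the node that holds the city record, a job routed by the city ID finds all of that city's inhabitants locally. In Ignite, this kind of relationship is typically declared by marking the relevant key field with the `@AffinityKeyMapped` annotation.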

The snippet below shows how sending the computation to the node that holds New York might look with Ignite's compute API:


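import javax.cache.Cache;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

Ignite ignite = ...

// New York ID.
long newYorkId = 2;

// Send the logic to the cluster node that stores New York and all its inhabitants.
ignite.compute().affinityRun("City", newYorkId, new IgniteRunnable() {

  // Injected with the Ignite instance of the node that executes the job.
  @IgniteInstanceResource
  Ignite ignite;

  @Override
  public void run() {
    // Get access to the Person cache, keeping entries in binary form.
    IgniteCache<BinaryObject, BinaryObject> people = ignite.cache("Person").withKeepBinary();

    // Restrict the scan to the data stored on this node.
    ScanQuery<BinaryObject, BinaryObject> query = new ScanQuery<>();
    query.setLocal(true);

    try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = people.query(query)) {
      // Iterate over the local node's data using the scan query.
      for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
        BinaryObject personKey = entry.getKey();

        // Pick New Yorkers only.
        if (personKey.<Long>field("CITY_ID") == newYorkId) {
          BinaryObject person = entry.getValue();

          // Send the warning message to the person.
        }
      }
    }
  }
});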

                        
