The Slider Role History module

For a higher level view of Slider placement, look at "placement".

Important: the code samples in here are inevitably out of date. Trust the source and its tests.

Last updated 2016-01-06

Changes

Slider 0.80-incubating

A major rework of placement has taken place (Über-JIRA: placement phase 2).

  1. Slider manages the process of relaxing a request from a specific host to "anywhere in the cluster".
  2. Each role/component type may have a configurable timeout for escalation to begin.
  3. Slider periodically checks (every 30s by default) to see if there are outstanding requests that have reached their escalation timeout and yet have not been satisfied.
  4. Such requests are cancelled and "relaxed" requests re-issued.
  5. Labels are always respected; even relaxed requests use any labels specified in resources.json
  6. If a node is considered unreliable (as per the Slider 0.70-incubating changes), it is not used in the initial request. YARN may still allocate relaxed instances on such nodes. That is: there is no explicit blacklisting, merely deliberate exclusion of unreliable nodes from explicitly placed requests.
  7. Node and component failure counts are reset on a regular schedule. The "recently failed" counters are the ones used to decide if a node is unreliable or a component has failed too many times. Long-lived applications can therefore tolerate a low rate of component failures.
  8. The notion of "failed" differentiates between application failures, node failures and pre-emption.
    • YARN container pre-emption is not considered a failure.
    • Node failures are: anything reported as such by YARN, and any unexpected application exit (as these may be caused by node-related issues, such as a port conflict with other applications).
    • Application failures are resource limits being exceeded (RAM, VRAM), and unexpected application exit.
    • Only "application failures" are added to the "failed recently" count, and so only they are used to decide whether a component has failed too many times for the application to be considered working.
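
The escalation pass described in items 1 to 5 can be sketched roughly as follows. This is only an illustration under stated assumptions: the PendingRequest type, its fields and the escalate function are hypothetical names, not Slider classes, and times are plain seconds.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PendingRequest:
    """Hypothetical model of one outstanding container request."""
    role: str
    host: Optional[str]        # None means "anywhere in the cluster"
    label: Optional[str]       # label from resources.json; always kept
    issued_at: float           # seconds
    escalation_timeout: float  # per-role timeout, seconds


def escalate(pending: List[PendingRequest], now: float) -> List[PendingRequest]:
    """Return the outstanding requests after one periodic escalation check."""
    result = []
    for req in pending:
        placed = req.host is not None
        if placed and now - req.issued_at >= req.escalation_timeout:
            # Cancel the placed request and re-issue it relaxed.
            # The label is carried over unchanged.
            result.append(PendingRequest(req.role, None, req.label,
                                         now, req.escalation_timeout))
        else:
            result.append(req)
    return result
```

A request that has not yet reached its timeout is returned untouched, so the check can be run on a fixed schedule without side effects on fresh requests.
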

Role History Reloading Enhancements

How persisted role history is reloaded has also been improved: [SLIDER-600](https://issues.apache.org/jira/browse/SLIDER-600)

  1. Reloading of persistent history has been made resilient to changes in the number of roles.
  2. If there are more roles in the history than the current set, the surplus entries are discarded.
  3. If there are fewer, then the smaller set of roles is loaded.
  4. The mapping of role name to role ID is persisted.
  5. There are tests to verify that the original role histories (those without maps) are reloaded.

There are no checks that the mapping of name to ID has not changed; the current model is "don't do that". If it is done while an application instance is stopped, the rebuilt placement may hurt performance of those roles which require an absolute location. If it is done during AM restart, the AM will be unable to reliably rebuild its state model. SLIDER-849 proposes future handling of these situations. By saving the current rolename to ID mappings, the placement history will be ready for such a feature when it is implemented.

The enhanced role history format is backwards compatible; there are tests to verify that. However, Slider 0.70 and earlier cannot process the 0.80 version entries. If there are problems reloading entries, there are two tactics:

  1. Delete the JSON files
  2. Edit the most recent JSON file and delete any entry with the type RoleHistoryMapping.

Slider 0.70-incubating

The Slider AM tracks the recent failure count of each role on each (known) YARN node; when the failure count of a role is above a role-specific threshold, no more explicit requests are made for that host. Exception: "strict placement" component deployments are always requested against the node irrespective of their history.

This failure count is reset on a regular schedule to avoid past failures contaminating Slider's placement history indefinitely. The fact that a node was unreliable two weeks ago should not mean that it should be blacklisted this week.

Note that this failure history is not persisted; it is lost on AM start/restart.
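
A minimal sketch of such a resettable, in-memory failure counter is shown below. The class and method names are illustrative assumptions, not the Slider implementation; the reset would be driven by whatever scheduler the AM uses.

```python
from collections import defaultdict


class RecentFailures:
    """Hypothetical per-(host, role) counter of recent failures.

    Held only in memory, so it is lost on AM start/restart.
    """

    def __init__(self):
        self._counts = defaultdict(int)   # (host, role) -> recent failures

    def record_failure(self, host, role):
        self._counts[(host, role)] += 1

    def is_unreliable(self, host, role, threshold):
        # A node is skipped for explicit requests once the count is
        # above the role-specific threshold.
        return self._counts.get((host, role), 0) > threshold

    def reset(self):
        # Called on a regular schedule so old failures are forgotten.
        self._counts.clear()
```
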

Slider 0.60-incubating

  1. Slider supports strict placement, in which a request is only made for a previously used node and is never relaxed to other nodes.
  2. Different component types can be assigned specific labels to be deployed upon. This allows cluster administrators to partition the YARN cluster up into different functions, with specific Slider Application instances only requesting components on differently labelled sets of nodes. As such, it offers significant control of component placement, without actually dedicating hardware to specific applications. Note that the HDFS filesystem is still shared across the cluster, so IO from other in-cluster applications will impinge on each other.

The RoleHistory Datastructure

The RoleHistory is a datastructure which models the component assignment, and can persist it to and restore it from the (shared) filesystem.

  • For each component, there is a list of YARN cluster nodes which have hosted this component in the past.

  • This history is used when selecting a node for a component.

  • This history remembers when nodes were allocated. These are re-requested when starting an application instance again, and when re-requesting components after any node failure.

  • It must also remember when nodes were released —these are re-requested when expanding a set of instances of a specific component to a previous size during flex operations.

  • It has to track nodes for which Slider has an outstanding container request with YARN. This ensures that the same node is not requested more than once due to outstanding requests.

  • It does not retain a complete history of the component —and does not need to. All it needs to retain is the recent history for every node onto which a component instance has been deployed. Specifically, the last allocation or release operation on a node is all that needs to be persisted.

  • On AM startup, all nodes in the history are considered candidates, even those nodes currently marked as active —as they were from the previous instance.

  • On AM restart, nodes in the history marked as active have to be considered still active —the YARN RM will have to provide the full list of which are not.

  • During flexing, nodes marked as released —and for which there is no outstanding request — are considered candidates for requesting new instances.

  • When choosing a candidate node for hosting a component instance, it is taken from the head of the time-ordered list of nodes that last ran an instance of that component.

Persistence

The state of the component is persisted to HDFS on changes —but not on Application Instance termination.

  1. When nodes are allocated, the RoleHistory is marked as dirty
  2. When container release callbacks are received, the RoleHistory is marked as dirty
  3. When nodes are requested or a release request made, the RoleHistory is not marked as dirty. This information is not relevant on AM restart.

At startup a large number of allocations may arrive in a short period of time, so the RoleHistory may be updated very rapidly. Yet as those containers have only just been activated, an immediately restarted Slider Application Instance is unlikely to gain by re-requesting containers on them: their historical value is more important than their immediate past.

Accordingly, the RoleHistory may be persisted to HDFS asynchronously, with the dirty bit triggering a flush of the state to HDFS. The datastructure will still need to be synchronized for cross-thread access, but the sync operation will not be a major bottleneck compared to saving the file on every container allocation response (which will actually be the initial implementation).
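
The dirty-bit approach can be illustrated with a small sketch. DirtyTracker and its save callback are hypothetical names; the sketch assumes the callback takes its own consistent snapshot of the state before writing.

```python
import threading


class DirtyTracker:
    """Hypothetical dirty-bit holder with a deferred flush."""

    def __init__(self, save):
        self._save = save              # callable doing the real write
        self._lock = threading.Lock()
        self._dirty = False

    def mark_dirty(self):
        # Cheap enough to call on every allocation or release callback.
        with self._lock:
            self._dirty = True

    def flush_if_dirty(self):
        """Meant to run on a background thread; True if a save happened."""
        with self._lock:
            if not self._dirty:
                return False
            self._dirty = False
        # The slow write happens outside the lock, so callbacks that
        # only set the bit are never blocked by file I/O.
        self._save()
        return True
```

Many rapid updates collapse into a single write, which is the gain over saving on every allocation response.
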

There's no need to persist the format in a human-readable form; while protobuf might seem the approach most consistent with the rest of YARN, it's not an easy structure to work with.

The initial implementation will use Apache Avro as the persistence format, with the data saved in JSON or compressed format.

Weaknesses in this design

Blacklisting: even if a node fails repeatedly, this design will still try to re-request instances on this node; there is no blacklisting. As a central blacklist for YARN has been proposed, it is hoped that this issue will be addressed centrally, without Slider having to remember which nodes are unreliable for that particular Application Instance.

Anti-affinity: If multiple role instances are assigned to the same node, Slider has to choose on restart or flexing whether to ask for multiple containers on that node again, or to pick other nodes. The assumed policy is "only ask for one node"

Bias towards recent nodes over most-used: re-requesting the most recent nodes, rather than those with the most history of use, may push Slider to requesting nodes that were only briefly in use, and so have only a small amount of local state, over nodes that have had long-lived instances. This is a problem that could perhaps be addressed by preserving more history of a node: maintaining some kind of moving average of node use and picking the heaviest used, or some other more complex algorithm. This may be possible, but we'd need evidence that the problem existed before trying to address it.

The NodeMap

A core data structure, the NodeMap, is a map of every known node in the YARN cluster, tracking how many containers are allocated to specific roles in it, and, when there are no active instances, when it was last used. This history is used to choose where to request new containers. Because of the asynchronous allocation and release of containers, the History also needs to track outstanding release requests —and, more critically, outstanding allocation requests. If Slider has already requested a container for a specific role on a host, then asking for another container of that role would break anti-affinity requirements. Note that not tracking outstanding requests would radically simplify some aspects of the design, especially the complexity of correlating allocation responses with the original requests —and so the actual hosts originally requested.

  1. Slider builds up a map of which nodes have recently been used.
  2. Every node counts the number of active containers in each role.
  3. Nodes are only chosen for allocation requests when there are no active or requested containers on that node.
  4. When choosing which instances to release, Slider could pick the node with the most containers on it. This would spread the load.
  5. When there are no empty nodes to request containers on, a request would let YARN choose.

Strengths

  • Handles the multi-container on one node problem
  • By storing details about every role, cross-role decisions could be possible
  • Simple counters can track the state of pending add/release requests
  • Scales well to a rapidly flexing Application Instance
  • Simple to work with and persist
  • Easy to view and debug
  • Would support cross-role collection of node failures in future

Weaknesses

  • Size of the data structure is O(nodes * role-instances). This could be mitigated by regular cleansing of the structure. For example, at startup time (or intermittently) all unused nodes > 2 weeks old could be dropped.
  • Locating a free node could take O(nodes) lookups, and if the criterion of "newest" is included, will take exactly O(nodes) lookups. As an optimization, a list of recently explicitly released nodes can be maintained.
  • Need to track outstanding requests against nodes, so that if a request was satisfied on a different node, the original node's request count is decremented, not that of the node actually allocated.
  • In a virtual YARN cluster, may fill with node entries that are no longer in the cluster. Slider should query the RM (or topology scripts?) to determine if nodes are still parts of the YARN cluster.

Data Structures

RoleHistory

startTime: long
saveTime: long
dirty: boolean
nodemap: NodeMap
roles: RoleStatus[]
outstandingRequests: transient OutstandingRequestTracker
availableNodes: transient List<NodeInstance>[]

This is the aggregate data structure that is persisted to/from file

NodeMap

clusterNodes: Map: NodeId -> NodeInstance
clusterNodes(): Iterable<NodeInstance>
getOrCreate(NodeId): NodeInstance

Maps a YARN NodeID record to a Slider NodeInstance structure

NodeInstance

Every node in the YARN cluster is modeled as a ragged array of NodeEntry instances, indexed by role index.

NodeEntry[roles]
get(roleId): NodeEntry or null
create(roleId): NodeEntry
getNodeEntries(): NodeEntry[roles]
getOrCreate(roleId): NodeEntry
remove(roleId): NodeEntry

This could be implemented in a map or an indexed array; the array is more efficient but it does mandate that the number of roles is bounded and fixed.

NodeEntry

Records the details about all of a role's containers on a node. The active field records the number of containers currently active.

active: int
requested: transient int
releasing: transient int
last_used: long

NodeEntry.available(): boolean = active - releasing == 0 && requested == 0

The two fields releasing and requested are used to track the ongoing state of YARN requests; they do not need to be persisted across stop/start cycles. They may be relevant across AM restart, but without other data structures in the AM, they are not enough to track what the AM was up to before it was restarted. The strategy will be to ignore unexpected allocation responses (which may come from pre-restart requests), while treating unexpected container release responses as failures.

The active counter is only decremented after a container release response has been received.
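
As a sketch, the NodeEntry fields and the available() rule above translate directly into a small data class. This is an illustration only; persistence and the transient markers are shown merely as comments.

```python
from dataclasses import dataclass


@dataclass
class NodeEntry:
    active: int = 0      # containers currently active for this role
    requested: int = 0   # transient: outstanding requests on this node
    releasing: int = 0   # transient: releases in progress
    last_used: int = 0   # timestamp of last use (milliseconds)

    def available(self) -> bool:
        # No container that is not already being released,
        # and no outstanding request against this node.
        return self.active - self.releasing == 0 and self.requested == 0
```

Note that an entry whose only active container is being released already counts as available, because active is not decremented until the release response arrives.
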

RoleStatus

This is the existing org.apache.hoya.yarn.appmaster.state.RoleStatus class

RoleList

A list mapping role to int enum is needed to index NodeEntry elements in the NodeInstance arrays. Although such an enum is already implemented in the Slider Providers, explicitly serializing and deserializing it would make the persistent structure easier to parse in other tools, and resilient to changes in the number or position of roles.

This list could also retain information about recently used/released nodes, so that the selection of containers to request could shortcut a search

ContainerPriority

The container priority field (a 32 bit integer) is used by Slider (0.5.x) to index the specific role in a container so as to determine which role has been offered in a container allocation message, and which role has been released on a release event.

The History needs to track outstanding requests, so that when an allocation comes in, it can be mapped back to the original request. Simply looking up the node of the provided container and decrementing its request counter is not going to work, as the container may be allocated on a different node from that requested.

Proposal: The priority field of a request is divided by Slider into 8 bits for roleID and 24 bits for requestID. The request ID will be a simple rolling integer: Slider will assume that after 2^24 requests per role it can be rolled over, though as we will be retaining a list of outstanding requests, a clash should not occur. The main requirement is to not have more than 2^24 outstanding requests for instances of a specific role, which places an upper bound on the size of the Application Instance.

The splitting and merging will be implemented in a ContainerPriority class, for uniform access.
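
A possible shape for that splitting and merging is sketched below. The bit layout is an assumption: the role ID is placed in the low 8 bits (which matches extracting it with `priority & 0xff`) and the request ID in the 24 bits above it. The function names are illustrative.

```python
ROLE_BITS = 8
ROLE_MASK = (1 << ROLE_BITS) - 1     # 0xff
REQUEST_BITS = 24
REQUEST_MASK = (1 << REQUEST_BITS) - 1


def build_priority(role_id: int, request_id: int) -> int:
    """Merge a role ID and a rolling request ID into one 32 bit value."""
    if not 0 <= role_id <= ROLE_MASK:
        raise ValueError("role ID must fit in 8 bits")
    # The request ID rolls over after 2^24 values.
    return ((request_id & REQUEST_MASK) << ROLE_BITS) | role_id


def extract_role(priority: int) -> int:
    return priority & ROLE_MASK


def extract_request_id(priority: int) -> int:
    return (priority >> ROLE_BITS) & REQUEST_MASK
```

In a language with signed 32 bit integers, request IDs that set the top bit would yield negative priorities, so a real implementation might reserve that bit.
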

OutstandingRequest

Tracks an outstanding request. This is used to correlate an allocation response (whose Container Priority field is used to locate this request), with the node and role used in the request.

  roleId:  int
  requestID :  int
  node: string (may be null)
  requestedTime: long
  priority: int = requestID << 8 | roleId

The node identifier may be null —which indicates that a request was made without a specific target node

OutstandingRequestTracker

Contains a map from requestID to the specific OutstandingRequest made, and generates the request ID

nextRequestId: int
requestMap(RequestID) -> OutstandingRequest

Operations

addRequest(NodeInstance, RoleId): OutstandingRequest
    (and an updated request Map with a new entry)
lookup(RequestID): OutstandingRequest
remove(RequestID): OutstandingRequest
listRequestsForNode(ClusterID): [OutstandingRequest]

The list operation can be implemented inefficiently unless it is found to be important —if so a more complex structure will be needed.
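
The tracker operations listed above could look roughly like this. It is a sketch under assumptions: nodes are identified by plain strings, a single rolling counter is used for all roles, and the list operation is the deliberately inefficient linear scan.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class OutstandingRequest:
    role_id: int
    request_id: int
    node: Optional[str]     # None: no specific target node
    requested_time: int


class OutstandingRequestTracker:
    def __init__(self):
        self._next_request_id = 0
        self._requests: Dict[int, OutstandingRequest] = {}

    def add_request(self, node: Optional[str], role_id: int,
                    now: int = 0) -> OutstandingRequest:
        request_id = self._next_request_id
        # Rolling 24 bit request ID.
        self._next_request_id = (request_id + 1) % (1 << 24)
        request = OutstandingRequest(role_id, request_id, node, now)
        self._requests[request_id] = request
        return request

    def lookup(self, request_id: int) -> Optional[OutstandingRequest]:
        return self._requests.get(request_id)

    def remove(self, request_id: int) -> Optional[OutstandingRequest]:
        return self._requests.pop(request_id, None)

    def list_requests_for_node(self, node: str) -> List[OutstandingRequest]:
        # Linear scan; acceptable unless this proves to be a hot path.
        return [r for r in self._requests.values() if r.node == node]
```
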

AvailableNodes

This is a field in RoleHistory

availableNodes: List<NodeInstance>[]

For each role, lists nodes that are available for data-local allocation, ordered by most recently released, to accelerate node selection.

The performance benefit is most significant when requesting multiple nodes, as the scan for M locations from N nodes is reduced from M*N comparisons to 1 Sort + M list lookups.

Each list can be created off the Node Map by building for each role a sorted list of all Nodes which are available for an instance of that role, using a comparator that places the most recently released node ahead of older nodes.

This list is not persisted —when the Application Instance is stopped it is moot, and when an AM is restarted this structure will be rebuilt.

  1. When a node is needed for a new request, this list is consulted first.
  2. After the request is issued it can be removed from the list
  3. Whenever a container is released, if the node is now available for requests for that role, it should be added to the front of the list for that role.
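
The three maintenance steps above can be sketched with one double-ended queue per role. The class and method names are hypothetical, and nodes are represented by host name strings.

```python
from collections import deque


class AvailableNodes:
    """Hypothetical per-role lists of available nodes, newest first."""

    def __init__(self, role_count):
        self._lists = [deque() for _ in range(role_count)]

    def next_node(self, role_id):
        """Steps 1 and 2: take the head of the list, removing it.

        Returns None when no known node is available, in which case
        the caller has to choose another strategy.
        """
        nodes = self._lists[role_id]
        return nodes.popleft() if nodes else None

    def released(self, role_id, host):
        """Step 3: a node became available again; put it at the front."""
        nodes = self._lists[role_id]
        if host in nodes:
            nodes.remove(host)     # avoid duplicate entries
        nodes.appendleft(host)
```
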

If the list is empty during a container request operation, it means that the Role History does not know of any nodes in the YARN cluster that have hosted instances of that role and which are not in use. There are then two possible strategies to select a node:

  1. Ask for an instance anywhere in the YARN cluster (policy in Slider 0.5)
  2. Search the node map to identify other nodes which are (now) known about, but which are not hosting instances of a specific role —this can be used as the target for the next resource request.

Strategy #1 is simpler; Strategy #2 may decrease the affinity of component placement, as the AM will be explicitly requesting an instance on a node which it knows is not running an instance of that role.

ISSUE What to do about failing nodes?

Should a node whose container just failed be placed at the top of the stack, ready for the next request?

If the container failed due to an unexpected crash in the application, asking for that container back is absolutely the right strategy: it will bring back a new role instance on that machine.

If the container failed because the node is now offline, the container request will not be satisfied by that node.

If there is a problem with the node, such that containers repeatedly fail on it, then re-requesting containers on it will amplify the damage.

Actions

Bootstrap

Persistent Role History file not found; empty data structures created.

Restart

When starting an Application Instance, the Role History should be loaded.

If the history is missing or cannot be loaded for any reason, Slider must revert to the bootstrap actions.

If found, the Role History will contain Slider's view of the Application Instance's state at the time the history was saved, explicitly recording the last-used time of all nodes no longer hosting a role's container. By noting which roles were actually being served, it implicitly notes which nodes have a last_used value greater than any of the last_used fields persisted in the file. That is: all node entries listed as having active nodes at the time the history was saved must have more recent data than those nodes listed as inactive.

When rebuilding the data structures, the fact that nodes were active at save time must be converted into the data that indicates that the nodes were at least in use at the time the data was saved. The state of the Application Instance after the last save is unknown.

  1. History loaded; Failure => Bootstrap.
  2. Future: if role list enum != current enum, remapping could take place. Until then: fail.
  3. For all nodes marked as active at save time, set the last_used time to the save time.

// define a threshold: entries unused for more than 7 days are purged
threshold = rolehistory.saveTime - 7*24*60*60*1000

for (clusterId, clusterNode) in rolehistory.clusterNodes().entries() :
  for (role, nodeEntry) in clusterNode.getNodeEntries():
    nodeEntry.requested = 0
    nodeEntry.releasing = 0
    if nodeEntry.active > 0 :
      nodeEntry.last_used = rolehistory.saveTime
    nodeEntry.active = 0
    if nodeEntry.last_used < threshold :
      clusterNode.remove(role)
    else:
      availableNodes[role].add(clusterId)
  if clusterNode.getNodeEntries() isEmpty :
    rolehistory.clusterNodes.remove(clusterId)


for availableNode in availableNodes:
  sort(availableNode,new last_used_comparator())

After this operation, the structures are purged with all out of date entries, and the available node list contains a sorted list of the remainder.
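
The purge-and-rebuild pass can be written as a runnable sketch. It assumes a simplified representation in which the node map is a dictionary of host name to a dictionary of role ID to entry, each entry being a dictionary with the NodeEntry fields; none of these names come from the Slider source.

```python
WEEK_MS = 7 * 24 * 60 * 60 * 1000


def rebuild_on_start(cluster_nodes, save_time, role_count):
    """Purge stale entries in place and return the available lists.

    cluster_nodes: host -> {role_id: {"active", "requested",
                                      "releasing", "last_used"}}
    """
    threshold = save_time - WEEK_MS
    available = [[] for _ in range(role_count)]
    for host in list(cluster_nodes):        # copy keys: we delete below
        entries = cluster_nodes[host]
        for role_id in list(entries):
            entry = entries[role_id]
            entry["requested"] = 0
            entry["releasing"] = 0
            if entry["active"] > 0:
                # Active at save time: at least in use at that moment.
                entry["last_used"] = save_time
            entry["active"] = 0
            if entry["last_used"] < threshold:
                del entries[role_id]
            else:
                available[role_id].append(host)
        if not entries:
            del cluster_nodes[host]
    # Most recently used nodes first.
    for role_id, hosts in enumerate(available):
        hosts.sort(key=lambda h: cluster_nodes[h][role_id]["last_used"],
                   reverse=True)
    return available
```
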

AM Restart

  1. Create the initial data structures as in a normal start operation.
  2. Update the structure with the list of live nodes, removing those nodes from the list of available nodes.

now = time()
activeContainers = RM.getActiveContainers()

for container in activeContainers:
   nodeId = container.nodeId
   clusterNode = roleHistory.nodemap.getOrCreate(nodeId)
   role = extractRoleId(container.getPriority)
   nodeEntry = clusterNode.getOrCreate(role)
   nodeEntry.active++
   nodeEntry.last_used = now
   availableNodes[role].remove(nodeId)

There's no need to resort the available node list —all that has happened is that some entries have been removed

Teardown

  1. If dirty, save role history to its file.
  2. Issue release requests
  3. Maybe update data structures on responses, but do not mark RoleHistory as dirty or flush it to disk.

This strategy is designed to eliminate the expectation that there will ever be a clean shutdown, and so the startup-time code should not expect the RoleHistory data to have been written during shutdown. Instead the code should assume that the history was saved to disk at some point during the life of the Application Instance, ideally after the most recent change, and that the information in it is only an approximation of what the previous state of the Application Instance was.

Flex: Requesting a container in role role

node = availableNodes[roleId].pop() 
if node != null :
  node.nodeEntry[roleId].requested++;
outstanding = outstandingRequestTracker.addRequest(node, roleId)
request.node = node
request.priority = outstanding.priority

//update existing Slider role status
roleStatus[roleId].incRequested();

There is a bias here towards previous nodes, even if the number of nodes in the Application Instance has changed. This is why a node is picked where the number of active-releasing == 0 and requested == 0, rather than where it is simply the lowest value of active + requested - releasing: if there is no node in the nodemap that is not running an instance of that role, it is left to the RM to decide where the role instance should be instantiated.

This bias towards previously used nodes also means that (lax) requests will be made of nodes that are currently unavailable, either because they are offline or simply overloaded with other work. In such circumstances the node will have an active count of zero, so the search will find these nodes and request them even though the requests cannot be satisfied. As a result, the request will be downgraded to a rack-local or cluster-wide request. This is an acceptable degradation on an Application Instance where all the other entries in the nodemap have instances of that specific role, but not when there are empty nodes.

Solutions

  1. Add some randomness in the search of the datastructure, rather than simply iterate through the values. This would prevent the same unsatisfiable node from being requested first.

  2. Keep track of requests, perhaps through a last-requested counter —and use this in the selection process. This would radically complicate the selection algorithm, and would not even distinguish "node recently released that was also the last requested" from "node that has not recently satisfied requests even though it was recently requested".

  3. Keep track of requests that weren't satisfied, so as to identify a node that isn't currently satisfying requests.

History Issues

Without using that history, there is a risk that a very old assignment is used in place of a recent one and the value of locality decreased.

But there are consequences:

Performance:

Using the history to pick a recent node may increase selection times on a large Application Instance, as for every instance needed, a scan of all nodes in the nodemap is required (unless there is some clever bulk assignment list being built up), or a sorted version of the nodemap is maintained, with a node placed at the front of this list whenever it is updated.

Startup-time problems

There is also the risk that while starting an application instance, the rolehistory.saved flag may be updated while the flex is in progress, making the saved nodes appear out of date. Perhaps the list of recently released nodes could be rebuilt at startup time.

The proposed recentlyReleasedList addresses this, though it creates another data structure to maintain and rebuild at startup time from the last-used fields in the node entries.

AM Callback : onContainersAllocated

void onContainersAllocated(List<Container> allocatedContainers)

This is the callback received when containers have been allocated. Due to (apparently) race conditions, the AM may receive duplicate container allocations —Slider already has to recognize this and currently simply discards any surplus.

If the AM tracks outstanding requests made for specific hosts, it will need to correlate allocations with the original requests, so as to decrement the node-specific request count. Decrementing the request count on the allocated node will not work, as the allocation may not be to the node originally requested.

assignments = []
operations = []
for container in allocatedContainers:
  cid = container.getId()
  // the role ID is held in the low 8 bits of the priority
  roleId = container.priority & 0xff
  nodeId = container.nodeId
  outstanding = outstandingRequestTracker.remove(container.priority)
  roleStatus = lookupRoleStatus(container)
  roleStatus.decRequested()
  allocated = roleStatus.incActual()
  if outstanding == null || allocated > roleStatus.desired :
    // duplicate or surplus allocation: release it
    operations.add(new ContainerReleaseOperation(cid))
    surplusNodes.add(cid)
    surplusContainers++
    roleStatus.decActual()
  else:
    assignments.add(new ContainerAssignment(container, roleId))
    node = nodemap.getOrCreate(nodeId)
    nodeentry = node.get(roleId)
    if nodeentry == null :
      nodeentry = new NodeEntry()
      node[roleId] = nodeentry
      nodeentry.active = 1
    else:
      // the request count is decremented below, on the node
      // originally requested, which may not be this one
      nodeentry.active++
    nodemap.dirty = true

    // work back from the request to the node where the
    // request was outstanding
    requestedNodeId = outstanding.node
    if requestedNodeId != null:
      reqNode = nodemap.get(requestedNodeId)
      reqNodeEntry = reqNode.get(roleId)
      reqNodeEntry.requested--
      if reqNodeEntry.available() :
        availableNodes[roleId].insert(reqNode)

  1. At end of this, there is a node in the nodemap, which has recorded that there is now an active node entry for that role. The outstanding request has been removed.

  2. If a callback comes in for which there is no outstanding request, it is rejected (logged, ignored, etc). This handles duplicate responses as well as any other sync problem.

  3. The node selected for the original request has its request for a role instance decremented, so that it may be viewed as available again. The node is also re-inserted into the AvailableNodes list —not at its head, but at its position in the total ordering of the list.
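
The correlation step at the heart of this callback can be isolated in a short sketch. It makes several assumptions: the priority holds the role ID in its low 8 bits and the request ID above it, outstanding requests are kept in a dictionary from request ID to requested host, and request counts are kept in a dictionary keyed by (host, role ID). The function name is hypothetical.

```python
def on_allocated(priority, allocated_host, outstanding, requested_counts):
    """Correlate one allocation with the request that caused it.

    Returns None for an allocation with no outstanding request
    (a duplicate or surplus, to be released by the caller), else a
    tuple (role_id, allocated_host, requested_host).
    """
    role_id = priority & 0xFF
    request_id = priority >> 8
    if request_id not in outstanding:
        return None
    requested_host = outstanding.pop(request_id)
    if requested_host is not None:
        # Decrement the count on the node that was asked for,
        # which is not necessarily the node that was allocated.
        key = (requested_host, role_id)
        requested_counts[key] = max(0, requested_counts.get(key, 0) - 1)
    return role_id, allocated_host, requested_host
```
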

NMClientAsync Callback: onContainerStarted()

onContainerStarted(ContainerId containerId)

The AM uses this as a signal to remove the container from the list of starting containers, moving it into the map of live nodes; the counters in the associated RoleInstance are updated accordingly; the node entry adjusted to indicate it has one more live node and one less starting node.

NMClientAsync Callback: onContainerStartFailed()

The AM uses this as a signal to remove the container from the list of starting containers —the count of starting containers for the relevant NodeEntry is decremented. If the node is now available for instances of this container, it is returned to the queue of available nodes.

Flex: Releasing a Component instance from the Application Instance

Simple strategy: find a node with at least one active container of that type

select a node N in nodemap where for NodeEntry[roleId]: active > releasing
nodeentry = N.get(roleId)
// active is only decremented once the release response is received
nodeentry.releasing++

Advanced Strategy:

Scan through the map looking for a node where active >1 && active > releasing.
If none are found, fall back to the previous strategy

This is guaranteed to release a container on a node with >1 container in use, if such a node exists. If not, the scan time has increased to O(nodes).
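
Both strategies can be combined in a single scan, as in the following sketch. The dictionary-based node map and the function name are assumptions made for illustration.

```python
def pick_node_to_release(node_map, role_id):
    """Choose a host from which to release one container of a role.

    node_map: host -> {role_id: {"active": int, "releasing": int}}
    Prefers a node with more than one active container; falls back
    to any node that has a container not already being released.
    """
    fallback = None
    for host, entries in node_map.items():
        entry = entries.get(role_id)
        if entry is None or entry["active"] <= entry["releasing"]:
            continue                 # nothing releasable here
        if entry["active"] > 1:
            return host              # advanced strategy hit
        if fallback is None:
            fallback = host          # simple strategy candidate
    return fallback
```
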

Once a node has been identified

  1. A container on it is located (via the existing container map). This container must be of the target role, and not already be queued for release.
  2. A release operation is queued, triggering a release request to the RM.
  3. The (existing) containersBeingReleased Map has the container inserted into it.

After the RM processes the request, the AM receives a callback

AM callback onContainersCompleted:

void onContainersCompleted(List<ContainerStatus> completedContainers)

This callback returns a list of containers that have completed.

These need to be split into successful completion of a release request and containers which have failed.

This is currently done by tracking which containers have been queued for release, as well as which were rejected as surplus before even having any role allocated onto them.

A container is considered to have failed if it was an active container which has completed although it wasn't on the list of containers to release

shouldReview = false
for container in completedContainers:
  containerId = container.containerId
  nodeId = container.nodeId
  node = nodemap.get(nodeId)
  if node == null :
    // unknown node
    continue
  // the role comes from the container, not from the node
  roleId = extractRoleId(container.getPriority)
  nodeentry = node.get(roleId)
  nodeentry.active--
  nodemap.dirty = true
  if getContainersBeingReleased().containsKey(containerId) :
    // completion of a requested release
    nodeentry.releasing--

    // update existing Slider role status
    roleStatus[roleId].decReleasing()
    containersBeingReleased.remove(containerId)
  else:
    // failure of a live node
    roleStatus[roleId].decActual()
    shouldReview = true

  if nodeentry.available():
    nodeentry.last_used = now()
    availableNodes[roleId].insert(node)
// trigger a comparison of actual vs desired
if shouldReview :
  reviewRequestAndReleaseNodes()

By calling reviewRequestAndReleaseNodes() the AM triggers a re-evaluation of how many instances of each role an Application Instance has, and how many it needs. If a container has failed and that freed up all role instances on that node, it will have been inserted at the front of the availableNodes list. As a result, it is highly likely that a new container will be requested on the same node. (The only way a node in the list would be newer is if other containers were completed in the same callback.)
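
The split of completed containers into released, surplus and failed can be sketched on its own. The function and the use of plain sets of container IDs are assumptions for illustration.

```python
def classify_completed(completed_ids, being_released, surplus):
    """Split completed container IDs into (released, failed).

    being_released: set of IDs queued for release by the AM.
    surplus: set of IDs rejected before any role was assigned.
    Both sets are updated in place.
    """
    released, failed = [], []
    for cid in completed_ids:
        if cid in being_released:
            being_released.discard(cid)
            released.append(cid)
        elif cid in surplus:
            # Never hosted a role instance, so not a failure.
            surplus.discard(cid)
        else:
            # An active container that completed without being asked to.
            failed.append(cid)
    return released, failed
```

A non-empty failed list corresponds to the shouldReview flag being set, prompting a comparison of actual against desired instance counts.
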

Implementation Notes

Notes made while implementing the design.

OutstandingRequestTracker should also track requests made with no target node; this makes seeing what is going on easier. AMRMClientImpl is doing something similar, on a priority-by-priority basis. If many requests are made, each with their own priority, that base class's hash tables may get overloaded. (It assumes a limited set of priorities.)

Access to the RoleHistory datastructures was restricted to avoid synchronization problems. Protected access is permitted so that a test subclass can examine (and change?) the internals.

NodeEntries need to add a launching value separate from active so that when looking for nodes to release, no attempt is made to release a node that has been allocated but is not yet live.
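A minimal sketch of why a separate launching counter helps, assuming hypothetical method names and assuming that only live, not-yet-releasing instances are candidates for release:

```python
from dataclasses import dataclass


@dataclass
class LaunchAwareEntry:
    """Hypothetical per-role counters for one node."""
    launching: int = 0  # allocated and start requested, not yet live
    active: int = 0     # live instances
    releasing: int = 0  # release requested, completion not yet reported

    def on_launch_requested(self) -> None:
        self.launching += 1

    def on_started(self) -> None:
        self.launching -= 1
        self.active += 1

    def releasable(self) -> int:
        # Launching instances are deliberately excluded.
        return self.active - self.releasing


entry = LaunchAwareEntry()
entry.on_launch_requested()
before_start = entry.releasable()  # nothing to release while still launching
entry.on_started()
after_start = entry.releasable()
```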

We can't reliably map from a request to a response. Does that matter? If we issue a request for a host and it comes in on a different port, do we care? Yes —but only because we are trying to track nodes which have requests outstanding so as not to issue new ones. But if we just pop the entry off the available list, that becomes moot.

Proposal: don't track the requesting numbers in the node entries, just in the role status fields.

But: this means that we never re-insert nodes onto the available list if a container on them was requested but not satisfied.

Other issues: should we place nodes on the available list as soon as all the entries have been released, i.e. before YARN has replied?

RoleStats were removed from the role history and left in the app state. Although the role stats would belong here, leaving them where they were reduced the amount of change in the AppState class, and so the risk of something breaking.

MiniYARNCluster node IDs

Mini YARN cluster NodeIDs all share the same hostname, at least when running against file://, so mini tests with >1 NM don't have a 1:1 mapping of NodeId:NodeInstance. What will happen is that NodeInstance getOrCreateNodeInstance(Container container) will always return the same (now shared) NodeInstance.
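The effect can be reproduced with a small sketch, assuming a node map keyed purely by hostname and node IDs written as "host:port". The class and method names here are hypothetical stand-ins, not the Slider API.

```python
class HostKeyedNodeMap:
    """Hypothetical node map that, like the history, is keyed by hostname only."""

    def __init__(self):
        self._by_host = {}

    def get_or_create(self, node_id: str) -> dict:
        # Assumption: node IDs look like "host:port"; the port is discarded.
        host = node_id.rsplit(":", 1)[0]
        return self._by_host.setdefault(host, {"hostname": host, "entries": {}})


node_map = HostKeyedNodeMap()
first = node_map.get_or_create("localhost:4001")
second = node_map.get_or_create("localhost:4002")
shared = first is second  # two node managers, one shared instance
```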

Releasing Containers when shrinking an Application Instance

When identifying instances to release in a bulk downscale operation, the full list of targets must be identified together. This is not just to eliminate multiple scans of the data structures, but because the containers are not released until the queued list of actions is executed: the nodes' release-in-progress counters will not be incremented until after all the targets have been identified.

It also needs to handle the scenario where there are many role instances on a single server —it should prioritize those.
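One way to pick all targets in a single pass is to work on a local copy of the per-host counts, so that each choice sees the effect of the previous ones even though the real counters have not been updated yet. This is a sketch under that assumption; the function name and the tie-breaking rule (alphabetical hostname) are illustrative only.

```python
def pick_release_targets(instances_per_host, excess):
    """Return one hostname per instance to release, most crowded hosts first."""
    remaining = dict(instances_per_host)  # local copy; real counters lag behind
    targets = []
    for _ in range(excess):
        candidates = [h for h, n in remaining.items() if n > 0]
        if not candidates:
            break
        # Highest count wins; ties are broken by hostname for determinism.
        host = min(candidates, key=lambda h: (-remaining[h], h))
        remaining[host] -= 1
        targets.append(host)
    return targets


targets = pick_release_targets({"a": 3, "b": 1, "c": 2}, excess=3)
```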

The NodeMap/NodeInstance/NodeEntry structure is adequate for identifying nodes, at least provided there is a 1:1 mapping of hostname to NodeInstance. But it is not enough to track containers in need of release: the AppState needs to be able to work backwards from a NodeEntry to container(s) stored there.

The AppState class currently stores this data in a ConcurrentMap<ContainerId, RoleInstance>.

Mapping from NodeEntry/NodeInstance to the containers to delete means that either a new data structure is created to identify containers in a role on a specific host (e.g. a list of ContainerIds under each NodeEntry), or we add an index reference in a RoleInstance that identifies the node. We already effectively have that in the container.
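The first option, a reverse index from (host, role) to container IDs, could be derived from the existing container map as below. This is only a sketch: the map is assumed to be simplified to container_id -> (hostname, role_id), and the function name is hypothetical.

```python
from collections import defaultdict


def index_by_host_and_role(live_containers):
    """Build {(hostname, role_id): [container_id, ...]} from the container map."""
    index = defaultdict(list)
    for container_id, (host, role_id) in live_containers.items():
        index[(host, role_id)].append(container_id)
    return index


live = {"c1": ("h1", 1), "c2": ("h1", 1), "c3": ("h2", 2)}
index = index_by_host_and_role(live)
on_h1_role1 = index[("h1", 1)]
```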

Dropping any available nodes that are busy

When scanning the available list, any nodes that are no longer idle for that role should be dropped from the list.

This can happen when an instance was allocated on a different node from that requested.
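A sketch of that scan, assuming the available list is a plain list of hostnames in most-recent-first order and that a separate (hypothetical) map gives the number of active instances of the role on each host:

```python
def next_idle_host(available, active_counts):
    """Pop hosts from the front until an idle one is found; busy hosts are dropped."""
    while available:
        host = available.pop(0)
        if active_counts.get(host, 0) == 0:
            return host
        # host is busy for this role: it stays removed from the list
    return None


available = ["h1", "h2", "h3"]
chosen = next_idle_host(available, {"h1": 1})
```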

Finding a node when a component has some live instances and no available hosts known to Slider

One condition found during testing is the following:

  1. A role has one or more instances running in the YARN cluster
  2. A role has no entries in its available list: there is no history of the role ever being on nodes other than those currently in use.
  3. A new instance is requested.

In this situation, the findNodeForNewInstance method returns null: there is no recommended location for placement. However, this is untrue: all nodes in the YARN cluster other than those in use are the recommended nodes.

It would be possible to build up a list of all known nodes in the YARN cluster that are not running this role and use that in the request, effectively telling the RM to pick one of the idle nodes. By not doing so, we increase the probability that another instance of the same role will be allocated on a node in use, a probability which (were there capacity on these nodes and placement random) would be 1/(clustersize-roleinstances). The smaller the YARN cluster and the bigger the application, the higher the risk.

This could be revisited, if YARN does not support anti-affinity between new requests at a given priority and existing ones: the solution would be to issue a relaxed placement request listing all nodes that are in the NodeMap and which are not running an instance of the specific role. [To be even more rigorous, the request would have to omit those nodes for which an allocation has already been made off the available list and yet for which no container has yet been granted].
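The candidate list for such a relaxed request could be computed as follows. This is a sketch only: the function name and arguments are hypothetical, and the third argument covers the more rigorous variant that also omits hosts with a request still pending.

```python
def relaxed_candidates(known_hosts, hosts_running_role, hosts_pending=()):
    """Return known hosts that neither run the role nor have a request pending."""
    excluded = set(hosts_running_role) | set(hosts_pending)
    return [h for h in known_hosts if h not in excluded]


candidates = relaxed_candidates(
    ["h1", "h2", "h3", "h4"], hosts_running_role=["h1"], hosts_pending=["h3"])
```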

Reworked Outstanding Request Tracker

The reworked request tracker behaves as follows:

  1. Outstanding requests with specific placements are tracked by (role, hostname).
  2. An attempt is made to resolve each container assignment against the same parameters.
  3. If found: that request is considered satisfied, irrespective of whether or not the request that satisfied the allocation was the one that requested that location.
  4. When all instances of a specific role have been allocated, the hostnames of all outstanding requests are returned to the available node list on the basis that they have been satisfied elsewhere in the YARN cluster. This list is then sorted.

This strategy returns unused hosts to the list of possible hosts, while retaining the ordering of that list in most-recent-first.
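The behavior listed above can be sketched as a small tracker. The class and method names are hypothetical, and a set is used for brevity, which assumes at most one outstanding placed request per (role, hostname) pair.

```python
class PlacedRequestTracker:
    """Hypothetical tracker of placed requests, keyed by (role_id, hostname)."""

    def __init__(self):
        self._placed = set()

    def add(self, role_id, hostname):
        self._placed.add((role_id, hostname))

    def on_allocated(self, role_id, hostname) -> bool:
        """True if the allocation matches an outstanding placed request."""
        key = (role_id, hostname)
        if key in self._placed:
            self._placed.remove(key)
            return True
        return False

    def release_to_available(self, role_id, available, last_used):
        """Role fully allocated: return unused hosts, then re-sort most recent first."""
        hosts = [h for r, h in self._placed if r == role_id]
        self._placed -= {(role_id, h) for h in hosts}
        available.extend(hosts)
        available.sort(key=lambda h: last_used.get(h, 0), reverse=True)


tracker = PlacedRequestTracker()
tracker.add(1, "h1")
tracker.add(1, "h2")
matched = tracker.on_allocated(1, "h1")    # satisfied where requested
unmatched = tracker.on_allocated(1, "h3")  # allocated elsewhere
available = ["h4"]
tracker.release_to_available(1, available, {"h2": 50, "h4": 10})
```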

Weaknesses

If one or more container requests cannot be satisfied, then all the hosts in the set of outstanding requests will be retained, so all these hosts will be considered unavailable for new location-specific requests. This may imply that new requests that could be explicitly placed will now only be randomly placed. However, this is moot: if there are outstanding container requests it means the RM cannot grant resources, so new requests at the same priority (i.e. same Slider Role ID) will not be granted either.

The only scenario where this would be different is if the resource requirements of instances of the target role were updated during a flex such that the placement could now be satisfied on the target host. This is not considered a significant problem.

Persistence

The initial implementation uses Avro's JSON encoding; while significantly less efficient than a binary format, it is human-readable.

Here is a sequence of entries from a test run on a single-node YARN cluster, running one HBase Master and two region servers.

Initial save; the instance of Role 1 (HBase master) is live, Role 2 (RS) is not.

{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183475949,"savedx":"14247c3aeed","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":0}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":false,"last_used":0}}}

At least one RS is live:

{"entry":{"org.apache.slider.server.avro.RoleHistoryFooter":{"count":2}}}{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183476010,"savedx":"14247c3af2a","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":0}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":true,"last_used":0}}}

Another entry is saved; presumably the second RS is now live, which triggered another write.

{"entry":{"org.apache.slider.server.avro.RoleHistoryFooter":{"count":2}}}{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183476028,"savedx":"14247c3af3c","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":0}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":true,"last_used":0}}}

At this point the Application Instance was stopped and started.

When the Application Instance is restarted, every node that was active for a role at the time the file was saved (1384183476028) is given a last_used timestamp of that time.
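As a sketch of that rule, the records of the last save before the restart can be transformed as below. The function name is hypothetical, and clearing the active flag on reload is an assumption made for this illustration, on the basis that nothing is live until containers are allocated again.

```python
def apply_restart(entries, saved):
    """Return copies of the saved entries as they would look just after a restart."""
    restored = []
    for entry in entries:
        updated = dict(entry)
        if entry["active"]:
            # active at save time: the save time becomes the last-used time
            updated["last_used"] = saved
        updated["active"] = False  # assumption: nothing is live immediately after reload
        restored.append(updated)
    return restored


previous = [
    {"host": "192.168.1.85", "role": 1, "active": True, "last_used": 0},
    {"host": "192.168.1.85", "role": 2, "active": True, "last_used": 0},
]
restored = apply_restart(previous, saved=1384183476028)
```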

When the history is next saved, the master has come back onto the (single) node: it is active, while its last_used timestamp is the previous file's timestamp. No region servers are yet live.

{"entry":{"org.apache.slider.server.avro.RoleHistoryFooter":{"count":2}}}{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183512173,"savedx":"14247c43c6d","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":1384183476028}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":false,"last_used":1384183476028}}}

Here a region server is live

{"entry":{"org.apache.slider.server.avro.RoleHistoryFooter":{"count":2}}}{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183512199,"savedx":"14247c43c87","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":1384183476028}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":true,"last_used":1384183476028}}}

And here, another region server has started. This does not actually change the contents of the file

{"entry":{"org.apache.slider.server.avro.RoleHistoryFooter":{"count":2}}}{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":{"version":1,"saved":1384183512217,"savedx":"14247c43c99","roles":3}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":1,"active":true,"last_used":1384183476028}}}
{"entry":{"org.apache.slider.server.avro.NodeEntryRecord":{"host":"192.168.1.85","role":2,"active":true,"last_used":1384183476028}}}

The last_used timestamps will not be changed until the Application Instance is shrunk or restarted, as the active flag being set implies that the server is running both roles at the save time of 1384183512217.
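Each record in the samples above is a single JSON object whose "entry" field holds one key, the record type. A minimal sketch of reading one record with the standard json module follows; the helper name is hypothetical. In the sample header, savedx is the hexadecimal form of saved, which the usage lines check.

```python
import json


def parse_record(line):
    """Return (short_type_name, fields) for one JSON-encoded history record."""
    # Each "entry" object is expected to contain exactly one key: the record type.
    (type_name, fields), = json.loads(line)["entry"].items()
    return type_name.rsplit(".", 1)[-1], fields


header_line = (
    '{"entry":{"org.apache.slider.server.avro.RoleHistoryHeader":'
    '{"version":1,"saved":1384183475949,"savedx":"14247c3aeed","roles":3}}}'
)
kind, header = parse_record(header_line)
same_timestamp = int(header["savedx"], 16) == header["saved"]
```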

Resolved issues

How best to distinguish at startup time nodes used recently from nodes used some period before? Should the RoleHistory simply forget about nodes which are older than some threshold when reading in the history?

We just track last-used times.

Is there a way to avoid tracking the outstanding requests?

No

What will the strategy of picking the most-recently-used node do if that node creates the container and then fails to start it up? Do we need to add blacklisting too? Or actually monitor the container start time, and if a container hasn't been there for very long, don't pick it?

Container Startup failures drop the node from the ready-to-use list; the node is no longer trusted. We don't blacklist it (yet)

Should we prioritize a node that was used for a long session ahead of a node that was used more recently for a shorter session? Maybe, but it complicates selection as generating a strict order of nodes gets significantly harder.

No: you would need to start tracking aggregate execution time for the last session. In a stable state, all servers recorded in the history will have spread the data amongst them, so it's irrelevant.