#! /usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script will check the configuration and uniformity of all the nodes in a cluster.
# Checks
#   each node is reachable via ssh
#   login identity is the same
#   the physical memory is the same
#   the mounts are the same on each machine
#   a set of writable locations (typically different disks) are in fact writable
#
# In order to check for writable partitions, you must configure the WRITABLE variable below.
#

import errno
import fcntl
import os
import select
import signal
import subprocess
import sys
import time

if not sys.platform.startswith('linux'):
    sys.stderr.write('This script only works on linux, sorry.\n')
    sys.exit(1)

# Total number of seconds runAll() gives the slaves to answer one command.
TIMEOUT = 5
# Paths that checkWritable() touches on every slave; empty disables the check.
WRITABLE = []
#WRITABLE = ['/srv/hdfs1', '/srv/hdfs2', '/srv/hdfs3']


def _say(*words):
    """Write the words to stdout separated by single spaces, then a newline.

    Produces the same bytes as a Python 2 ``print a, b, c`` statement while
    remaining valid syntax on Python 3.
    """
    sys.stdout.write(' '.join(str(word) for word in words) + '\n')


def _toText(data):
    """Return pipe output as a native ``str`` (decoding bytes on Python 3)."""
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


def ssh(slave, *args):
    """Start a command on a remote slave and return the Popen handle.

    The handle is decorated with three extra attributes that wait() and the
    check functions rely on: ``slave`` (the host name), ``finished`` (False
    until wait() sees end-of-file on stdout) and ``out`` (collected stdout).
    """
    # stderr is piped only to keep it off the terminal; nothing reads it.
    handle = subprocess.Popen(
        ('ssh', '-o', 'StrictHostKeyChecking=no', '-q', '-A', '-n', slave) + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    handle.slave = slave
    handle.finished = False
    handle.out = ''
    return handle


def wait(handles, seconds):
    """Collect stdout from many handles at once, killing the stragglers.

    handles -- Popen objects as returned by ssh()
    seconds -- overall deadline, shared by all handles

    Stdout is appended to each handle's ``out`` attribute. A handle whose
    stdout reached end-of-file before the deadline is reaped and gets
    ``finished = True``; every other handle is sent SIGKILL and reaped, and
    keeps ``finished = False``. The pipes of all handles are closed on return.
    """
    pending = list(handles)
    deadline = time.time() + seconds
    chunks = dict((handle, []) for handle in pending)

    for handle in pending:
        fd = handle.stdout.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    while pending:
        # Sleep in select() for the time that is actually left; a zero
        # timeout here would turn this loop into a busy spin.
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        byFd = dict((handle.stdout.fileno(), handle) for handle in pending)
        readable, _, _ = select.select(list(byFd), [], [], remaining)
        for fd in readable:
            handle = byFd[fd]
            try:
                data = os.read(fd, 4096)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EINTR):
                    continue  # nothing to read after all; select again
                raise
            if data:
                chunks[handle].append(data)
                continue
            # Empty read means end-of-file: the command is done.
            pending.remove(handle)
            handle.wait()
            handle.finished = True

    for handle in pending:
        try:
            os.kill(handle.pid, signal.SIGKILL)
        except OSError as err:
            if err.errno != errno.ESRCH:  # already gone is fine
                raise
        handle.wait()  # reap, so no zombie is left behind

    for handle in handles:
        # Decode once per handle so multi-byte characters split across
        # reads are not corrupted.
        handle.out += _toText(b''.join(chunks[handle]))
        for pipe in (handle.stdout, handle.stderr):
            if pipe is not None:
                pipe.close()


def runAll(slaves, *cmd):
    """Run the given command on all the slaves and return the Popen handles.

    All commands run concurrently and share one TIMEOUT-second deadline.
    """
    handles = [ssh(slave, *cmd) for slave in slaves]
    wait(handles, TIMEOUT)
    return handles


def checkIdentity(slaves):
    """Ensure the login identity is consistent across the slaves.

    Compares ``id -u -n`` on each slave with the local result and returns
    the set of slaves that could not be queried or that differ.
    """
    handles = runAll(slaves, 'id', '-u', '-n')
    bad = set()
    pipe = os.popen('id -u -n')
    try:
        myIdentity = pipe.read().strip()
    finally:
        pipe.close()
    for h in handles:
        if not h.finished or h.returncode != 0:
            _say('#', 'cannot look at identity on', h.slave)
            bad.add(h.slave)
            continue
        identity = h.out.strip()
        if identity != myIdentity:
            _say('#', h.slave, 'inconsistent identity', identity)
            bad.add(h.slave)
    return bad


def _unusualSizes(groups):
    """Return [(size, slaves)] for sizes more than 5% off the most common one.

    groups maps a size string to the set of slaves reporting it. The most
    common size is the one reported by the most slaves. An empty mapping
    yields an empty list.
    """
    if not groups:
        return []
    ranked = sorted(groups.items(),
                    key=lambda item: (len(item[1]), item[0]),
                    reverse=True)
    mostCommon = float(ranked[0][0])
    unusual = []
    for size, members in ranked[1:]:
        delta = abs(mostCommon - float(size))
        if mostCommon:
            odd = delta / mostCommon > 0.05
        else:
            # Most slaves report 0 (e.g. no swap space): any other value
            # is unusual, and a ratio cannot be computed.
            odd = delta > 0
        if odd:
            unusual.append((size, members))
    return unusual


def checkMemory(slaves):
    """Run ``free`` on all slaves and look for weird results.

    Returns the set of slaves that could not be queried, whose output has
    no ``Swap:`` line, or whose memory or swap total differs by more than
    5% from the most common value.
    """
    handles = runAll(slaves, 'free')
    bad = set()
    mem = {}
    swap = {}
    for h in handles:
        if not h.finished or h.returncode != 0:
            _say('#', 'cannot look at memory on', h.slave)
            bad.add(h.slave)
            continue
        if h.out.find('Swap:') < 0:
            _say('#', h.slave, 'has no swap')
            bad.add(h.slave)
            continue
        for line in h.out.split('\n'):
            # The second column of the Mem:/Swap: rows is used as the size.
            if line.startswith('Mem:'):
                mem.setdefault(line.split()[1], set()).add(h.slave)
            if line.startswith('Swap:'):
                swap.setdefault(line.split()[1], set()).add(h.slave)

    for size, members in _unusualSizes(mem):
        _say('#', ', '.join(sorted(members)), ': unusual memory size', size)
        bad.update(members)
    for size, members in _unusualSizes(swap):
        _say('#', ', '.join(sorted(members)), ': unusual swap size', size)
        bad.update(members)
    return bad


def checkWritable(slaves):
    """Touch every path in WRITABLE on each slave; return the slaves that fail.

    When WRITABLE is empty the check is skipped and an empty set is returned.
    """
    if not WRITABLE:
        _say('# WRITABLE value not configured, not checking partitions')
        return set()
    handles = runAll(slaves, 'touch', *WRITABLE)
    bad = set()
    for h in handles:
        if not h.finished or h.returncode != 0:
            bad.add(h.slave)
            _say('#', h.slave, 'some drives are not writable')
    return bad


def checkMounts(slaves):
    """Check the mounted file systems and report any that are unusual.

    Parses ``mount`` output on every slave, ignoring entries of type ``nfs``
    and devices containing ``:/``. A slave is returned as bad if the command
    did not complete, or if it lacks a mount point that another responding
    slave has.
    """
    handles = runAll(slaves, 'mount')
    mounts = {}
    finished = set()
    bad = set()
    for handle in handles:
        if not handle.finished or handle.returncode != 0:
            bad.add(handle.slave)
            _say('#', handle.slave, 'did not finish')
            continue
        for line in handle.out.split('\n'):
            # Expected shape: <device> on <mount point> type <fstype> ...
            words = line.split()
            if len(words) < 5:
                continue
            if words[4] == 'nfs':
                continue
            if words[0].find(':/') >= 0:
                continue
            mounts.setdefault(words[2], set()).add(handle.slave)
        finished.add(handle.slave)
    for m in sorted(mounts):
        diff = finished - mounts[m]
        if diff:
            bad.update(diff)
            _say('#', m, 'not mounted on', ', '.join(sorted(diff)))
    return bad


def main(argv):
    """Read the slaves file named by argv[0], run every check, print the good slaves.

    The file holds one host per line; ``#`` starts a comment and blank lines
    are ignored. Each check only sees slaves that passed the previous ones.
    Exits with status 1 and a usage message when no file is given.
    """
    if len(argv) < 1:
        sys.stderr.write('Usage: check_slaves slaves\n')
        sys.exit(1)
    sys.stdin.close()
    slaves = set()
    with open(argv[0]) as listing:
        for line in listing:
            name = line.split('#', 1)[0].strip()
            if name:
                slaves.add(name)
    bad = set()
    for test in checkIdentity, checkMemory, checkMounts, checkWritable:
        bad.update(test(slaves - bad))
    for slave in sorted(slaves - bad):
        _say(slave)


if __name__ == '__main__':
    main(sys.argv[1:])