1#!/usr/bin/env python
2
3"""
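Heartbeat server/client to detect soft lockups (SLUs).

Clients periodically send a timestamped heartbeat over TCP; the server raises
an ALERT when a host's heartbeats arrive late or stop arriving altogether, and
can optionally track each client's clock drift relative to the server.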
5"""
6
7import socket, os, sys, time, getopt
8
def daemonize(output_file):
    """Detach the current process from its controlling terminal.

    Forks once; the parent exits immediately via os._exit(0) and the child
    continues in a new session (os.setsid) with a zero umask.  In the child,
    stdout and stderr are redirected to ``output_file`` (opened in append
    mode, unbuffered), or to /dev/null when ``output_file`` is falsy, and
    stdin is redirected from /dev/null.

    :param output_file: path of the log file, or None/'' to discard output.
    :raises Exception: if os.fork() fails; the message carries the errno
        and its description.
    """
    try:
        pid = os.fork()
    except OSError as e:
        # errno is the integer code and strerror the text; passing them in
        # the other order made the %d conversion itself raise TypeError.
        raise Exception("error %d: %s" % (e.errno, e.strerror))

    if pid:
        # parent: leave immediately without running any cleanup handlers
        os._exit(0)

    os.umask(0)
    os.setsid()
    sys.stdout.flush()
    sys.stderr.flush()

    # Test the argument itself; the ``file`` builtin is always truthy, which
    # made the /dev/null branch unreachable.
    if output_file:
        # buffering=0 (unbuffered) is only valid for text files on Python 2
        output_handle = open(output_file, 'a+', 0)
        # autoflush stdout/stderr
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
        sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 0)
    else:
        output_handle = open('/dev/null', 'a+')

    stdin_handle = open('/dev/null', 'r')
    # point fds 1, 2 and 0 at the chosen output / input files
    os.dup2(output_handle.fileno(), sys.stdout.fileno())
    os.dup2(output_handle.fileno(), sys.stderr.fileno())
    os.dup2(stdin_handle.fileno(), sys.stdin.fileno())
35
def recv_all(sock):
    """Read from ``sock`` until the peer closes the connection.

    Data is pulled in 1024-byte reads; an empty (falsy) read marks the end
    of the stream.  Returns everything received, concatenated.
    """
    pieces = []
    piece = sock.recv(1024)
    while piece:
        pieces.append(piece)
        piece = sock.recv(1024)
    return ''.join(pieces)
44
def run_server(host, port, daemon, file, queue_size, threshold, drift):
    """Accept heartbeats from clients forever and report soft lockups.

    Each TCP connection carries a single heartbeat, read until the client
    closes its end.  Every heartbeat is passed to check_heartbeat(); at most
    once per ``threshold * 2`` seconds check_for_timeouts() is also run to
    report hosts that have gone silent.

    :param host: address to bind ('' binds all interfaces).
    :param port: TCP port to listen on.
    :param daemon: if true, detach via daemonize() before binding.
    :param file: output file handed to daemonize().
    :param queue_size: backlog passed to listen().
    :param threshold: seconds allowed between heartbeats of one host.
    :param drift: if true, also track and print per-host clock drift.
    """
    # ``drift`` is the caller's on/off switch.  Use it rather than the
    # module-level ``check_drift`` global, and keep it separate from the
    # per-heartbeat drift report string below.
    track_drift = drift
    if daemon:
        daemonize(output_file=file)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # allow an immediate restart instead of failing with "address in use"
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(queue_size)
    timeout_interval = threshold * 2.0
    # a zero timeout would make the client socket non-blocking
    recv_timeout = max(threshold, 1)
    prev_check_timestamp = float(time.time())
    while 1:
        c_sock, c_addr = sock.accept()
        try:
            # A peer that connects and then stops sending (e.g. a client that
            # locks up mid-heartbeat) must not stall the whole server.
            c_sock.settimeout(recv_timeout)
            heartbeat = recv_all(c_sock)
        except socket.error as e:
            print("%.2f: ERROR, receive from %s failed: %s"
                  % (float(time.time()), c_addr[0], e))
            heartbeat = None
        finally:
            c_sock.close()

        local_timestamp = float(time.time())
        drift_report = None
        if heartbeat is not None:
            try:
                drift_report = check_heartbeat(heartbeat, local_timestamp,
                                               threshold, track_drift)
            except ValueError:
                # anything that is not "<host> <seq> <timestamp>"
                print("%.2f: ERROR, malformed heartbeat from %s: %r"
                      % (local_timestamp, c_addr[0], heartbeat))
                heartbeat = None

        # NOTE: this doesn't work if the only client is the one that timed
        # out, but anything more complete would require another thread and
        # a lock for client_prev_timestamp.
        if local_timestamp - prev_check_timestamp > timeout_interval:
            check_for_timeouts(threshold, track_drift)
            prev_check_timestamp = local_timestamp
        if verbose and heartbeat is not None:
            if track_drift:
                print("%.2f: %s (%s)" % (local_timestamp, heartbeat, drift_report))
            else:
                print("%.2f: %s" % (local_timestamp, heartbeat))
69
def run_client(host, port, daemon, file, interval):
    """Send a numbered heartbeat to the server every ``interval`` seconds.

    Each heartbeat uses its own TCP connection: connect, send, close.
    Socket errors are printed as ERROR lines and the loop keeps going, so an
    unreachable server never stops the client.  The sequence number is
    incremented even when a send fails.

    :param host: server address.
    :param port: server port.
    :param daemon: if true, detach via daemonize() first.
    :param file: output file handed to daemonize().
    :param interval: seconds to sleep between heartbeats.
    """
    if daemon:
        daemonize(output_file=file)
    seq = 1
    while 1:
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((host, port))
            heartbeat = get_heartbeat(seq)
            sock.sendall(heartbeat)
            if verbose:
                print(heartbeat)
        except socket.error as e:
            now = float(time.time())
            # Errors such as socket.timeout carry only a message, so the old
            # "(value, message)" unpacking crashed the client on them.
            if e.errno is not None:
                print("%.2f: ERROR, %d - %s" % (now, e.errno, e.strerror))
            else:
                print("%.2f: ERROR, %s" % (now, e))
        finally:
            # close on failure too, otherwise every failed attempt leaks a socket
            if sock is not None:
                sock.close()

        seq += 1
        time.sleep(interval)
88
def get_heartbeat(seq=1):
    """Build the heartbeat message "<hostname> <seq> <unix time>".

    The sequence number is zero-padded to six digits and the current time
    is rendered with two decimal places; ``hostname`` is the module-level
    name of this machine.
    """
    sent_at = float(time.time())
    fields = (hostname, seq, sent_at)
    return "%s %06d %.2f" % fields
91
def check_heartbeat(heartbeat, local_timestamp, threshold, check_drift):
    """Record one heartbeat and alert if its host was silent for too long.

    :param heartbeat: message "<hostname> <seq> <timestamp>" as produced by
        get_heartbeat().
    :param local_timestamp: server time at which the heartbeat arrived.
    :param threshold: maximum gap in seconds between two consecutive
        heartbeats of one host before an ALERT line is printed.
    :param check_drift: if true, also track the client's clock drift.
    :returns: "drift <total> (<change since previous>)" when ``check_drift``
        is true, otherwise None.
    :raises ValueError: if the message does not have exactly three
        whitespace-separated fields or the timestamp is not a number.  This
        happens before any per-host state is modified.

    Updates the module-level client_prev_timestamp dict and, when
    ``check_drift`` is set, client_clock_offset and client_prev_drift.
    """
    # named client_host so it does not shadow this machine's ``hostname``
    client_host, seq, timestamp = heartbeat.split()
    timestamp = float(timestamp)
    if client_host in client_prev_timestamp:
        delta = local_timestamp - client_prev_timestamp[client_host]
        if delta > threshold:
            print("%.2f: ALERT, SLU detected on host %s, delta %ds"
                  % (local_timestamp, client_host, delta))

    client_prev_timestamp[client_host] = local_timestamp

    if check_drift:
        if client_host not in client_clock_offset:
            # The first sample fixes the baseline clock offset; drift is
            # measured relative to it from then on.
            client_clock_offset[client_host] = timestamp - local_timestamp
            client_prev_drift[client_host] = 0
        drift = timestamp - local_timestamp - client_clock_offset[client_host]
        drift_delta = drift - client_prev_drift[client_host]
        client_prev_drift[client_host] = drift
        return "drift %+4.2f (%+4.2f)" % (drift, drift_delta)
    return None
111
def check_for_timeouts(threshold, check_drift):
    """Report and forget every host that has been silent for too long.

    A host times out when more than ``threshold * 2`` seconds have elapsed
    since its last recorded heartbeat.  Each such host gets one ALERT line
    and is removed from client_prev_timestamp (and, when ``check_drift`` is
    true, from client_clock_offset and client_prev_drift), so it is
    reported once and starts fresh if it reappears.
    """
    now = float(time.time())
    limit = threshold * 2
    # iterate over a snapshot because entries are deleted inside the loop
    for host, last_seen in list(client_prev_timestamp.items()):
        silence = now - last_seen
        if not silence > limit:
            continue
        print("%.2f: ALERT, SLU detected on host %s, no heartbeat for %ds"
              % (now, host, silence))
        del client_prev_timestamp[host]
        if check_drift:
            del client_clock_offset[host]
            del client_prev_drift[host]
125
def usage():
    """Print command-line help for both server and client modes."""
    print("""
Usage:

    heartbeat_slu.py --server --address <bind_address> --port <bind_port>
                     [--file <output_file>] [--no-daemon] [--verbose]
                     [--threshold <heartbeat threshold in seconds>]
                     [--check-drift]

    heartbeat_slu.py --client --address <server_address> --port <server_port>
                     [--file <output_file>] [--no-daemon] [--verbose]
                     [--interval <heartbeat interval in seconds>]
""")
138
# host information and global data shared by the check_* helpers
hostname = socket.gethostname()
client_prev_timestamp = {}  # client hostname -> server time of its last heartbeat
client_clock_offset = {}    # client hostname -> clock offset of its first heartbeat
client_prev_drift = {}      # client hostname -> drift reported for its previous heartbeat

# default param values
host_port = 9001
host_address = ''
interval = 1 # seconds between heartbeats (may be fractional)
threshold = 10 # seconds late till alert
is_server = False
is_daemon = True
file_server = "/tmp/heartbeat_server.out"
file_client = "/tmp/heartbeat_client.out"
file_selected = None
queue_size = 5
verbose = False
check_drift = False

if __name__ == "__main__":
    # process cmdline opts.  Short forms mirror the long ones: -f takes a
    # value like --file=, while -d is a flag like --check-drift.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "vhscf:dp:a:i:t:", [
                        "server", "client", "no-daemon", "address=", "port=",
                        "file=", "interval=", "threshold=", "verbose",
                        "check-drift", "help"])
    except getopt.GetoptError as e:
        print("error: %s" % str(e))
        usage()
        sys.exit(1)

    try:
        for param, value in opts:
            if param in ("-p", "--port"):
                host_port = int(value)
            elif param in ("-a", "--address"):
                host_address = value
            elif param in ("-s", "--server"):
                is_server = True
            elif param in ("-c", "--client"):
                is_server = False
            elif param in ("--no-daemon",):
                is_daemon = False
            elif param in ("-f", "--file"):
                file_selected = value
            elif param in ("-i", "--interval"):
                interval = float(value)
            elif param in ("-t", "--threshold"):
                threshold = int(value)
            elif param in ("-d", "--check-drift"):
                check_drift = True
            elif param in ("-v", "--verbose"):
                verbose = True
            elif param in ("-h", "--help"):
                usage()
                sys.exit(0)
            else:
                print("error: unrecognized option: %s" % param)
                usage()
                sys.exit(1)
    except ValueError:
        # non-numeric --port / --interval / --threshold
        print("error: invalid value for %s: %s" % (param, value))
        usage()
        sys.exit(1)

    # run until we're terminated
    if is_server:
        file_server = file_selected or file_server
        run_server(host_address, host_port, is_daemon, file_server, queue_size, threshold, check_drift)
    else:
        file_client = file_selected or file_client
        run_client(host_address, host_port, is_daemon, file_client, interval)
206