barrier.py revision 02fc0b9199aa71c7f801aee4fcd4b86513e8facb
1import sys, socket, errno, logging
2from time import time, sleep
3from autotest_lib.client.common_lib import error
4
# Default barrier port: used whenever a listen_server or barrier is created
# without an explicit port.
_DEFAULT_PORT = 11922
7
8
class BarrierAbortError(error.BarrierError):
    """
    BarrierError variant signalling that a barrier member explicitly
    requested an abort (as opposed to a timeout or a lost member).
    """
11
12
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        The listening socket is created, bound and put into listening
        state immediately.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @returns The listening socket object.
        @raises socket.error: If the socket cannot be configured, bound or
                put into listening state.  The half-initialised socket is
                closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # SO_REUSEADDR lets us re-bind the port soon after a previous
            # listener on it has gone away.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            # Do not leak the descriptor when the port is unavailable.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
51
52
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail.  Client will retry until they either succeed in
    connecting to master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier they may
                reset new connections).
        @raises error.BarrierError: If both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member requests an abort; the rendezvous calls
        # overwrite it from their abort= keyword argument.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _get_host_from_id(self, hostid):
        """
        Return the host part of a host id.

        Any trailing local identifier following a '#' is removed.  This
        allows multiple members per host which is particularly helpful in
        testing.

        @param hostid: Host id of the form 'host' or 'host#localid'.
        @returns The part of hostid before the first '#'.
        @raises error.BarrierError: If hostid starts with '#', i.e. there is
                no host part at all.
        """
        if hostid.startswith('#'):
            raise error.BarrierError(
                    "Invalid Host id: Host Address should be specified")
        return hostid.split('#')[0]


    def _update_timeout(self, timeout):
        """
        Reset the overall timeout so that `timeout` seconds remain from now.

        @param timeout: Seconds that should remain, or None for no timeout.
        """
        if timeout is not None and self._start_time is not None:
            # _timeout_secs is measured from _start_time, so add what has
            # already elapsed.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Return the number of seconds left before the barrier times out.

        @returns Seconds remaining, or None when no timeout is configured.
                If the wait has not started yet the full timeout is returned.
        @raises error.BarrierError: If the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """
        Master side of the handshake with a newly connected member.

        Reads the member's "<tag> <name>" introduction, rejects it when it
        is malformed, names a different barrier ("!tag") or duplicates an
        already waiting member ("!dup"), and otherwise replies "wait" and
        records the connection in self._waiting.

        @param connection: (socket, address) tuple for the member.
        """
        client, addr = connection
        name = None

        # The 5 second timeout covers every send/recv of the handshake.
        client.settimeout(5)
        try:
            # Get the clients name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                # Report the address we already hold rather than calling
                # getpeername(), which can itself fail on a dead connection.
                logging.warning("Ignoring invalid data from %s: %r",
                                addr, intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning(
                        "client arriving for the wrong barrier: %s != %s",
                        self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """
        Slave side of the handshake with the master.

        Sends "<tag> <hostid>" and expects "wait" back.  On success the
        connection is recorded in self._waiting under our own host id and
        self._seen is set to 1; otherwise the connection is closed.

        @param connection: (socket, address) tuple for the master.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            # All control messages are four bytes long.
            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """
        Ping every waiting member and then release (or abort) them all.

        @raises error.BarrierError: If any member fails the ping/pong check.
        @raises BarrierAbortError: If we or any member requested an abort;
                raised after the 'abrt' message has been sent to everyone.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name in self._waiting:
            client = self._waiting[name][0]

            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error:
                # A reset/broken connection means the client is gone; treat
                # it like any other missing pong instead of leaking a raw
                # socket error to the caller.
                logging.warning("ping/pong failed: %s (%s)",
                                name, sys.exc_info()[1])

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name in self._waiting:
            client = self._waiting[name][0]

            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error:
                # The barrier is already satisfied at this point, so a
                # failed release message must not fail the barrier.
                logging.warning("release failed: %s (%s)",
                                name, sys.exc_info()[1])

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """
        Close the connection of every waiting member (best effort).

        Either way, close out all the clients.  If we have not released
        them then they know to abort.
        """
        for name in self._waiting:
            client = self._waiting[name][0]

            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception:
                # Closing is best effort; note the failure and carry on
                # with the remaining clients.
                logging.debug("error closing client %s: %s",
                              name, sys.exc_info()[1])


    def _run_server(self, is_master):
        """
        Accept connections on the listening socket until the barrier meets.

        @param is_master: True when we are the master accepting members
                (rendezvous); False when we are a slave waiting for the
                master to call us (rendezvous_servers).
        @raises error.BarrierError: On timeout or a lost member.
        @raises BarrierAbortError: If an abort was requested.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    # The next _remaining() call raises once the overall
                    # timeout has really expired.
                    logging.warning("timeout waiting for remaining clients")

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """
        Actively connect to the peer(s) until the barrier meets.

        @param is_master: True when we are the master calling each slave in
                turn (rendezvous_servers); False when we are a slave calling
                the master (rendezvous).
        @raises error.BarrierError: On timeout or a lost member.
        @raises BarrierAbortError: If an abort was requested.
        @raises socket.error: On connection errors other than a refused
                connection, which is retried.
        """
        try:
            while True:
                # _remaining() raises once the timeout expired; the explicit
                # check only guards a non-positive value being returned.
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                remote = None
                try:
                    remote = socket.socket(socket.AF_INET,
                                           socket.SOCK_STREAM)
                    remote.settimeout(30)
                    if is_master:
                        # Connect to all slaves.
                        host = self._get_host_from_id(
                                self._members[self._seen])
                        logging.info("calling slave: %s", host)
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._master_welcome(connection)
                    else:
                        # Just connect to the master.
                        host = self._get_host_from_id(self._masterid)
                        logging.info("calling master")
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    # The failed socket was never registered in
                    # self._waiting, so nobody else will close it.
                    if remote is not None:
                        remote.close()
                    sleep(10)
                except socket.error:
                    err = sys.exc_info()[1]
                    if remote is not None:
                        remote.close()
                    # Only a refused connection (peer not listening yet) is
                    # worth retrying; anything else is a real failure.
                    if not err.args or err.args[0] != errno.ECONNREFUSED:
                        raise
                    sleep(10)

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            # Close the registered connections on every exit path, as
            # _run_server does.
            self._waiting_close()


    def _slave_wait(self):
        """
        Slave side of the ping/pong/release exchange with the master.

        Processes master messages until the master closes the connection,
        then judges the outcome from the last message received.  A
        socket.timeout from recv/send is not caught here and propagates.

        @raises error.BarrierError: If the master closed the connection
                before releasing us.
        @raises BarrierAbortError: If the last message was 'abrt'.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                # Empty read: the master closed the connection.
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        # The last message seen before the close tells us how far the
        # barrier got.
        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait at the barrier until all the given hosts have arrived.

        The first host in sort order acts as master and listens; all other
        hosts connect to it.

        @param hosts: Host ids of every barrier member, including ourselves.
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        @raises error.BarrierError: On timeout or a lost member.
        @raises BarrierAbortError: If any member requested an abort.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """
        Like rendezvous, but every slave listens and the master connects.

        @param masterid: Host id of the member acting as master.
        @param hosts: Host ids of the slave members.
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        @raises error.BarrierError: On timeout or a lost member.
        @raises BarrierAbortError: If any member requested an abort.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)
544