1import sys, socket, errno, logging
2from time import time, sleep
3from autotest_lib.client.common_lib import error, utils
4
5# default barrier port
6_DEFAULT_PORT = 11922
7
8def _get_host_from_id(hostid):
9    # Remove any trailing local identifier following a #.
10    # This allows multiple members per host which is particularly
11    # helpful in testing.
12    if not hostid.startswith('#'):
13        return hostid.split('#')[0]
14    else:
15        raise error.BarrierError(
16            "Invalid Host id: Host Address should be specified")
17
18
class BarrierAbortError(error.BarrierError):
    """BarrierError variant signalling that a member explicitly asked for
    the rendezvous to be aborted."""
21
22
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        Besides creating the socket this appends an iptables ACCEPT rule
        for the port; the rule is not removed by close().

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @return: The listening socket object.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            # Do not leak the descriptor when the port cannot be claimed;
            # the caller still sees the original failure.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
65
66
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail.  Client will retry until they either succeed in
    connecting to master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raise error.BarrierError: If both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member requests an abort.  Overwritten by each
        # rendezvous call; initialised here so the release/wait helpers
        # never hit a missing attribute.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """
        Reset the deadline so that timeout seconds remain from now.

        @param timeout: Seconds to allow from this moment, or None for no
                timeout.
        """
        if timeout is not None and self._start_time is not None:
            # _timeout_secs is measured from _start_time, so add the time
            # that has already elapsed.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Compute how long we may still wait for the barrier.

        @return: None when no timeout is configured, the configured timeout
                when waiting has not started yet, otherwise the seconds
                left before the deadline.
        @raise error.BarrierError: If the deadline has already passed.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """
        Master side of the handshake with a newly connected client.

        Reads the "<tag> <name>" introduction, rejects clients arriving for
        another barrier ("!tag") or using an already registered name
        ("!dup"), and otherwise replies "wait" and records the connection
        in self._waiting.

        @param connection: (socket, address) tuple of the client.
        """
        client, addr = connection
        name = None

        # The 5 second timeout applies to every send/recv of the handshake.
        client.settimeout(5)
        try:
            # Get the clients name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning(
                        "client arriving for the wrong barrier: %s != %s",
                        self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """
        Slave side of the handshake with the master.

        Sends "<tag> <hostid>" and expects "wait" back.  On success the
        connection is stored in self._waiting under our own hostid and
        self._seen is set to 1; otherwise the socket is closed.

        @param connection: (socket, address) tuple of the master.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """
        Ping every waiting client and then release (or abort) them all.

        @raise error.BarrierError: If any client fails to answer the ping
                with "pong" or "abrt".
        @raise BarrierAbortError: If we or any client requested an abort;
                raised after "abrt" has been sent to the clients.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                # reply keeps its placeholder value, so the client is
                # counted as missing below.
                logging.warning("ping/pong timeout: %s", name)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                # The barrier already succeeded; a lost release is the
                # client's problem.
                logging.warning("release timeout: %s", name)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every recorded connection, ignoring close failures."""
        # Either way, close out all the clients.  If we have
        # not released them then they know to abort.
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception:
                # Best effort: one bad socket must not stop the others
                # from being closed.
                logging.warning("error closing client: %s", name)


    def _run_server(self, is_master):
        """
        Accept connections until the barrier is met, then release or wait.

        @param is_master: True to welcome clients and release them once all
                members have checked in; False to wait for the master to
                call us and then follow its instructions.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    # Loop again; _remaining() raises once the overall
                    # deadline has passed.
                    logging.warning("timeout waiting for remaining clients")

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """
        Connect out to the peer(s) until the barrier is met.

        Connection attempts that time out or are refused are retried after
        a 10 second pause until _remaining() reports the deadline passed.

        @param is_master: True to call each slave in turn and release them
                once all have checked in; False to call the master and then
                follow its instructions.
        """
        try:
            while True:
                # _remaining() raises once the deadline has passed; call it
                # once per attempt so the remaining time is logged once.
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                if is_master:
                    # Connect to all slaves.
                    host = _get_host_from_id(self._members[self._seen])
                else:
                    # Just connect to the master.
                    host = _get_host_from_id(self._masterid)

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                connection = (remote, (host, self._port))
                try:
                    remote.settimeout(30)
                    if is_master:
                        logging.info("calling slave: %s", host)
                        remote.connect(connection[1])
                        self._master_welcome(connection)
                    else:
                        logging.info("calling master")
                        remote.connect(connection[1])
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    # This attempt's socket is useless; a new one is made
                    # on the next pass.
                    remote.close()
                    sleep(10)
                except socket.error:
                    # sys.exc_info() keeps this working on every Python
                    # version, unlike either "except ..., err" or "as err".
                    err = sys.exc_info()[1]
                    code = None
                    if err.args:
                        code = err.args[0]
                    remote.close()
                    if (code != errno.ECONNREFUSED and
                        code != errno.ETIMEDOUT):
                        raise
                    sleep(10)

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            # Close the peers even when the barrier fails, as _run_server
            # does.
            self._waiting_close()


    def _slave_wait(self):
        """
        Follow the master's instructions until it closes the connection.

        Answers "ping" with "pong" (or "abrt" when we request an abort) and
        remembers the last message received.

        @raise BarrierAbortError: If the last message was "abrt".
        @raise error.BarrierError: If the connection closed after any
                message other than "rlse" or "abrt".
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                # Empty read: the master closed the connection.
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait at the barrier; the first host in sort order acts as master.

        The master listens and the other members connect to it.

        @param hosts: Ids of all members, including ourselves.  Must not be
                empty (the master is popped from the sorted list).
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """
        Wait at the barrier with the connection direction reversed.

        Every slave listens and the master connects out to each of them.

        @param masterid: Id of the member acting as master.
        @param hosts: Ids of the slave members.
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)
548