1import sys, socket, errno, logging
2from time import time, sleep
3from autotest_lib.client.common_lib import error, utils
4
5# default barrier port
6_DEFAULT_PORT = 11922
7
8def _get_host_from_id(hostid):
9    # Remove any trailing local identifier following a #.
10    # This allows multiple members per host which is particularly
11    # helpful in testing.
12    if not hostid.startswith('#'):
13        return hostid.split('#')[0]
14    else:
15        raise error.BarrierError(
16            "Invalid Host id: Host Address should be specified")
17
18
class BarrierAbortError(error.BarrierError):
    """Special BarrierError raised when an explicit abort is requested.

    Being a subclass of error.BarrierError, it is also caught by handlers
    written for the generic barrier failure.
    """
21
22
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        An iptables ACCEPT rule for the port is appended first, then the
        listening socket is created, bound and put into listening state.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @return: The listening socket object.
        @raises socket.error: If the socket cannot be configured, bound or
                put into listening state.  The half-initialised socket is
                closed before the error is propagated.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except BaseException:
            # Do not leak the descriptor when e.g. the port is in use.
            sock.close()
            raise

        return sock


    def close(self):
        """
        Close the listening socket.

        Only the socket is closed; the iptables rule appended by __init__
        is left in place.
        """
        self.socket.close()
65
66
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail.  Client will retry until they either succeed in
    connecting to master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to ping the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raises error.BarrierError: If both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Overwritten by each rendezvous call; initialised here so that the
        # helper methods never trip over a missing attribute.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """
        Replace the overall timeout so that it expires timeout seconds
        from now.

        @param timeout: Seconds from now until the barrier should time out,
                or None to disable the timeout.
        """
        if timeout is not None and self._start_time is not None:
            # _timeout_secs is measured from _start_time, so add the time
            # that has already elapsed.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Return the number of seconds left before the barrier times out.

        @return: None if no timeout is configured; the full configured
                timeout if waiting has not started yet; otherwise the
                seconds remaining.
        @raises error.BarrierError: If the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """
        Master side of the handshake: read the peer's "<tag> <name>"
        introduction and, if acceptable, tell it to wait and record it.

        Peers that send malformed data, the wrong tag or a duplicate name,
        or that time out, are closed and not recorded.

        @param connection: (socket, (host, port)) tuple for the peer.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            # Get the clients name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                             client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: %s != %s",
                             self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                         addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """
        Slave side of the handshake: introduce ourselves to the master and
        record the connection if the master answers "wait".

        On any other answer, or on a timeout, the connection is closed and
        nothing is recorded.

        @param connection: (socket, (host, port)) tuple for the master.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """
        Ping every waiting client and then release (or abort) them all.

        @raises error.BarrierError: If any client fails to answer the ping
                with "pong" or "abrt".
        @raises BarrierAbortError: If this host or any client requested an
                abort; raised after the abort has been sent to the clients.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name in self._waiting:
            client = self._waiting[name][0]

            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error as err:
                # A reset/broken connection means the client is gone; treat
                # it like any other missing pong instead of leaking a raw
                # socket error to the caller.
                logging.warning("ping/pong failed: %s (%s)", name, err)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name in self._waiting:
            client = self._waiting[name][0]

            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                # The barrier is already satisfied at this point, so a
                # failed release must not turn it into a failure.
                logging.warning("release failed: %s (%s)", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close the connections of all recorded peers (best effort)."""
        # Either way, close out all the clients.  If we have
        # not released them then they know to abort.
        for name in self._waiting:
            client = self._waiting[name][0]

            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception as err:
                # Best effort: one failing close must not stop the others.
                logging.debug("error closing client %s: %s", name, err)


    def _finish_if_ready(self, is_master):
        """
        Complete the barrier if the connection phase is done.

        As master this means all members have checked in, in which case
        they are released.  As slave this means the master connection is
        established, in which case we wait for the master's verdict.

        @param is_master: True if this host is the barrier master.
        @return: True if the barrier completed, False if more connections
                are still needed.
        """
        if is_master:
            # Check if everyone is here.
            logging.info("master seen %d of %d",
                         self._seen, len(self._members))
            if self._seen == len(self._members):
                self._master_release()
                return True
        elif self._seen:
            # The master is connected.
            logging.info("slave connected to master")
            self._slave_wait()
            return True
        return False


    def _run_server(self, is_master):
        """
        Run the barrier accepting incoming connections.

        @param is_master: True to act as master (welcome every member),
                False to act as a slave waiting for the master to call.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if self._finish_if_ready(is_master):
                    break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """
        Run the barrier making outgoing connections.

        Connection attempts that time out or are refused are retried after
        a 10 second pause until the overall timeout expires.

        @param is_master: True to act as master (call every slave in turn),
                False to act as a slave calling the master.
        @raises socket.error: For connection errors other than a timeout or
                ECONNREFUSED.
        """
        try:
            while True:
                # _remaining() raises once the deadline has passed; query it
                # once per attempt so the remaining time is logged once.
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                if is_master:
                    # Connect to all slaves.
                    host = _get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                    handshake = self._master_welcome
                else:
                    # Just connect to the master.
                    host = _get_host_from_id(self._masterid)
                    logging.info("calling master")
                    handshake = self._slave_hello

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    remote.settimeout(30)
                    connection = (remote, (host, self._port))
                    remote.connect(connection[1])
                    handshake(connection)
                except socket.timeout:
                    # The handshake helpers handle their own timeouts, so
                    # this socket was never recorded; drop it and retry.
                    remote.close()
                    logging.warning("timeout calling host, retry")
                    sleep(10)
                except socket.error as err:
                    remote.close()
                    code = err.args[0] if err.args else None
                    if code != errno.ECONNREFUSED:
                        raise
                    # The peer is not listening yet; retry later.
                    sleep(10)

                if self._finish_if_ready(is_master):
                    break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """
        Slave side of the barrier: answer the master's ping and wait until
        it closes the connection, then act on the last message received.

        @raises BarrierAbortError: If the last message was "abrt".
        @raises error.BarrierError: If the connection closed before a
                "rlse" was received, or an unknown message was received.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait at the barrier until all the given hosts have arrived.

        The first host in sort order becomes the master and listens for
        the others; everyone else connects to it.  Returns immediately if
        no host other than the master is listed.

        @param hosts: Ids of the barrier members.
        @param dargs: Optional abort=True; if set, this will raise an
                exception on all the clients.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """
        Wait at the barrier with the connection direction reversed: every
        slave listens and the master connects to each of them in turn.

        Returns immediately if no hosts are listed.

        @param masterid: Id of the host acting as master.
        @param hosts: Ids of the other members, which the master calls.
        @param dargs: Optional abort=True; if set, this will raise an
                exception on all the clients.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)
547