barrier.py revision 02fc0b9199aa71c7f801aee4fcd4b86513e8facb
import sys, socket, errno, logging
from time import time, sleep
from autotest_lib.client.common_lib import error

# default barrier port
_DEFAULT_PORT = 11922


class BarrierAbortError(error.BarrierError):
    """Special BarrierError raised when an explicit abort is requested."""


class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        self.socket = self._setup()


    def _setup(self):
        """Create, bind and listen on the listening socket.

        If binding or listening fails the socket is closed before the
        exception propagates, so a failed setup does not leak a descriptor.

        @return: The listening socket object.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()


class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connections will fail.  Clients will retry until they either succeed in
    connecting to the master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded with pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.  An aborting member answers
    the master's ping with 'abrt' instead of 'pong'.  The master then sends
    'abrt' instead of 'rlse', and BarrierAbortError is raised on the master
    and on every member that receives it.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to be met.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raises error.BarrierError: both port and listen_server were given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                    '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this host requested an abort; reset by rendezvous*().
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _get_host_from_id(self, hostid):
        """Return the host part of a member id.

        Any trailing local identifier following a '#' is removed.  This
        allows multiple members per host, which is particularly helpful in
        testing.

        @param hostid: Member id of the form "host" or "host#tag".
        @return: The text before the first '#'.
        @raises error.BarrierError: hostid starts with '#' (no host part).
        """
        if hostid.startswith('#'):
            raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
        return hostid.split('#')[0]


    def _update_timeout(self, timeout):
        """Reset the deadline so that `timeout` seconds remain from now.

        Once a rendezvous has started, the stored total timeout becomes the
        elapsed time plus `timeout`.  Otherwise `timeout` is stored as-is.
        None means no timeout.
        """
        if timeout is not None and self._start_time is not None:
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """Return the seconds left before the barrier times out.

        @return: Remaining seconds, or None when no timeout is set.  If the
                rendezvous has not started, the configured timeout is
                returned unchanged.
        @raises error.BarrierError: the deadline has already passed.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """Perform the master side of the join handshake with one member.

        Reads "<tag> <name>" from the peer.  The peer is rejected with
        "!tag" when the tag does not match this barrier, or with "!dup" when
        `name` is already waiting.  Otherwise it is sent "wait" and recorded
        in self._waiting.  Rejected or timed-out peers are closed.

        @param connection: (socket, (host, port)) tuple for the peer.
        """
        client, addr = connection
        name = None

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning(
                    "client arriving for the wrong barrier: %s != %s",
                    self._tag, tag)
                client.settimeout(5)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.settimeout(5)
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client.
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that is appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid; record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """Perform the slave side of the join handshake with the master.

        Sends "<tag> <hostid>" and expects "wait" back.  On success the
        connection is stored in self._waiting under this host's id and
        self._seen is set to 1.  On any other reply, or on timeout, the
        connection is closed and nothing is recorded, so the caller retries.

        @param connection: (socket, (host, port)) tuple for the master.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that is appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # The master accepted us; record the connection.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """Ping every waiting member, then release (or abort) them all.

        Each member in self._waiting is sent "ping" and must answer "pong",
        or "abrt" to request an abort.  If any member gives another answer,
        or fails to answer, BarrierError is raised and nobody is released.
        Otherwise every member is sent "rlse", or "abrt" when this host or
        any member requested an abort.  Failing to deliver that final
        message is only logged: the barrier was already met.

        @raises error.BarrierError: a member did not answer the ping.
        @raises BarrierAbortError: an abort was requested.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, addr) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error as err:
                # A reset or broken connection means the member is gone;
                # treat it as a missing pong instead of crashing.
                logging.warning("ping/pong failed: %s: %s", name, err)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # Everyone checked in, so commit the release.
        for name, (client, addr) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                # The barrier is already met; a member whose connection
                # broke here failed after the barrier, not during it.
                logging.warning("release failed: %s: %s", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every connection recorded in self._waiting.

        This runs on both the success and the failure paths.  Members that
        were not released see the connection drop, and their _slave_wait
        turns that into an error.  Socket errors from close() are ignored.
        """
        for name, (client, addr) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except socket.error:
                # Best effort: never mask an exception that may already
                # be propagating through the caller's cleanup path.
                pass


    def _run_server(self, is_master):
        """Run the barrier with this host accepting connections.

        Uses self._server when one was supplied.  Otherwise it creates a
        listen_server on self._port and closes it before returning.

        @param is_master: True to act as master, welcoming every member
                (rendezvous).  False to act as a slave waiting for the
                master to connect (rendezvous_servers).
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers, welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()
            # If we created the listening server at the beginning of this
            # function then close the listening socket here.
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """Run the barrier with this host initiating connections.

        As master (rendezvous_servers), connects to each member of
        self._members in order and welcomes it.  As slave (rendezvous),
        connects to the master.  Timeouts and refused connections are
        retried after 10 seconds until the barrier deadline passes.  Other
        socket errors propagate.  All recorded connections are closed on
        exit, whether it succeeds or fails.

        @param is_master: True when this host is the master.
        """
        try:
            while True:
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                if is_master:
                    # Connect to all slaves, one at a time.
                    host = self._get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                else:
                    # Just connect to the master.
                    host = self._get_host_from_id(self._masterid)
                    logging.info("calling master")

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.settimeout(30)
                connection = (remote, (host, self._port))
                try:
                    remote.connect(connection[1])
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    # The socket is never recorded in self._waiting on
                    # failure, so close it here to avoid leaking it.
                    remote.close()
                    sleep(10)
                except socket.error as err:
                    remote.close()
                    if err.errno != errno.ECONNREFUSED:
                        raise
                    sleep(10)

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """Follow the master's control messages until it closes the link.

        Answers each "ping" with "pong", or with "abrt" if this host asked
        to abort.  The outcome depends on the last message received before
        the master closed the connection:
          "rlse" - success, return normally;
          "abrt" - raise BarrierAbortError;
          anything else - raise error.BarrierError.

        NOTE(review): a recv() that exceeds the remaining time raises
        socket.timeout, not BarrierError.  Confirm whether callers rely on
        this.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """Wait until every listed member reaches this barrier.

        The lowest id in sort order is the master and listens for the
        others; every other member connects to it.

        @param hosts: Ids of all members, including this host.
        @param abort: (keyword) If True, ask to abort.  This raises
                BarrierAbortError on the master and the released members.
        @raises error.BarrierError: timeout, lost member or bad handshake.
        @raises BarrierAbortError: a member requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """Barrier in which the slaves listen and the master connects.

        Use this when connections can only be initiated in one direction.

        @param masterid: Id of the master member.
        @param hosts: Ids of the members the master connects to, one at a
                time in sorted order.
        @param abort: (keyword) If True, ask to abort.  This raises
                BarrierAbortError on the master and the released members.
        @raises error.BarrierError: timeout, lost member or bad handshake.
        @raises BarrierAbortError: a member requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)