1#!/usr/bin/python
2"""
3Copyright 2016 Google Inc. All Rights Reserved.
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16"""
17import asyncore
18import gc
19import logging
20import Queue
21import signal
22import socket
23import sys
24import threading
25import time
26
# ---- Process-wide state shared by the proxy classes and the run loop ----

# Listening Socks5Server and the two shaping pipes; all three are created in main().
server = None
in_pipe = None
out_pipe = None

# Parsed command-line options (argparse namespace), assigned in main().
options = None

# Destination rewriting (--desthost / --mapports / --localhost).
dest_addresses = None
port_mappings = None
map_localhost = False

# Live connections keyed by client id; each value is a dict with 'client' and
# 'server' entries. dns_cache maps a hostname to its resolved address list.
connections = {}
dns_cache = {}

# Shutdown / flush signalling between the signal handler, the stdin command
# thread and the run loop.
must_exit = False
needs_flush = False
flush_pipes = False

# Scale factor applied to configured bandwidth: 1460 payload bytes out of
# every 1500-byte frame.
REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
40
41
def PrintMessage(msg):
  """Write msg followed by a newline to stdout and flush immediately.

  The flush makes sure the message is not buffered when tsproxy is run as a
  subprocess. The text and its newline are emitted with a single write() so
  that messages printed from the stdin command thread and from the run loop
  cannot interleave mid-line, and so the function does not depend on the
  Python-2-only `print >>` statement form.

  Args:
    msg: the message to print; it is formatted with str.format().
  """
  sys.stdout.write('{0}\n'.format(msg))
  sys.stdout.flush()
47
48
49########################################################################################################################
#   Traffic-shaping pipe (applies one-way latency and a bandwidth limit to queued messages)
51########################################################################################################################
class TSPipe(object):
  """One direction of the traffic-shaped link between a SOCKS client and its server.

  Messages handed to SendMessage() are stamped with the earliest time they may
  be delivered (now + latency) and queued. tick(), called from the run loop,
  delivers queued messages in order once their time has arrived and, when kbps
  is positive, once enough bandwidth credit has accumulated to cover their size.

  Attributes:
    direction: PIPE_IN or PIPE_OUT.
    latency: one-way delay in seconds added to every message except 'closed'.
    kbps: bandwidth limit in kilobits per second; a value <= 0 disables it.
    peer: key in the per-connection dict ('client' for PIPE_IN, otherwise
      'server') naming the endpoint that receives this pipe's messages.
  """
  PIPE_IN = 0
  PIPE_OUT = 1

  # Clock used for all scheduling. time.clock() only measures elapsed wall time
  # on Windows; on Unix it reports CPU time, which barely advances while the
  # run loop sleeps in select(). Prefer a monotonic clock when the interpreter
  # provides one, fall back to time.clock() on Windows and time.time() elsewhere.
  _now = staticmethod(
      getattr(time, 'monotonic', None) or
      (time.clock if sys.platform == 'win32' else time.time))

  def __init__(self, direction, latency, kbps):
    self.direction = direction
    self.latency = latency
    self.kbps = kbps
    self.queue = Queue.Queue()
    self.last_tick = self._now()
    self.next_message = None
    self.available_bytes = .0
    self.peer = 'client' if self.direction == self.PIPE_IN else 'server'

  def SendMessage(self, message):
    """Queue a message dict for delayed delivery to this pipe's peer.

    The message must carry 'connection' and 'message' keys. 'time' (earliest
    delivery time) and 'size' (payload length in bytes, 0 without 'data') are
    added here. The message is dropped if its connection, or this pipe's peer
    on that connection, is no longer registered.
    """
    global connections
    try:
      connection_id = message['connection']
      if connection_id in connections and self.peer in connections[connection_id]:
        now = self._now()
        if message['message'] == 'closed':
          # Close notifications are not delayed, but still wait behind queued data.
          message['time'] = now
        else:
          message['time'] = now + self.latency
        message['size'] = float(len(message['data'])) if 'data' in message else .0
        self.queue.put(message)
    except Exception:
      # This can be called from the DNS thread while the main thread tears a
      # connection down; losing a message for a dying connection is acceptable.
      logging.debug('Dropping undeliverable message', exc_info=True)

  def tick(self):
    """Deliver every queued message that is currently sendable.

    Returns:
      True if at least one message was taken off the queue, False otherwise.
    """
    global flush_pipes
    processed_messages = False
    now = self._now()
    try:
      if self.next_message is None:
        self.next_message = self.queue.get_nowait()

      # Accumulate bandwidth if an available packet/message was waiting since our last tick
      if self.kbps > .0 and self.next_message['time'] <= now:
        elapsed = now - self.last_tick
        self.available_bytes += elapsed * self.kbps * 1000.0 / 8.0

      # process messages as long as the next message is sendable (latency or available bytes)
      while flush_pipes or (self.next_message['time'] <= now and
                            (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes)):
        processed_messages = True
        self.queue.task_done()
        message = self.next_message
        # Clear the slot first so a message is never processed twice.
        self.next_message = None
        self._deliver(message)
        self.next_message = self.queue.get_nowait()
    except Queue.Empty:
      # Nothing (more) is queued; next_message is None at this point.
      pass

    # Only accumulate bytes while we have messages that are ready to send
    if self.next_message is None or self.next_message['time'] > now:
      self.available_bytes = .0
    self.last_tick = now

    return processed_messages

  def _deliver(self, message):
    """Hand one dequeued message to its peer, tearing the connection down on failure."""
    global connections
    connection_id = message['connection']
    endpoints = connections.get(connection_id)
    if endpoints is None or self.peer not in endpoints:
      return
    try:
      if self.kbps > .0:
        self.available_bytes -= message['size']
      endpoints[self.peer].handle_message(message)
    except Exception:
      # Clean up any disconnected connections
      for side in ('server', 'client'):
        try:
          endpoints[side].close()
        except Exception:
          # Best effort: that side may be missing, None, or already closed.
          pass
      connections.pop(connection_id, None)
135
136
137########################################################################################################################
138#   Threaded DNS resolver
139########################################################################################################################
class AsyncDNS(threading.Thread):
  """Background thread that resolves one hostname and reports the result.

  The outcome is posted to result_pipe as a 'resolved' message whose
  'addresses' entry holds the socket.getaddrinfo() result, or an empty tuple
  when resolution failed.

  Args:
    client_id: integer id of the connection the lookup belongs to.
    hostname: name to resolve.
    port: integer port passed through to getaddrinfo().
    result_pipe: object with a SendMessage(message) method.
  """

  def __init__(self, client_id, hostname, port, result_pipe):
    threading.Thread.__init__(self)
    # A lookup that is still pending must not keep the process alive at exit.
    self.daemon = True
    self.hostname = hostname
    self.port = port
    self.client_id = client_id
    self.result_pipe = result_pipe

  def run(self):
    try:
      addresses = socket.getaddrinfo(self.hostname, self.port)
      logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
    except Exception:
      # Any failure is reported as "no addresses" so the waiting connection
      # always receives an answer instead of hanging in the resolving state.
      addresses = ()
      logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
    message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
    self.result_pipe.SendMessage(message)
157
158
159########################################################################################################################
160#   TCP Client
161########################################################################################################################
class TCPConnection(asyncore.dispatcher):
  """Outbound (server-side) half of a proxied connection.

  It receives 'resolve', 'connect', 'data', 'ack' and 'closed' messages from the
  SOCKS side through handle_message() and reports 'resolved', 'connected',
  'data', 'ack' and 'closed' back through in_pipe.
  """
  STATE_ERROR = -1
  STATE_IDLE = 0
  STATE_RESOLVING = 1
  STATE_CONNECTING = 2
  STATE_CONNECTED = 3

  def __init__(self, client_id):
    global options
    asyncore.dispatcher.__init__(self)
    self.client_id = client_id
    self.state = self.STATE_IDLE
    self.buffer = ''
    self.addr = None
    self.dns_thread = None
    self.hostname = None
    self.port = None
    self.needs_config = True
    self.needs_close = False
    self.read_available = False
    self.window_available = options.window
    self.is_localhost = False
    self.did_resolve = False

  def SendMessage(self, type, message):
    """Tag message with its type and this connection's id and queue it on in_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    in_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process one message delivered by the outbound pipe."""
    if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
      if not self.needs_close:
        self.buffer += message['data']
        self.SendMessage('ack', {})
    elif message['message'] == 'ack':
      # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
      self.window_available = min(self.window_available + 2, 350)
      if self.read_available:
        self.handle_read()
    elif message['message'] == 'resolve':
      self.HandleResolve(message)
    elif message['message'] == 'connect':
      self.HandleConnect(message)
    elif message['message'] == 'closed':
      # Flush anything still buffered before closing.
      if len(self.buffer) == 0:
        self.handle_close()
      else:
        self.needs_close = True

  def handle_error(self):
    logging.warning('[{0:d}] Error'.format(self.client_id))
    if self.state == self.STATE_CONNECTING:
      self.SendMessage('connected', {'success': False, 'address': self.addr})

  def handle_close(self):
    """Close the socket, unregister the server side and notify the client side."""
    logging.info('[{0:d}] Server Connection Closed'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    entry = connections.get(self.client_id)
    if entry is not None:
      entry.pop('server', None)
      if 'client' in entry:
        self.SendMessage('closed', {})
      else:
        # Both sides are gone; forget the connection entirely.
        connections.pop(self.client_id, None)

  def writable(self):
    # The first writability poll after connect() is treated as "connected".
    if self.state == self.STATE_CONNECTING:
      self.state = self.STATE_CONNECTED
      self.SendMessage('connected', {'success': True, 'address': self.addr})
      logging.info('[{0:d}] Connected'.format(self.client_id))
    return (len(self.buffer) > 0 and self.state == self.STATE_CONNECTED)

  def handle_write(self):
    if self.needs_config:
      self.needs_config = False
      self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sent = self.send(self.buffer)
    logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
    self.buffer = self.buffer[sent:]
    if self.needs_close and len(self.buffer) == 0:
      self.needs_close = False
      self.handle_close()

  def handle_read(self):
    """Read packet-sized chunks while the emulated congestion window allows it."""
    if self.window_available == 0:
      # Remember that data is pending; the next 'ack' re-triggers the read.
      self.read_available = True
      return
    self.read_available = False
    try:
      while self.window_available > 0:
        data = self.recv(1460)
        if not data:
          return
        if self.state == self.STATE_CONNECTED:
          self.window_available -= 1
          logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
          self.SendMessage('data', {'data': data})
    except socket.error:
      # recv() raises once the non-blocking socket has nothing more queued;
      # that is the normal way out of the loop above.
      pass

  def HandleResolve(self, message):
    """Resolve the requested hostname, or answer with the rewrite destination."""
    global in_pipe
    global map_localhost
    self.did_resolve = True
    if 'hostname' in message:
      self.hostname = message['hostname']
    self.port = 0
    if 'port' in message:
      self.port = message['port']
    logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
    if self.hostname == 'localhost':
      self.hostname = '127.0.0.1'
    if self.hostname == '127.0.0.1':
      logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
      self.is_localhost = True
    if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
      self.SendMessage('resolved', {'addresses': dest_addresses})
    else:
      self.state = self.STATE_RESOLVING
      self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
      self.dns_thread.start()

  def HandleConnect(self, message):
    """Start a non-blocking connect to the first address in the message."""
    global map_localhost
    if 'addresses' in message and len(message['addresses']):
      self.state = self.STATE_CONNECTING
      # Entries are getaddrinfo() 5-tuples, so the IP string is at [4][0];
      # comparing the tuple itself to '127.0.0.1' could never match.
      if not self.did_resolve and message['addresses'][0][4][0] == '127.0.0.1':
        logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
        self.is_localhost = True
      if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
        self.addr = dest_addresses[0]
      else:
        self.addr = message['addresses'][0]
      self.create_socket(self.addr[0], socket.SOCK_STREAM)
      addr = self.addr[4][0]
      if not self.is_localhost or map_localhost:
        port = GetDestPort(message['port'])
      else:
        port = message['port']
      logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port))
      self.connect((addr, port))
308
309
310########################################################################################################################
311#   Socks5 Server
312########################################################################################################################
class Socks5Server(asyncore.dispatcher):
  """Listening socket that accepts SOCKS5 clients and registers each connection.

  Args:
    host: interface address to bind to.
    port: port to listen on (0 lets the OS pick one; the chosen port is
      stored in self.port).
  """

  def __init__(self, host, port):
    asyncore.dispatcher.__init__(self)
    # Ids are handed out sequentially, starting at 1, in handle_accept().
    self.current_client_id = 0
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      # NOTE: address reuse (set_reuse_addr) is deliberately left disabled here.
      self.bind((host, port))
      self.listen(socket.SOMAXCONN)
      self.ipaddr, self.port = self.getsockname()
    except Exception:
      PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
      # sys.exit() rather than the site-provided exit() helper, which is not
      # guaranteed to exist in every interpreter configuration.
      sys.exit(1)

  def handle_accept(self):
    """Accept one pending client and create its connection record."""
    global connections
    pair = self.accept()
    if pair is None:
      return
    sock, addr = pair
    self.current_client_id += 1
    logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr)))
    # The 'server' side is filled in once the client sends its connect request.
    connections[self.current_client_id] = {
      'client' : Socks5Connection(sock, self.current_client_id),
      'server' : None
    }
339
340
341# Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5
class Socks5Connection(asyncore.dispatcher):
  """Client-facing half of a proxied connection, speaking SOCKS5 to the browser.

  It performs the "no authentication" handshake, parses the connect request,
  and then relays data. Messages for the server side go through out_pipe;
  messages from the server side arrive through handle_message().
  """
  STATE_ERROR = -1
  STATE_WAITING_FOR_HANDSHAKE = 0
  STATE_WAITING_FOR_CONNECT_REQUEST = 1
  STATE_RESOLVING = 2
  STATE_CONNECTING = 3
  STATE_CONNECTED = 4

  def __init__(self, connected_socket, client_id):
    global options
    asyncore.dispatcher.__init__(self, connected_socket)
    self.client_id = client_id
    self.state = self.STATE_WAITING_FOR_HANDSHAKE
    self.ip = None
    self.addresses = None
    self.hostname = None
    self.port = None
    self.requested_address = None
    self.buffer = ''
    self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
    self.needs_close = False
    self.read_available = False
    self.window_available = options.window

  def SendMessage(self, type, message):
    """Tag message with its type and this connection's id and queue it on out_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    out_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process one message delivered by the inbound pipe."""
    if message['message'] == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
      if not self.needs_close:
        self.buffer += message['data']
        self.SendMessage('ack', {})
    elif message['message'] == 'ack':
      # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
      self.window_available = min(self.window_available + 2, 350)
      if self.read_available:
        self.handle_read()
    elif message['message'] == 'resolved':
      self.HandleResolved(message)
    elif message['message'] == 'connected':
      self.HandleConnected(message)
    elif message['message'] == 'closed':
      # Flush anything still buffered before closing.
      if len(self.buffer) == 0:
        self.handle_close()
      else:
        self.needs_close = True

  def writable(self):
    return (len(self.buffer) > 0)

  def handle_write(self):
    sent = self.send(self.buffer)
    logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
    self.buffer = self.buffer[sent:]
    if self.needs_close and len(self.buffer) == 0:
      self.needs_close = False
      self.handle_close()

  def handle_read(self):
    """Read from the client and act on the data according to the current state."""
    if self.window_available == 0:
      # Remember that data is pending; the next 'ack' re-triggers the read.
      self.read_available = True
      return
    self.read_available = False
    try:
      while self.window_available > 0:
        # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
        data = self.recv(1460)
        if not data:
          return
        if self.state == self.STATE_CONNECTED:
          logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, len(data)))
          self.window_available -= 1
          self.SendMessage('data', {'data': data})
        elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
          self._HandleHandshake(data)
        elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
          self._HandleConnectRequest(data)
    except socket.error:
      # recv() raises once the non-blocking socket has nothing more queued;
      # that is the normal way out of the loop above.
      pass
    except (IndexError, KeyError):
      # Truncated request, or the connection record vanished while parsing.
      # Both parsers set STATE_ERROR before touching the data.
      logging.debug('[{0:d}] Malformed SOCKS request'.format(self.client_id), exc_info=True)

  def _HandleHandshake(self, data):
    """Parse the method-selection message and accept "no authentication"."""
    self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
    data_len = len(data)
    if data_len < 2 or ord(data[0]) != 0x05:
      return
    auth_count = ord(data[1])
    # The message must be exactly the 2-byte header plus auth_count method bytes.
    if data_len != auth_count + 2:
      return
    if any(ord(method) == 0 for method in data[2:]):
      # Respond with a message that "No Authentication" was agreed to
      logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
      self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
      self.buffer += chr(0x05) + chr(0x00)

  def _HandleConnectRequest(self, data):
    """Parse a connect request and start resolving or connecting."""
    global connections
    global dns_cache
    self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
    data_len = len(data)
    # Shortest valid request: 4 header bytes, a length byte, a 1-character
    # host name and a 2-byte port.
    if data_len < 8 or ord(data[0]) != 0x05 or ord(data[2]) != 0x00:
      return
    self.requested_address = data[3:]
    if ord(data[1]) != 0x01: #TCP connection (only supported method for now)
      return

    address_type = ord(data[3])
    port_offset = 0
    if address_type == 0x01:
      # IPv4: 4 address bytes followed by the port.
      if data_len >= 10:
        port_offset = 8
        self.ip = '.'.join(str(ord(octet)) for octet in data[4:8])
    elif address_type == 0x03:
      # Domain name: length byte, name, then the 2 port bytes.
      name_len = ord(data[4])
      if data_len >= 7 + name_len:
        port_offset = 5 + name_len
        self.hostname = data[5:port_offset]
    elif address_type == 0x04:
      # IPv6: 16 address bytes, written as 8 colon-separated 4-digit hex groups.
      if data_len >= 22:
        port_offset = 20
        self.ip = ':'.join('{0:02x}{1:02x}'.format(ord(data[i]), ord(data[i + 1]))
                           for i in range(4, 20, 2))
    if not port_offset:
      return

    self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1])
    if not self.port:
      return

    if self.ip is None and self.hostname is not None:
      connections[self.client_id]['server'] = TCPConnection(self.client_id)
      if self.hostname in dns_cache:
        self.state = self.STATE_CONNECTING
        self.addresses = dns_cache[self.hostname]
        self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
      else:
        self.state = self.STATE_RESOLVING
        self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
    elif self.ip is not None:
      # Resolve first so a failure leaves the connection in STATE_ERROR.
      addresses = socket.getaddrinfo(self.ip, self.port)
      connections[self.client_id]['server'] = TCPConnection(self.client_id)
      self.state = self.STATE_CONNECTING
      self.addresses = addresses
      self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})

  def handle_close(self):
    """Close the socket, unregister the client side and notify the server side."""
    logging.info('[{0:d}] Browser Connection Closed'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    entry = connections.get(self.client_id)
    if entry is not None:
      entry.pop('client', None)
      if 'server' in entry:
        self.SendMessage('closed', {})
      else:
        # Both sides are gone; forget the connection entirely.
        connections.pop(self.client_id, None)

  def HandleResolved(self, message):
    """Handle the DNS result: cache it and connect, or report host unreachable."""
    global dns_cache
    if self.state == self.STATE_RESOLVING:
      if 'addresses' in message and len(message['addresses']):
        self.state = self.STATE_CONNECTING
        self.addresses = message['addresses']
        dns_cache[self.hostname] = self.addresses
        logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
        self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
      else:
        # Send host unreachable error. The reply is version, status, a reserved
        # zero byte, then the address block - the same layout HandleConnected uses.
        self.state = self.STATE_ERROR
        self.buffer += chr(0x05) + chr(0x04) + chr(0x00) + self.requested_address

  def HandleConnected(self, message):
    """Send the SOCKS5 reply for a finished connection attempt."""
    if 'success' in message and self.state == self.STATE_CONNECTING:
      response = chr(0x05)
      if message['success']:
        response += chr(0x00)
        logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname))
        self.state = self.STATE_CONNECTED
      else:
        response += chr(0x04)
        self.state = self.STATE_ERROR
      response += chr(0x00)
      response += self.requested_address
      self.buffer += response
520
521
522########################################################################################################################
523#   stdin command processor
524########################################################################################################################
class CommandProcessor(object):
  """Reads commands from stdin on a daemon thread and applies them.

  Supported commands (case-insensitive keywords):
    flush
    set rtt <ms>
    set inkbps <kbps>
    set outkbps <kbps>
    set mapports <mapping string>

  A successful command sets needs_flush (the run loop then prints 'OK');
  anything else prints 'ERROR'.
  """

  def __init__(self):
    thread = threading.Thread(target=self.run, args=())
    thread.daemon = True
    thread.start()

  def run(self):
    """Process stdin line by line until end-of-file or shutdown."""
    global must_exit
    if must_exit:
      return
    # readline() returns '' only at end-of-file. Stop there instead of
    # re-reading a closed stdin in a tight loop.
    for line in iter(sys.stdin.readline, ''):
      self.ProcessCommand(line.strip())
      if must_exit:
        break

  def ProcessCommand(self, input):
    """Parse and apply a single command line; blank input is ignored."""
    global in_pipe
    global out_pipe
    global needs_flush
    global REMOVE_TCP_OVERHEAD
    if not input:
      return
    ok = False
    try:
      tokens = input.split()
      verb = tokens[0].lower() if tokens else ''
      if verb == 'flush':
        ok = True
      elif verb == 'set' and len(tokens) >= 3:
        setting = tokens[1].lower()
        value = tokens[2]
        if setting == 'rtt':
          # Half of the round trip time is applied per direction; ms -> seconds.
          latency = float(value) / 2000.0
          in_pipe.latency = latency
          out_pipe.latency = latency
          ok = True
        elif setting == 'inkbps':
          in_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
          ok = True
        elif setting == 'outkbps':
          out_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
          ok = True
        elif setting == 'mapports':
          SetPortMappings(value)
          ok = True
      if ok:
        needs_flush = True
    except Exception:
      # Bad numbers or mapping strings are reported to the caller as 'ERROR'.
      ok = False
    if not ok:
      PrintMessage('ERROR')
573
574
575########################################################################################################################
576#   Main Entry Point
577########################################################################################################################
def main():
  """Parse the command line, set up the pipes and server, and run the proxy loop."""
  global server
  global options
  global in_pipe
  global out_pipe
  global dest_addresses
  global map_localhost
  import argparse
  parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.',
                                   prog='tsproxy')
  # default=0 so the level lookup below always sees an integer; without it the
  # count is None when -v is not given.
  parser.add_argument('-v', '--verbose', action='count', default=0,
                      help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.")
  parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).")
  parser.add_argument('-p', '--port', type=int, default=1080, help="Server port (defaults to 1080, use 0 for randomly assigned).")
  parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).")
  parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-w', '--window', type=int, default=10, help="Emulated TCP initial congestion window (defaults to 10).")
  parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.")
  parser.add_argument('-m', '--mapports', help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'")
  parser.add_argument('-l', '--localhost', action='store_true', default=False,
                      help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.")
  options = parser.parse_args()

  # Set up logging: each -v lowers the threshold one step, bottoming out at DEBUG.
  levels = (logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG)
  log_level = levels[min(options.verbose, len(levels) - 1)]
  logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")

  # Parse any port mappings
  if options.mapports:
    SetPortMappings(options.mapports)

  map_localhost = options.localhost

  # Resolve the address for a rewrite destination host if one was specified
  if options.desthost:
    dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))

  # Set up the pipes.  1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
  in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE_TCP_OVERHEAD)
  out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REMOVE_TCP_OVERHEAD)

  signal.signal(signal.SIGINT, signal_handler)
  server = Socks5Server(options.bind, options.port)
  # The constructor starts the stdin reader thread; no reference needs keeping.
  CommandProcessor()
  PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port))
  run_loop()
634
def signal_handler(signal, frame):
  """SIGINT handler: ask the run loop to stop and drop the server reference.

  Args:
    signal: the signal number (unused).
    frame: the interrupted stack frame (unused).
  """
  global server
  global must_exit
  logging.error('Exiting...')
  must_exit = True
  # Rebind rather than `del`: deleting the global made a second Ctrl-C raise
  # NameError from inside this handler.
  server = None
641
642
643# Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
def run_loop():
  """Poll asyncore and both pipes until must_exit is set.

  Each iteration polls the sockets with a 1ms timeout, ticks the in and out
  pipes, and acknowledges a pending flush request by printing 'OK'. Automatic
  garbage collection is disabled; a manual collection is run after roughly
  5 seconds without pipe activity.
  """
  global must_exit
  global in_pipe
  global out_pipe
  global needs_flush
  global flush_pipes
  # time.clock() reports CPU time on Unix, which hardly advances while this
  # loop sleeps in poll(), so idle time has to be measured with a wall-style
  # clock: monotonic when available, else time.clock() on Windows, else time.time().
  now = getattr(time, 'monotonic', None) or (time.clock if sys.platform == 'win32' else time.time)
  gc_check_count = 0
  last_activity = now()
  # disable gc to avoid pauses during traffic shaping/proxying
  gc.disable()
  while not must_exit:
    asyncore.poll(0.001, asyncore.socket_map)
    if needs_flush:
      flush_pipes = True
      needs_flush = False
    if in_pipe.tick():
      last_activity = now()
    if out_pipe.tick():
      last_activity = now()
    if flush_pipes:
      PrintMessage('OK')
      flush_pipes = False
    # Once the counter passes 1000 iterations, check whether it is a good time to do a gc
    if gc_check_count > 1000:
      gc_check_count = 0
      # manually gc after 5 seconds of idle
      if now() - last_activity >= 5:
        last_activity = now()
        logging.debug("Triggering manual GC")
        gc.collect()
    else:
      gc_check_count += 1
676
677
def GetDestPort(port):
  """Return the remapped destination for port, or port itself when unmapped.

  An exact entry for the port wins over the wildcard ('default') entry.
  """
  global port_mappings
  if port_mappings is None:
    return port
  key = str(port)
  if key in port_mappings:
    return port_mappings[key]
  return port_mappings.get('default', port)
687
688
def SetPortMappings(map_string):
  """Replace the global port mappings with those parsed from map_string.

  Args:
    map_string: comma-separated 'original:new' pairs, with '*' as the original
      port for the wildcard entry, e.g. '443:8443,*:8080'. Surrounding quotes
      and whitespace are stripped.

  Raises:
    ValueError: if a pair is not of the form 'src:dest' or dest is not an
      integer. The previous mappings are left untouched in that case.
  """
  global port_mappings
  parsed = {}
  map_string = map_string.strip('\'" \t\r\n')
  for pair in map_string.split(','):
    (src, dest) = pair.split(':')
    dest_port = int(dest)
    if src == '*':
      parsed['default'] = dest_port
      logging.debug("Default port mapped to port {0}".format(dest))
    else:
      logging.debug("Port {0} mapped to port {1}".format(src, dest))
      parsed[src] = dest_port
  # Publish only after the whole string parsed, so a malformed string cannot
  # wipe or half-replace the mappings that are currently in use.
  port_mappings = parsed
701
702
# Start the proxy only when this file is executed as a script.
if __name__ == '__main__':
  main()
705