source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/rocket.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 58.1 KB
Line 
1# -*- coding: utf-8 -*-
2
3# This file is part of the Rocket Web Server
4# Copyright (c) 2011 Timothy Farrell
5# Modified by Massimo Di Pierro
6
7# Import System Modules
8
9import sys
10import errno
11import socket
12import logging
13import platform
14from gluon._compat import iteritems, to_bytes, to_unicode, StringIO
15from gluon._compat import urllib_unquote, to_native, PY2
16
17# Define Constants
18VERSION = '1.2.6'
19SERVER_NAME = socket.gethostname()
20SERVER_SOFTWARE = 'Rocket %s' % VERSION
21HTTP_SERVER_SOFTWARE = '%s Python/%s' % (
22    SERVER_SOFTWARE, sys.version.split(' ')[0])
23BUF_SIZE = 16384
24SOCKET_TIMEOUT = 10  # in secs
25THREAD_STOP_CHECK_INTERVAL = 1  # in secs, How often should threads check for a server stop message?
26IS_JYTHON = platform.system() == 'Java'  # Handle special cases for Jython
27IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
28DEFAULT_LISTEN_QUEUE_SIZE = 5
29DEFAULT_MIN_THREADS = 10
30DEFAULT_MAX_THREADS = 0
31DEFAULTS = dict(LISTEN_QUEUE_SIZE=DEFAULT_LISTEN_QUEUE_SIZE,
32                MIN_THREADS=DEFAULT_MIN_THREADS,
33                MAX_THREADS=DEFAULT_MAX_THREADS)
34
35PY3K = not PY2
36
37
38class NullHandler(logging.Handler):
39    """A Logging handler to prevent library errors."""
40    def emit(self, record):
41        pass
42
43b = to_bytes
44u = to_unicode
45
46# Import Package Modules
47# package imports removed in monolithic build
48
49__all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
50           'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
51           'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
52
53# Monolithic build...end of module: rocket/__init__.py
54# Monolithic build...start of module: rocket/connection.py
55
56# Import System Modules
57import sys
58import time
59import socket
60try:
61    import ssl
62    has_ssl = True
63except ImportError:
64    has_ssl = False
65# Import Package Modules
66# package imports removed in monolithic build
67# TODO - This part is still very experimental.
68# from .filelike import FileLikeSocket
69
70
71class Connection(object):
72    __slots__ = [
73        'setblocking',
74        'sendall',
75        'shutdown',
76        'makefile',
77        'fileno',
78        'client_addr',
79        'client_port',
80        'server_port',
81        'socket',
82        'start_time',
83        'ssl',
84        'secure',
85        'recv',
86        'send',
87        'read',
88        'write'
89    ]
90
91    def __init__(self, sock_tuple, port, secure=False):
92        self.client_addr, self.client_port = sock_tuple[1][:2]
93        self.server_port = port
94        self.socket = sock_tuple[0]
95        self.start_time = time.time()
96        self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
97        self.secure = secure
98
99        if IS_JYTHON:
100            # In Jython we must set TCP_NODELAY here since it does not
101            # inherit from the listening socket.
102            # See: http://bugs.jython.org/issue1309
103            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
104
105        self.socket.settimeout(SOCKET_TIMEOUT)
106
107        self.shutdown = self.socket.shutdown
108        self.fileno = self.socket.fileno
109        self.setblocking = self.socket.setblocking
110        self.recv = self.socket.recv
111        self.send = self.socket.send
112        self.makefile = self.socket.makefile
113
114        if sys.platform == 'darwin':
115            self.sendall = self._sendall_darwin
116        else:
117            self.sendall = self.socket.sendall
118
119    def _sendall_darwin(self, buf):
120        pending = len(buf)
121        offset = 0
122        while pending:
123            try:
124                sent = self.socket.send(buf[offset:])
125                pending -= sent
126                offset += sent
127            except socket.error:
128                import errno
129                info = sys.exc_info()
130                if info[1].args[0] != errno.EAGAIN:
131                    raise
132        return offset
133
134# FIXME - this is not ready for prime-time yet.
135#    def makefile(self, buf_size=BUF_SIZE):
136#        return FileLikeSocket(self, buf_size)
137
138    def close(self):
139        if hasattr(self.socket, '_sock'):
140            try:
141                self.socket._sock.close()
142            except socket.error:
143                info = sys.exc_info()
144                if info[1].args[0] != socket.EBADF:
145                    raise info[1]
146                else:
147                    pass
148        self.socket.close()
149
150# Monolithic build...end of module: rocket/connection.py
151# Monolithic build...start of module: rocket/filelike.py
152
153# Import System Modules
154import socket
155# Import Package Modules
156# package imports removed in monolithic build
157
158
159class FileLikeSocket(object):
160    def __init__(self, conn, buf_size=BUF_SIZE):
161        self.conn = conn
162        self.buf_size = buf_size
163        self.buffer = StringIO()
164        self.content_length = None
165
166        if self.conn.socket.gettimeout() == 0.0:
167            self.read = self.non_blocking_read
168        else:
169            self.read = self.blocking_read
170
171    def __iter__(self):
172        return self
173
174    def recv(self, size):
175        while True:
176            try:
177                return self.conn.recv(size)
178            except socket.error:
179                exc = sys.exc_info()
180                e = exc[1]
181                # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr
182                if (e.args[0] not in set()):
183                    raise
184
185    def next(self):
186        data = self.readline()
187        if data == '':
188            raise StopIteration
189        return data
190
191    def non_blocking_read(self, size=None):
192        # Shamelessly adapted from Cherrypy!
193        bufr = self.buffer
194        bufr.seek(0, 2)
195        if size is None:
196            while True:
197                data = self.recv(self.buf_size)
198                if not data:
199                    break
200                bufr.write(data)
201
202            self.buffer = StringIO()
203
204            return bufr.getvalue()
205        else:
206            buf_len = self.buffer.tell()
207            if buf_len >= size:
208                bufr.seek(0)
209                data = bufr.read(size)
210                self.buffer = StringIO(bufr.read())
211                return data
212
213            self.buffer = StringIO()
214            while True:
215                remaining = size - buf_len
216                data = self.recv(remaining)
217
218                if not data:
219                    break
220
221                n = len(data)
222                if n == size and not buf_len:
223                    return data
224
225                if n == remaining:
226                    bufr.write(data)
227                    del data
228                    break
229
230                bufr.write(data)
231                buf_len += n
232                del data
233
234            return bufr.getvalue()
235
236    def blocking_read(self, length=None):
237        if length is None:
238            if self.content_length is not None:
239                length = self.content_length
240            else:
241                length = 1
242
243        try:
244            data = self.conn.recv(length)
245        except:
246            data = b('')
247
248        return data
249
250    def readline(self):
251        data = b("")
252        char = self.read(1)
253        while char != b('\n') and char is not b(''):
254            line = repr(char)
255            data += char
256            char = self.read(1)
257        data += char
258        return data
259
260    def readlines(self, hint="ignored"):
261        return list(self)
262
263    def close(self):
264        self.conn = None
265        self.content_length = None
266
267# Monolithic build...end of module: rocket/filelike.py
268# Monolithic build...start of module: rocket/futures.py
269
270# Import System Modules
271import time
272try:
273    from concurrent.futures import Future, ThreadPoolExecutor
274    from concurrent.futures.thread import _WorkItem
275    has_futures = True
276except ImportError:
277    has_futures = False
278
279    class Future(object):
280        pass
281
282    class ThreadPoolExecutor(object):
283        pass
284
285    class _WorkItem(object):
286        pass
287
288
289class WSGIFuture(Future):
290    def __init__(self, f_dict, *args, **kwargs):
291        Future.__init__(self, *args, **kwargs)
292
293        self.timeout = None
294
295        self._mem_dict = f_dict
296        self._lifespan = 30
297        self._name = None
298        self._start_time = time.time()
299
300    def set_running_or_notify_cancel(self):
301        if time.time() - self._start_time >= self._lifespan:
302            self.cancel()
303        else:
304            return super(WSGIFuture, self).set_running_or_notify_cancel()
305
306    def remember(self, name, lifespan=None):
307        self._lifespan = lifespan or self._lifespan
308
309        if name in self._mem_dict:
310            raise NameError('Cannot remember future by name "%s".  ' % name +
311                            'A future already exists with that name.')
312        self._name = name
313        self._mem_dict[name] = self
314
315        return self
316
317    def forget(self):
318        if self._name in self._mem_dict and self._mem_dict[self._name] is self:
319            del self._mem_dict[self._name]
320            self._name = None
321
322
323class _WorkItem(object):
324    def __init__(self, future, fn, args, kwargs):
325        self.future = future
326        self.fn = fn
327        self.args = args
328        self.kwargs = kwargs
329
330    def run(self):
331        if not self.future.set_running_or_notify_cancel():
332            return
333
334        try:
335            result = self.fn(*self.args, **self.kwargs)
336        except BaseException:
337            e = sys.exc_info()[1]
338            self.future.set_exception(e)
339        else:
340            self.future.set_result(result)
341
342
343class WSGIExecutor(ThreadPoolExecutor):
344    multithread = True
345    multiprocess = False
346
347    def __init__(self, *args, **kwargs):
348        ThreadPoolExecutor.__init__(self, *args, **kwargs)
349
350        self.futures = dict()
351
352    def submit(self, fn, *args, **kwargs):
353        if self._shutdown_lock.acquire():
354            if self._shutdown:
355                self._shutdown_lock.release()
356                raise RuntimeError(
357                    'Cannot schedule new futures after shutdown')
358
359            f = WSGIFuture(self.futures)
360            w = _WorkItem(f, fn, args, kwargs)
361
362            self._work_queue.put(w)
363            self._adjust_thread_count()
364            self._shutdown_lock.release()
365            return f
366        else:
367            return False
368
369
370class FuturesMiddleware(object):
371    """Futures middleware that adds a Futures Executor to the environment"""
372    def __init__(self, app, threads=5):
373        self.app = app
374        self.executor = WSGIExecutor(threads)
375
376    def __call__(self, environ, start_response):
377        environ["wsgiorg.executor"] = self.executor
378        environ["wsgiorg.futures"] = self.executor.futures
379        return self.app(environ, start_response)
380
381# Monolithic build...end of module: rocket/futures.py
382# Monolithic build...start of module: rocket/listener.py
383
384# Import System Modules
385import os
386import socket
387import logging
388import traceback
389from threading import Thread
390
391try:
392    import ssl
393    from ssl import SSLError
394    has_ssl = True
395except ImportError:
396    has_ssl = False
397
398    class SSLError(socket.error):
399        pass
400# Import Package Modules
401# package imports removed in monolithic build
402
403
404class Listener(Thread):
405    """The Listener class is a class responsible for accepting connections
406    and queuing them to be processed by a worker thread."""
407
408    def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
409        Thread.__init__(self, *args, **kwargs)
410
411        # Instance variables
412        self.active_queue = active_queue
413        self.interface = interface
414        self.addr = interface[0]
415        self.port = interface[1]
416        self.secure = len(interface) >= 4
417        self.clientcert_req = (len(interface) == 5 and interface[4])
418
419        self.thread = None
420        self.ready = False
421
422        # Error Log
423        self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
424        self.err_log.addHandler(NullHandler())
425
426        # Build the socket
427        if ':' in self.addr:
428            listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
429        else:
430            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
431
432        if not listener:
433            self.err_log.error("Failed to get socket.")
434            return
435
436        if self.secure:
437            if not has_ssl:
438                self.err_log.error("ssl module required to serve HTTPS.")
439                return
440            elif not os.path.exists(interface[2]):
441                data = (interface[2], interface[0], interface[1])
442                self.err_log.error("Cannot find key file "
443                                   "'%s'.  Cannot bind to %s:%s" % data)
444                return
445            elif not os.path.exists(interface[3]):
446                data = (interface[3], interface[0], interface[1])
447                self.err_log.error("Cannot find certificate file "
448                                   "'%s'.  Cannot bind to %s:%s" % data)
449                return
450
451            if self.clientcert_req and not os.path.exists(interface[4]):
452                data = (interface[4], interface[0], interface[1])
453                self.err_log.error("Cannot find root ca certificate file "
454                                   "'%s'.  Cannot bind to %s:%s" % data)
455                return
456
457        # Set socket options
458        try:
459            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
460        except:
461            msg = "Cannot share socket.  Using %s:%i exclusively."
462            self.err_log.warning(msg % (self.addr, self.port))
463
464        try:
465            if not IS_JYTHON:
466                listener.setsockopt(socket.IPPROTO_TCP,
467                                    socket.TCP_NODELAY,
468                                    1)
469        except:
470            msg = "Cannot set TCP_NODELAY, things might run a little slower"
471            self.err_log.warning(msg)
472
473        try:
474            listener.bind((self.addr, self.port))
475        except:
476            msg = "Socket %s:%i in use by other process and it won't share."
477            self.err_log.error(msg % (self.addr, self.port))
478        else:
479            # We want socket operations to timeout periodically so we can
480            # check if the server is shutting down
481            listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
482            # Listen for new connections allowing queue_size number of
483            # connections to wait before rejecting a connection.
484            listener.listen(queue_size)
485
486            self.listener = listener
487
488            self.ready = True
489
490    def wrap_socket(self, sock):
491        try:
492            if self.clientcert_req:
493                ca_certs = self.interface[4]
494                cert_reqs = ssl.CERT_OPTIONAL
495                sock = ssl.wrap_socket(sock,
496                                       keyfile=self.interface[2],
497                                       certfile=self.interface[3],
498                                       server_side=True,
499                                       cert_reqs=cert_reqs,
500                                       ca_certs=ca_certs,
501                                       ssl_version=ssl.PROTOCOL_SSLv23)
502            else:
503                sock = ssl.wrap_socket(sock,
504                                       keyfile=self.interface[2],
505                                       certfile=self.interface[3],
506                                       server_side=True,
507                                       ssl_version=ssl.PROTOCOL_SSLv23)
508        except SSLError:
509            # Generally this happens when an HTTP request is received on a
510            # secure socket. We don't do anything because it will be detected
511            # by Worker and dealt with appropriately.
512            pass
513
514        return sock
515
516    def start(self):
517        if not self.ready:
518            self.err_log.warning('Listener started when not ready.')
519            return
520
521        if self.thread is not None and self.thread.isAlive():
522            self.err_log.warning('Listener already running.')
523            return
524
525        self.thread = Thread(target=self.listen, name="Port" + str(self.port))
526
527        self.thread.start()
528
529    def isAlive(self):
530        if self.thread is None:
531            return False
532
533        return self.thread.isAlive()
534
535    def join(self):
536        if self.thread is None:
537            return
538
539        self.ready = False
540
541        self.thread.join()
542
543        del self.thread
544        self.thread = None
545        self.ready = True
546
547    def listen(self):
548        if __debug__:
549            self.err_log.debug('Entering main loop.')
550        while True:
551            try:
552                sock, addr = self.listener.accept()
553
554                if self.secure:
555                    sock = self.wrap_socket(sock)
556
557                self.active_queue.put(((sock, addr),
558                                       self.interface[1],
559                                       self.secure))
560
561            except socket.timeout:
562                # socket.timeout will be raised every
563                # THREAD_STOP_CHECK_INTERVAL seconds.  When that happens,
564                # we check if it's time to die.
565
566                if not self.ready:
567                    if __debug__:
568                        self.err_log.debug('Listener exiting.')
569                    return
570                else:
571                    continue
572            except:
573                self.err_log.error(traceback.format_exc())
574
575# Monolithic build...end of module: rocket/listener.py
576# Monolithic build...start of module: rocket/main.py
577
578# Import System Modules
579import sys
580import time
581import socket
582import logging
583import traceback
584from threading import Lock
585if PY3K:
586    from queue import Queue
587else:
588    from Queue import Queue
589
590# Import Package Modules
591# package imports removed in monolithic build
592
593# Setup Logging
594log = logging.getLogger('Rocket')
595log.addHandler(NullHandler())
596
597
598class Rocket(object):
599    """The Rocket class is responsible for handling threads and accepting and
600    dispatching connections."""
601
602    def __init__(self,
603                 interfaces=('127.0.0.1', 8000),
604                 method='wsgi',
605                 app_info=None,
606                 min_threads=None,
607                 max_threads=None,
608                 queue_size=None,
609                 timeout=600,
610                 handle_signals=True):
611
612        self.handle_signals = handle_signals
613        self.startstop_lock = Lock()
614        self.timeout = timeout
615
616        if not isinstance(interfaces, list):
617            self.interfaces = [interfaces]
618        else:
619            self.interfaces = interfaces
620
621        if min_threads is None:
622            min_threads = DEFAULTS['MIN_THREADS']
623
624        if max_threads is None:
625            max_threads = DEFAULTS['MAX_THREADS']
626
627        if not queue_size:
628            if hasattr(socket, 'SOMAXCONN'):
629                queue_size = socket.SOMAXCONN
630            else:
631                queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']
632
633        if max_threads and queue_size > max_threads:
634            queue_size = max_threads
635
636        if isinstance(app_info, dict):
637            app_info['server_software'] = SERVER_SOFTWARE
638
639        self.monitor_queue = Queue()
640        self.active_queue = Queue()
641
642        self._threadpool = ThreadPool(get_method(method),
643                                      app_info=app_info,
644                                      active_queue=self.active_queue,
645                                      monitor_queue=self.monitor_queue,
646                                      min_threads=min_threads,
647                                      max_threads=max_threads)
648
649        # Build our socket listeners
650        self.listeners = [Listener(
651            i, queue_size, self.active_queue) for i in self.interfaces]
652        for ndx in range(len(self.listeners) - 1, 0, -1):
653            if not self.listeners[ndx].ready:
654                del self.listeners[ndx]
655
656        if not self.listeners:
657            log.critical("No interfaces to listen on...closing.")
658            sys.exit(1)
659
660    def _sigterm(self, signum, frame):
661        log.info('Received SIGTERM')
662        self.stop()
663
664    def _sighup(self, signum, frame):
665        log.info('Received SIGHUP')
666        self.restart()
667
668    def start(self, background=False):
669        log.info('Starting %s' % SERVER_SOFTWARE)
670
671        self.startstop_lock.acquire()
672
673        try:
674            # Set up our shutdown signals
675            if self.handle_signals:
676                try:
677                    import signal
678                    signal.signal(signal.SIGTERM, self._sigterm)
679                    signal.signal(signal.SIGUSR1, self._sighup)
680                except:
681                    log.debug('This platform does not support signals.')
682
683            # Start our worker threads
684            self._threadpool.start()
685
686            # Start our monitor thread
687            self._monitor = Monitor(self.monitor_queue,
688                                    self.active_queue,
689                                    self.timeout,
690                                    self._threadpool)
691            self._monitor.setDaemon(True)
692            self._monitor.start()
693
694            # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
695            # compatibility.
696            str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
697
698            msg = 'Listening on sockets: '
699            msg += ', '.join(
700                ['%s:%i%s' % str_extract(l) for l in self.listeners])
701            log.info(msg)
702
703            for l in self.listeners:
704                l.start()
705
706        finally:
707            self.startstop_lock.release()
708
709        if background:
710            return
711
712        while self._monitor.isAlive():
713            try:
714                time.sleep(THREAD_STOP_CHECK_INTERVAL)
715            except KeyboardInterrupt:
716                # Capture a keyboard interrupt when running from a console
717                break
718            except:
719                if self._monitor.isAlive():
720                    log.error(traceback.format_exc())
721                    continue
722
723        return self.stop()
724
725    def stop(self, stoplogging=False):
726        log.info('Stopping %s' % SERVER_SOFTWARE)
727
728        self.startstop_lock.acquire()
729
730        try:
731            # Stop listeners
732            for l in self.listeners:
733                l.ready = False
734
735            # Encourage a context switch
736            time.sleep(0.01)
737
738            for l in self.listeners:
739                if l.isAlive():
740                    l.join()
741
742            # Stop Monitor
743            self._monitor.stop()
744            if self._monitor.isAlive():
745                self._monitor.join()
746
747            # Stop Worker threads
748            self._threadpool.stop()
749
750            if stoplogging:
751                logging.shutdown()
752                msg = "Calling logging.shutdown() is now the responsibility of \
753                       the application developer.  Please update your \
754                       applications to no longer call rocket.stop(True)"
755                try:
756                    raise DeprecationWarning(msg)
757                except ImportError:
758                    raise RuntimeError(msg)
759
760        finally:
761            self.startstop_lock.release()
762
763    def restart(self):
764        self.stop()
765        self.start()
766
767
768def CherryPyWSGIServer(bind_addr,
769                       wsgi_app,
770                       numthreads=10,
771                       server_name=None,
772                       max=-1,
773                       request_queue_size=5,
774                       timeout=10,
775                       shutdown_timeout=5):
776    """ A Cherrypy wsgiserver-compatible wrapper. """
777    max_threads = max
778    if max_threads < 0:
779        max_threads = 0
780    return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
781                  min_threads=numthreads,
782                  max_threads=max_threads,
783                  queue_size=request_queue_size,
784                  timeout=timeout)
785
786# Monolithic build...end of module: rocket/main.py
787# Monolithic build...start of module: rocket/monitor.py
788
789# Import System Modules
790import time
791import logging
792import select
793from threading import Thread
794
795# Import Package Modules
796# package imports removed in monolithic build
797
798
799class Monitor(Thread):
800    # Monitor worker class.
801
802    def __init__(self,
803                 monitor_queue,
804                 active_queue,
805                 timeout,
806                 threadpool,
807                 *args,
808                 **kwargs):
809
810        Thread.__init__(self, *args, **kwargs)
811
812        self._threadpool = threadpool
813
814        # Instance Variables
815        self.monitor_queue = monitor_queue
816        self.active_queue = active_queue
817        self.timeout = timeout
818
819        self.log = logging.getLogger('Rocket.Monitor')
820        self.log.addHandler(NullHandler())
821
822        self.connections = set()
823        self.active = False
824
825    def run(self):
826        self.active = True
827        conn_list = list()
828        list_changed = False
829
830        # We need to make sure the queue is empty before we start
831        while not self.monitor_queue.empty():
832            self.monitor_queue.get()
833
834        if __debug__:
835            self.log.debug('Entering monitor loop.')
836
837        # Enter thread main loop
838        while self.active:
839
840            # Move the queued connections to the selection pool
841            while not self.monitor_queue.empty():
842                if __debug__:
843                    self.log.debug('In "receive timed-out connections" loop.')
844
845                c = self.monitor_queue.get()
846
847                if c is None:
848                    # A non-client is a signal to die
849                    if __debug__:
850                        self.log.debug('Received a death threat.')
851                    self.stop()
852                    break
853
854                self.log.debug('Received a timed out connection.')
855
856                if __debug__:
857                    assert(c not in self.connections)
858
859                if IS_JYTHON:
860                    # Jython requires a socket to be in Non-blocking mode in
861                    # order to select on it.
862                    c.setblocking(False)
863
864                if __debug__:
865                    self.log.debug('Adding connection to monitor list.')
866
867                self.connections.add(c)
868                list_changed = True
869
870            # Wait on those connections
871            if list_changed:
872                conn_list = list(self.connections)
873                list_changed = False
874
875            try:
876                if len(conn_list):
877                    readable = select.select(conn_list,
878                                             [],
879                                             [],
880                                             THREAD_STOP_CHECK_INTERVAL)[0]
881                else:
882                    time.sleep(THREAD_STOP_CHECK_INTERVAL)
883                    readable = []
884
885                if not self.active:
886                    break
887
888                # If we have any readable connections, put them back
889                for r in readable:
890                    if __debug__:
891                        self.log.debug('Restoring readable connection')
892
893                    if IS_JYTHON:
894                        # Jython requires a socket to be in Non-blocking mode in
895                        # order to select on it, but the rest of the code requires
896                        # that it be in blocking mode.
897                        r.setblocking(True)
898
899                    r.start_time = time.time()
900                    self.active_queue.put(r)
901
902                    self.connections.remove(r)
903                    list_changed = True
904
905            except:
906                if self.active:
907                    raise
908                else:
909                    break
910
911            # If we have any stale connections, kill them off.
912            if self.timeout:
913                now = time.time()
914                stale = set()
915                for c in self.connections:
916                    if (now - c.start_time) >= self.timeout:
917                        stale.add(c)
918
919                for c in stale:
920                    if __debug__:
921                        # "EXPR and A or B" kept for Py2.4 compatibility
922                        data = (
923                            c.client_addr, c.server_port, c.ssl and '*' or '')
924                        self.log.debug(
925                            'Flushing stale connection: %s:%i%s' % data)
926
927                    self.connections.remove(c)
928                    list_changed = True
929
930                    try:
931                        c.close()
932                    finally:
933                        del c
934
935            # Dynamically resize the threadpool to adapt to our changing needs.
936            self._threadpool.dynamic_resize()
937
938    def stop(self):
939        self.active = False
940
941        if __debug__:
942            self.log.debug('Flushing waiting connections')
943
944        while self.connections:
945            c = self.connections.pop()
946            try:
947                c.close()
948            finally:
949                del c
950
951        if __debug__:
952            self.log.debug('Flushing queued connections')
953
954        while not self.monitor_queue.empty():
955            c = self.monitor_queue.get()
956
957            if c is None:
958                continue
959
960            try:
961                c.close()
962            finally:
963                del c
964
965        # Place a None sentry value to cause the monitor to die.
966        self.monitor_queue.put(None)
967
968# Monolithic build...end of module: rocket/monitor.py
969# Monolithic build...start of module: rocket/threadpool.py
970
971# Import System Modules
972import logging
973# Import Package Modules
974# package imports removed in monolithic build
975
976
977# Setup Logging
978log = logging.getLogger('Rocket.Errors.ThreadPool')
979log.addHandler(NullHandler())
980
981
982class ThreadPool:
983    """The ThreadPool class is a container class for all the worker threads. It
984    manages the number of actively running threads."""
985
986    def __init__(self,
987                 method,
988                 app_info,
989                 active_queue,
990                 monitor_queue,
991                 min_threads=DEFAULTS['MIN_THREADS'],
992                 max_threads=DEFAULTS['MAX_THREADS'],
993                 ):
994
995        if __debug__:
996            log.debug("Initializing ThreadPool.")
997
998        self.check_for_dead_threads = 0
999        self.active_queue = active_queue
1000
1001        self.worker_class = method
1002        self.min_threads = min_threads
1003        self.max_threads = max_threads
1004        self.monitor_queue = monitor_queue
1005        self.stop_server = False
1006        self.alive = False
1007
1008        # TODO - Optimize this based on some real-world usage data
1009        self.grow_threshold = int(max_threads / 10) + 2
1010
1011        if not isinstance(app_info, dict):
1012            app_info = dict()
1013
1014        if has_futures and app_info.get('futures'):
1015            app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'],
1016                                                     2]))
1017
1018        app_info.update(max_threads=max_threads,
1019                        min_threads=min_threads)
1020
1021        self.min_threads = min_threads
1022        self.app_info = app_info
1023
1024        self.threads = set()
1025
1026    def start(self):
1027        self.stop_server = False
1028        if __debug__:
1029            log.debug("Starting threads.")
1030
1031        self.grow(self.min_threads)
1032
1033        self.alive = True
1034
1035    def stop(self):
1036        self.alive = False
1037
1038        if __debug__:
1039            log.debug("Stopping threads.")
1040
1041        self.stop_server = True
1042
1043        # Prompt the threads to die
1044        self.shrink(len(self.threads))
1045
1046        # Stop futures initially
1047        if has_futures and self.app_info.get('futures'):
1048            if __debug__:
1049                log.debug("Future executor is present.  Python will not "
1050                          "exit until all jobs have finished.")
1051            self.app_info['executor'].shutdown(wait=False)
1052
1053        # Give them the gun
1054        # active_threads = [t for t in self.threads if t.isAlive()]
1055        # while active_threads:
1056        #     t = active_threads.pop()
1057        #     t.kill()
1058
1059        # Wait until they pull the trigger
1060        for t in self.threads:
1061            if t.isAlive():
1062                t.join()
1063
1064        # Clean up the mess
1065        self.bring_out_your_dead()
1066
1067    def bring_out_your_dead(self):
1068        # Remove dead threads from the pool
1069
1070        dead_threads = [t for t in self.threads if not t.isAlive()]
1071        for t in dead_threads:
1072            if __debug__:
1073                log.debug("Removing dead thread: %s." % t.getName())
1074            try:
1075                # Py2.4 complains here so we put it in a try block
1076                self.threads.remove(t)
1077            except:
1078                pass
1079        self.check_for_dead_threads -= len(dead_threads)
1080
1081    def grow(self, amount=None):
1082        if self.stop_server:
1083            return
1084
1085        if not amount:
1086            amount = self.max_threads
1087
1088        if self.alive:
1089            amount = min([amount, self.max_threads - len(self.threads)])
1090
1091        if __debug__:
1092            log.debug("Growing by %i." % amount)
1093
1094        for x in range(amount):
1095            worker = self.worker_class(self.app_info,
1096                                       self.active_queue,
1097                                       self.monitor_queue)
1098
1099            worker.setDaemon(True)
1100            self.threads.add(worker)
1101            worker.start()
1102
1103    def shrink(self, amount=1):
1104        if __debug__:
1105            log.debug("Shrinking by %i." % amount)
1106
1107        self.check_for_dead_threads += amount
1108
1109        for x in range(amount):
1110            self.active_queue.put(None)
1111
1112    def dynamic_resize(self):
1113        if (self.max_threads > self.min_threads or self.max_threads == 0):
1114            if self.check_for_dead_threads > 0:
1115                self.bring_out_your_dead()
1116
1117            queueSize = self.active_queue.qsize()
1118            threadCount = len(self.threads)
1119
1120            if __debug__:
1121                log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
1122                          % (threadCount, queueSize))
1123
1124            if queueSize == 0 and threadCount > self.min_threads:
1125                self.shrink()
1126
1127            elif queueSize > self.grow_threshold:
1128
1129                self.grow(queueSize)
1130
1131# Monolithic build...end of module: rocket/threadpool.py
1132# Monolithic build...start of module: rocket/worker.py
1133
1134# Import System Modules
1135import re
1136import sys
1137import socket
1138import logging
1139import traceback
1140from wsgiref.headers import Headers
1141from threading import Thread
1142from datetime import datetime
1143
1144try:
1145    from ssl import SSLError
1146except ImportError:
1147    class SSLError(socket.error):
1148        pass
1149# Import Package Modules
1150# package imports removed in monolithic build
1151
1152
1153# Define Constants
1154re_SLASH = re.compile('%2F', re.IGNORECASE)
1155re_REQUEST_LINE = re.compile(r"""^
1156(?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|PATCH|TRACE|CONNECT) # Req Method
1157\                                                                # single space
1158(
1159    (?P<scheme>[^:/]+)                                           # Scheme
1160    (://)  #
1161    (?P<host>[^/]+)                                              # Host
1162)? #
1163(?P<path>(\*|/[^ \?]*))                                          # Path
1164(\? (?P<query_string>[^ ]*))?                                    # Query String
1165\                                                                # single space
1166(?P<protocol>HTTPS?/1\.[01])                                     # Protocol
1167$
1168""", re.X)
1169LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
1170RESPONSE = '''\
1171%s %s
1172Content-Length: %i
1173Content-Type: %s
1174
1175%s
1176'''
1177if IS_JYTHON:
1178    HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT',
1179                        'DELETE', 'TRACE', 'CONNECT'])
1180
1181
1182class Worker(Thread):
1183    """The Worker class is a base class responsible for receiving connections
1184    and (a subclass) will run an application to process the the connection """
1185
1186    def __init__(self,
1187                 app_info,
1188                 active_queue,
1189                 monitor_queue,
1190                 *args,
1191                 **kwargs):
1192
1193        Thread.__init__(self, *args, **kwargs)
1194
1195        # Instance Variables
1196        self.app_info = app_info
1197        self.active_queue = active_queue
1198        self.monitor_queue = monitor_queue
1199
1200        self.size = 0
1201        self.status = "200 OK"
1202        self.closeConnection = True
1203        self.request_line = ""
1204        self.protocol = 'HTTP/1.1'
1205
1206        # Request Log
1207        self.req_log = logging.getLogger('Rocket.Requests')
1208        self.req_log.addHandler(NullHandler())
1209
1210        # Error Log
1211        self.err_log = logging.getLogger('Rocket.Errors.' + self.getName())
1212        self.err_log.addHandler(NullHandler())
1213
1214    def _handleError(self, typ, val, tb):
1215        if typ == SSLError:
1216            if 'timed out' in str(val.args[0]):
1217                typ = SocketTimeout
1218        if typ == SocketTimeout:
1219            if __debug__:
1220                self.err_log.debug('Socket timed out')
1221            self.monitor_queue.put(self.conn)
1222            return True
1223        if typ == SocketClosed:
1224            self.closeConnection = True
1225            if __debug__:
1226                self.err_log.debug('Client closed socket')
1227            return False
1228        if typ == BadRequest:
1229            self.closeConnection = True
1230            if __debug__:
1231                self.err_log.debug('Client sent a bad request')
1232            return True
1233        if typ == socket.error:
1234            self.closeConnection = True
1235            if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
1236                if __debug__:
1237                    self.err_log.debug('Ignorable socket Error received...'
1238                                       'closing connection.')
1239                return False
1240            else:
1241                self.status = "999 Utter Server Failure"
1242                tb_fmt = traceback.format_exception(typ, val, tb)
1243                self.err_log.error('Unhandled Error when serving '
1244                                   'connection:\n' + '\n'.join(tb_fmt))
1245                return False
1246
1247        self.closeConnection = True
1248        tb_fmt = traceback.format_exception(typ, val, tb)
1249        self.err_log.error('\n'.join(tb_fmt))
1250        self.send_response('500 Server Error')
1251        return False
1252
1253    def run(self):
1254        if __debug__:
1255            self.err_log.debug('Entering main loop.')
1256
1257        # Enter thread main loop
1258        while True:
1259            conn = self.active_queue.get()
1260
1261            if not conn:
1262                # A non-client is a signal to die
1263                if __debug__:
1264                    self.err_log.debug('Received a death threat.')
1265                return conn
1266
1267            if isinstance(conn, tuple):
1268                conn = Connection(*conn)
1269
1270            self.conn = conn
1271
1272            if conn.ssl != conn.secure:
1273                self.err_log.info('Received HTTP connection on HTTPS port.')
1274                self.send_response('400 Bad Request')
1275                self.closeConnection = True
1276                conn.close()
1277                continue
1278            else:
1279                if __debug__:
1280                    self.err_log.debug('Received a connection.')
1281                self.closeConnection = False
1282
1283            # Enter connection serve loop
1284            while True:
1285                if __debug__:
1286                    self.err_log.debug('Serving a request')
1287                try:
1288                    self.run_app(conn)
1289                except:
1290                    exc = sys.exc_info()
1291                    handled = self._handleError(*exc)
1292                    if handled:
1293                        break
1294                finally:
1295                    if self.request_line:
1296                        log_info = dict(client_ip=conn.client_addr,
1297                                        time=datetime.now().strftime('%c'),
1298                                        status=self.status.split(' ')[0],
1299                                        size=self.size,
1300                                        request_line=self.request_line)
1301                        self.req_log.info(LOG_LINE % log_info)
1302
1303                if self.closeConnection:
1304                    try:
1305                        conn.close()
1306                    except:
1307                        self.err_log.error(str(traceback.format_exc()))
1308
1309                    break
1310
1311    def run_app(self, conn):
1312        # Must be overridden with a method reads the request from the socket
1313        # and sends a response.
1314        self.closeConnection = True
1315        raise NotImplementedError('Overload this method!')
1316
1317    def send_response(self, status):
1318        stat_msg = status.split(' ', 1)[1]
1319        msg = RESPONSE % (self.protocol,
1320                          status,
1321                          len(stat_msg),
1322                          'text/plain',
1323                          stat_msg)
1324        try:
1325            self.conn.sendall(b(msg))
1326        except socket.timeout:
1327            self.closeConnection = True
1328            msg = 'Tried to send "%s" to client but received timeout error'
1329            self.err_log.error(msg % status)
1330        except socket.error:
1331            self.closeConnection = True
1332            msg = 'Tried to send "%s" to client but received socket error'
1333            self.err_log.error(msg % status)
1334
1335    def read_request_line(self, sock_file):
1336        self.request_line = ''
1337        try:
1338            # Grab the request line
1339            d = sock_file.readline()
1340            if PY3K:
1341                d = d.decode('ISO-8859-1')
1342
1343            if d == '\r\n':
1344                # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
1345                if __debug__:
1346                    self.err_log.debug('Client sent newline')
1347
1348                d = sock_file.readline()
1349                if PY3K:
1350                    d = d.decode('ISO-8859-1')
1351        except socket.timeout:
1352            raise SocketTimeout('Socket timed out before request.')
1353        except TypeError:
1354            raise SocketClosed(
1355                'SSL bug caused closure of socket.  See '
1356                '"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".')
1357
1358        d = d.strip()
1359
1360        if not d:
1361            if __debug__:
1362                self.err_log.debug(
1363                    'Client did not send a recognizable request.')
1364            raise SocketClosed('Client closed socket.')
1365
1366        self.request_line = d
1367
1368        # NOTE: I've replaced the traditional method of procedurally breaking
1369        # apart the request line with a (rather unsightly) regular expression.
1370        # However, Java's regexp support sucks so bad that it actually takes
1371        # longer in Jython to process the regexp than procedurally. So I've
1372        # left the old code here for Jython's sake...for now.
1373        if IS_JYTHON:
1374            return self._read_request_line_jython(d)
1375
1376        match = re_REQUEST_LINE.match(d)
1377
1378        if not match:
1379            self.send_response('400 Bad Request')
1380            raise BadRequest
1381
1382        req = match.groupdict()
1383        for k, v in iteritems(req):
1384            if not v:
1385                req[k] = ""
1386            if k == 'path':
1387                req['path'] = r'%2F'.join(
1388                    [urllib_unquote(x) for x in re_SLASH.split(v)])
1389
1390        self.protocol = req['protocol']
1391        return req
1392
1393    def _read_request_line_jython(self, d):
1394        d = d.strip()
1395        try:
1396            method, uri, proto = d.split(' ')
1397            if not proto.startswith('HTTP') or \
1398                    proto[-3:] not in ('1.0', '1.1') or \
1399                    method not in HTTP_METHODS:
1400                self.send_response('400 Bad Request')
1401                raise BadRequest
1402        except ValueError:
1403            self.send_response('400 Bad Request')
1404            raise BadRequest
1405
1406        req = dict(method=method, protocol=proto)
1407        scheme = ''
1408        host = ''
1409        if uri == '*' or uri.startswith('/'):
1410            path = uri
1411        elif '://' in uri:
1412            scheme, rest = uri.split('://')
1413            host, path = rest.split('/', 1)
1414            path = '/' + path
1415        else:
1416            self.send_response('400 Bad Request')
1417            raise BadRequest
1418
1419        query_string = ''
1420        if '?' in path:
1421            path, query_string = path.split('?', 1)
1422
1423        path = r'%2F'.join([urllib_unquote(x) for x in re_SLASH.split(path)])
1424
1425        req.update(path=path,
1426                   query_string=query_string,
1427                   scheme=scheme.lower(),
1428                   host=host)
1429        return req
1430
1431    def read_headers(self, sock_file):
1432        try:
1433            headers = dict()
1434            lname = None
1435            lval = None
1436            while True:
1437                l = sock_file.readline()
1438
1439                if PY3K:
1440                    try:
1441                        l = str(l, 'ISO-8859-1')
1442                    except UnicodeDecodeError:
1443                        self.err_log.warning(
1444                            'Client sent invalid header: ' + repr(l))
1445
1446                if l.strip().replace('\0', '') == '':
1447                    break
1448
1449                if l[0] in ' \t' and lname:
1450                    # Some headers take more than one line
1451                    lval += ' ' + l.strip()
1452                else:
1453                    # HTTP header values are latin-1 encoded
1454                    l = l.split(':', 1)
1455                    # HTTP header names are us-ascii encoded
1456
1457                    lname = l[0].strip().upper().replace('-', '_')
1458                    lval = l[-1].strip()
1459
1460                headers[str(lname)] = str(lval)
1461
1462        except socket.timeout:
1463            raise SocketTimeout("Socket timed out before request.")
1464
1465        return headers
1466
1467
1468class SocketTimeout(Exception):
1469    """Exception for when a socket times out between requests."""
1470    pass
1471
1472
1473class BadRequest(Exception):
1474    """Exception for when a client sends an incomprehensible request."""
1475    pass
1476
1477
1478class SocketClosed(Exception):
1479    """Exception for when a socket is closed by the client."""
1480    pass
1481
1482
1483class ChunkedReader(object):
1484
1485    def __init__(self, sock_file):
1486        self.stream = sock_file
1487        self.chunk_size = 0
1488
1489    def _read_header(self):
1490        chunk_len = ""
1491        try:
1492            while "" == chunk_len:
1493                chunk_len = self.stream.readline().strip()
1494            return int(chunk_len, 16)
1495        except ValueError:
1496            return 0
1497
1498    def read(self, size):
1499        data = b('')
1500        chunk_size = self.chunk_size
1501        while size:
1502            if not chunk_size:
1503                chunk_size = self._read_header()
1504
1505            if size < chunk_size:
1506                data += self.stream.read(size)
1507                chunk_size -= size
1508                break
1509            else:
1510                if not chunk_size:
1511                    break
1512                data += self.stream.read(chunk_size)
1513                size -= chunk_size
1514                chunk_size = 0
1515
1516        self.chunk_size = chunk_size
1517        return data
1518
1519    def readline(self):
1520        data = b('')
1521        c = self.read(1)
1522        while c and c != b('\n'):
1523            data += c
1524            c = self.read(1)
1525        data += c
1526        return data
1527
1528    def readlines(self):
1529        yield self.readline()
1530
1531
1532def get_method(method):
1533    methods = dict(wsgi=WSGIWorker)
1534    return methods[method.lower()]
1535
1536# Monolithic build...end of module: rocket/worker.py
1537# Monolithic build...start of module: rocket/methods/__init__.py
1538
1539# Monolithic build...end of module: rocket/methods/__init__.py
1540# Monolithic build...start of module: rocket/methods/wsgi.py
1541
1542# Import System Modules
1543import sys
1544import socket
1545from wsgiref.headers import Headers
1546from wsgiref.util import FileWrapper
1547
1548# Import Package Modules
1549# package imports removed in monolithic build
1550
1551if PY3K:
1552    from email.utils import formatdate
1553else:
1554    # Caps Utils for Py2.4 compatibility
1555    from email.Utils import formatdate
1556
1557# Define Constants
1558NEWLINE = b('\r\n')
1559HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
1560BASE_ENV = {'SERVER_NAME': SERVER_NAME,
1561            'SCRIPT_NAME': '',  # Direct call WSGI does not need a name
1562            'wsgi.errors': sys.stderr,
1563            'wsgi.version': (1, 0),
1564            'wsgi.multiprocess': False,
1565            'wsgi.run_once': False,
1566            'wsgi.file_wrapper': FileWrapper
1567            }
1568
1569
1570class WSGIWorker(Worker):
1571    def __init__(self, *args, **kwargs):
1572        """Builds some instance variables that will last the life of the
1573        thread."""
1574        Worker.__init__(self, *args, **kwargs)
1575
1576        if isinstance(self.app_info, dict):
1577            multithreaded = self.app_info.get('max_threads') != 1
1578        else:
1579            multithreaded = False
1580        self.base_environ = dict(
1581            {'SERVER_SOFTWARE': self.app_info['server_software'],
1582             'wsgi.multithread': multithreaded,
1583             })
1584        self.base_environ.update(BASE_ENV)
1585
1586        # Grab our application
1587        self.app = self.app_info.get('wsgi_app')
1588
1589        if not hasattr(self.app, "__call__"):
1590            raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))
1591
1592        # Enable futures
1593        if has_futures and self.app_info.get('futures'):
1594            executor = self.app_info['executor']
1595            self.base_environ.update({"wsgiorg.executor": executor,
1596                                      "wsgiorg.futures": executor.futures})
1597
1598    def build_environ(self, sock_file, conn):
1599        """ Build the execution environment. """
1600        # Grab the request line
1601        request = self.read_request_line(sock_file)
1602
1603        # Copy the Base Environment
1604        environ = self.base_environ.copy()
1605
1606        # Grab the headers
1607        for k, v in iteritems(self.read_headers(sock_file)):
1608            environ[str('HTTP_' + k)] = v
1609
1610        # Add CGI Variables
1611        environ['REQUEST_METHOD'] = request['method']
1612        environ['PATH_INFO'] = request['path']
1613        environ['SERVER_PROTOCOL'] = request['protocol']
1614        environ['SERVER_PORT'] = str(conn.server_port)
1615        environ['REMOTE_PORT'] = str(conn.client_port)
1616        environ['REMOTE_ADDR'] = str(conn.client_addr)
1617        environ['QUERY_STRING'] = request['query_string']
1618        if 'HTTP_CONTENT_LENGTH' in environ:
1619            environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
1620        if 'HTTP_CONTENT_TYPE' in environ:
1621            environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']
1622
1623        # Save the request method for later
1624        self.request_method = environ['REQUEST_METHOD']
1625
1626        # Add Dynamic WSGI Variables
1627        if conn.ssl:
1628            environ['wsgi.url_scheme'] = 'https'
1629            environ['HTTPS'] = 'on'
1630            try:
1631                peercert = conn.socket.getpeercert(binary_form=True)
1632                environ['SSL_CLIENT_RAW_CERT'] = \
1633                    peercert and to_native(ssl.DER_cert_to_PEM_cert(peercert))
1634            except Exception:
1635                print(sys.exc_info()[1])
1636        else:
1637            environ['wsgi.url_scheme'] = 'http'
1638
1639        if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
1640            environ['wsgi.input'] = ChunkedReader(sock_file)
1641        else:
1642            environ['wsgi.input'] = sock_file
1643
1644        return environ
1645
1646    def send_headers(self, data, sections):
1647        h_set = self.header_set
1648
1649        # Does the app want us to send output chunked?
1650        self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked'
1651
1652        # Add a Date header if it's not there already
1653        if not 'Date' in h_set:
1654            h_set['Date'] = formatdate(usegmt=True)
1655
1656        # Add a Server header if it's not there already
1657        if not 'Server' in h_set:
1658            h_set['Server'] = HTTP_SERVER_SOFTWARE
1659
1660        if 'Content-Length' in h_set:
1661            self.size = int(h_set['Content-Length'])
1662        else:
1663            s = int(self.status.split(' ')[0])
1664            if (s < 200 or s not in (204, 205, 304)) and not self.chunked:
1665                if sections == 1 or self.protocol != 'HTTP/1.1':
1666                    # Add a Content-Length header because it's not there
1667                    self.size = len(data)
1668                    h_set['Content-Length'] = str(self.size)
1669                else:
1670                    # If they sent us more than one section, we blow chunks
1671                    h_set['Transfer-Encoding'] = 'Chunked'
1672                    self.chunked = True
1673                    if __debug__:
1674                        self.err_log.debug('Adding header...'
1675                                           'Transfer-Encoding: Chunked')
1676
1677        if 'Connection' not in h_set:
1678            # If the application did not provide a connection header,
1679            # fill it in
1680            client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
1681            if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
1682                # HTTP = 1.1 defaults to keep-alive connections
1683                if client_conn:
1684                    h_set['Connection'] = client_conn
1685                else:
1686                    h_set['Connection'] = 'keep-alive'
1687            else:
1688                # HTTP < 1.1 supports keep-alive but it's quirky
1689                # so we don't support it
1690                h_set['Connection'] = 'close'
1691
1692        # Close our connection if we need to.
1693        self.closeConnection = h_set.get('Connection', '').lower() == 'close'
1694
1695        # Build our output headers
1696        header_data = HEADER_RESPONSE % (self.status, str(h_set))
1697
1698        # Send the headers
1699        if __debug__:
1700            self.err_log.debug('Sending Headers: %s' % repr(header_data))
1701        self.conn.sendall(b(header_data))
1702        self.headers_sent = True
1703
1704    def write_warning(self, data, sections=None):
1705        self.err_log.warning('WSGI app called write method directly.  This is '
1706                             'deprecated behavior.  Please update your app.')
1707        return self.write(data, sections)
1708
1709    def write(self, data, sections=None):
1710        """ Write the data to the output socket. """
1711
1712        if self.error[0]:
1713            self.status = self.error[0]
1714            data = b(self.error[1])
1715
1716        if not self.headers_sent:
1717            self.send_headers(data, sections)
1718
1719        if self.request_method != 'HEAD':
1720            try:
1721                if self.chunked:
1722                    self.conn.sendall(b'%x\r\n%s\r\n' % (len(data), to_bytes(data, 'ISO-8859-1')))
1723                else:
1724                    self.conn.sendall(to_bytes(data))
1725            except socket.timeout:
1726                self.closeConnection = True
1727            except socket.error:
1728                # But some clients will close the connection before that
1729                # resulting in a socket error.
1730                self.closeConnection = True
1731
1732    def start_response(self, status, response_headers, exc_info=None):
1733        """ Store the HTTP status and headers to be sent when self.write is
1734        called. """
1735        if exc_info:
1736            try:
1737                if self.headers_sent:
1738                    # Re-raise original exception if headers sent
1739                    # because this violates WSGI specification.
1740                    raise
1741            finally:
1742                exc_info = None
1743        elif self.header_set:
1744            raise AssertionError("Headers already set!")
1745
1746        if PY3K and not isinstance(status, str):
1747            self.status = str(status, 'ISO-8859-1')
1748        else:
1749            self.status = status
1750        # Make sure headers are bytes objects
1751        try:
1752            self.header_set = Headers(response_headers)
1753        except UnicodeDecodeError:
1754            self.error = ('500 Internal Server Error',
1755                          'HTTP Headers should be bytes')
1756            self.err_log.error('Received HTTP Headers from client that contain'
1757                               ' invalid characters for Latin-1 encoding.')
1758
1759        return self.write_warning
1760
1761    def run_app(self, conn):
1762        self.size = 0
1763        self.header_set = Headers([])
1764        self.headers_sent = False
1765        self.error = (None, None)
1766        self.chunked = False
1767        sections = None
1768        output = None
1769
1770        if __debug__:
1771            self.err_log.debug('Getting sock_file')
1772
1773        # Build our file-like object
1774        if PY3K:
1775            sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
1776        else:
1777            sock_file = conn.makefile(BUF_SIZE)
1778
1779        try:
1780            # Read the headers and build our WSGI environment
1781            self.environ = environ = self.build_environ(sock_file, conn)
1782
1783            # Handle 100 Continue
1784            if environ.get('HTTP_EXPECT', '') == '100-continue':
1785                res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
1786                conn.sendall(b(res))
1787
1788            # Send it to our WSGI application
1789            output = self.app(environ, self.start_response)
1790
1791            if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
1792                self.error = ('500 Internal Server Error',
1793                              'WSGI applications must return a list or '
1794                              'generator type.')
1795
1796            if hasattr(output, '__len__'):
1797                sections = len(output)
1798
1799            for data in output:
1800                # Don't send headers until body appears
1801                if data:
1802                    self.write(data, sections)
1803
1804            if not self.headers_sent:
1805                # Send headers if the body was empty
1806                self.send_headers('', sections)
1807
1808            if self.chunked and self.request_method != 'HEAD':
1809                # If chunked, send our final chunk length
1810                self.conn.sendall(b('0\r\n\r\n'))
1811
1812        # Don't capture exceptions here.  The Worker class handles
1813        # them appropriately.
1814        finally:
1815            if __debug__:
1816                self.err_log.debug('Finally closing output and sock_file')
1817
1818            if hasattr(output, 'close'):
1819                output.close()
1820
1821            sock_file.close()
1822
1823# Monolithic build...end of module: rocket/methods/wsgi.py
1824def demo_app(environ, start_response):
1825    global static_folder
1826    import os
1827    types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif',
1828             'jpg': 'image/jpeg','png': 'image/png','pdf': 'applications/pdf'}
1829    if static_folder:
1830        if not static_folder.startswith('/'):
1831            static_folder = os.path.join(os.getcwd(),static_folder)
1832        path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html')
1833        type = types.get(path.split('.')[-1],'text')
1834        if os.path.exists(path):
1835            try:
1836                data = open(path,'rb').read()
1837                start_response('200 OK', [('Content-Type', type)])
1838            except IOError:
1839                start_response('404 NOT FOUND', [])
1840                data = '404 NOT FOUND'
1841        else:
1842            start_response('500 INTERNAL SERVER ERROR', [])
1843            data = '500 INTERNAL SERVER ERROR'
1844    else:
1845        start_response('200 OK', [('Content-Type', 'text/html')])
1846        data = '<html><body><h1>Hello from Rocket Web Server</h1></body></html>'
1847    return [data]
1848
1849def demo():
1850    from optparse import OptionParser
1851    parser = OptionParser()
1852    parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
1853                      help="ip address of the network interface")
1854    parser.add_option("-p", "--port", dest="port",default="8000",
1855                      help="post where to run web server")
1856    parser.add_option("-s", "--static", dest="static",default=None,
1857                      help="folder containing static files")
1858    (options, args) = parser.parse_args()
1859    global static_folder
1860    static_folder = options.static
1861    print('Rocket running on %s:%s' % (options.ip, options.port))
1862    r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
1863    r.start()
1864
1865if __name__=='__main__':
1866    demo()
Note: See TracBrowser for help on using the repository browser.