source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/gateways/fcgi.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 43.0 KB
Line 
1# Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com>
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions
6# are met:
7# 1. Redistributions of source code must retain the above copyright
8#    notice, this list of conditions and the following disclaimer.
9# 2. Redistributions in binary form must reproduce the above copyright
10#    notice, this list of conditions and the following disclaimer in the
11#    documentation and/or other materials provided with the distribution.
12#
13# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23# SUCH DAMAGE.
24#
25# $Id$
26
27"""
28fcgi - a FastCGI/WSGI gateway.
29
30For more information about FastCGI, see <http://www.fastcgi.com/>.
31
32For more information about the Web Server Gateway Interface, see
33<http://www.python.org/peps/pep-0333.html>.
34
35Example usage:
36
37  #!/usr/bin/env python
38  from myapplication import app # Assume app is your WSGI application object
39  from fcgi import WSGIServer
40  WSGIServer(app).run()
41
42See the documentation for WSGIServer/Server for more information.
43
44On most platforms, fcgi will fallback to regular CGI behavior if run in a
45non-FastCGI context. If you want to force CGI behavior, set the environment
46variable FCGI_FORCE_CGI to "Y" or "y".
47"""
48
49__author__ = 'Allan Saddi <allan@saddi.com>'
50__version__ = '$Revision$'
51
52import sys
53import os
54import signal
55import struct
56import cStringIO as StringIO
57import select
58import socket
59import errno
60import traceback
61
62try:
63    import thread
64    import threading
65    thread_available = True
66except ImportError:
67    import dummy_thread as thread
68    import dummy_threading as threading
69    thread_available = False
70
71# Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case.
72if not hasattr(socket, 'SHUT_WR'):
73    socket.SHUT_WR = 1
74
75__all__ = ['WSGIServer']
76
77# Constants from the spec.
78FCGI_LISTENSOCK_FILENO = 0
79
80FCGI_HEADER_LEN = 8
81
82FCGI_VERSION_1 = 1
83
84FCGI_BEGIN_REQUEST = 1
85FCGI_ABORT_REQUEST = 2
86FCGI_END_REQUEST = 3
87FCGI_PARAMS = 4
88FCGI_STDIN = 5
89FCGI_STDOUT = 6
90FCGI_STDERR = 7
91FCGI_DATA = 8
92FCGI_GET_VALUES = 9
93FCGI_GET_VALUES_RESULT = 10
94FCGI_UNKNOWN_TYPE = 11
95FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
96
97FCGI_NULL_REQUEST_ID = 0
98
99FCGI_KEEP_CONN = 1
100
101FCGI_RESPONDER = 1
102FCGI_AUTHORIZER = 2
103FCGI_FILTER = 3
104
105FCGI_REQUEST_COMPLETE = 0
106FCGI_CANT_MPX_CONN = 1
107FCGI_OVERLOADED = 2
108FCGI_UNKNOWN_ROLE = 3
109
110FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
111FCGI_MAX_REQS = 'FCGI_MAX_REQS'
112FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
113
114FCGI_Header = '!BBHHBx'
115FCGI_BeginRequestBody = '!HB5x'
116FCGI_EndRequestBody = '!LB3x'
117FCGI_UnknownTypeBody = '!B7x'
118
119FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
120FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
121
122if __debug__:
123    import time
124
125    # Set non-zero to write debug output to a file.
126    DEBUG = 0
127    DEBUGLOG = '/tmp/fcgi.log'
128
129    def _debug(level, msg):
130        if DEBUG < level:
131            return
132
133        try:
134            f = open(DEBUGLOG, 'a')
135            f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
136            f.close()
137        except:
138            pass
139
140class InputStream(object):
141    """
142    File-like object representing FastCGI input streams (FCGI_STDIN and
143    FCGI_DATA). Supports the minimum methods required by WSGI spec.
144    """
145    def __init__(self, conn):
146        self._conn = conn
147
148        # See Server.
149        self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
150
151        self._buf = ''
152        self._bufList = []
153        self._pos = 0 # Current read position.
154        self._avail = 0 # Number of bytes currently available.
155
156        self._eof = False # True when server has sent EOF notification.
157
158    def _shrinkBuffer(self):
159        """Gets rid of already read data (since we can't rewind)."""
160        if self._pos >= self._shrinkThreshold:
161            self._buf = self._buf[self._pos:]
162            self._avail -= self._pos
163            self._pos = 0
164
165            assert self._avail >= 0
166
167    def _waitForData(self):
168        """Waits for more data to become available."""
169        self._conn.process_input()
170
171    def read(self, n=-1):
172        if self._pos == self._avail and self._eof:
173            return ''
174        while True:
175            if n < 0 or (self._avail - self._pos) < n:
176                # Not enough data available.
177                if self._eof:
178                    # And there's no more coming.
179                    newPos = self._avail
180                    break
181                else:
182                    # Wait for more data.
183                    self._waitForData()
184                    continue
185            else:
186                newPos = self._pos + n
187                break
188        # Merge buffer list, if necessary.
189        if self._bufList:
190            self._buf += ''.join(self._bufList)
191            self._bufList = []
192        r = self._buf[self._pos:newPos]
193        self._pos = newPos
194        self._shrinkBuffer()
195        return r
196
197    def readline(self, length=None):
198        if self._pos == self._avail and self._eof:
199            return ''
200        while True:
201            # Unfortunately, we need to merge the buffer list early.
202            if self._bufList:
203                self._buf += ''.join(self._bufList)
204                self._bufList = []
205            # Find newline.
206            i = self._buf.find('\n', self._pos)
207            if i < 0:
208                # Not found?
209                if self._eof:
210                    # No more data coming.
211                    newPos = self._avail
212                    break
213                else:
214                    # Wait for more to come.
215                    self._waitForData()
216                    continue
217            else:
218                newPos = i + 1
219                break
220        if length is not None:
221            if self._pos + length < newPos:
222                newPos = self._pos + length
223        r = self._buf[self._pos:newPos]
224        self._pos = newPos
225        self._shrinkBuffer()
226        return r
227
228    def readlines(self, sizehint=0):
229        total = 0
230        lines = []
231        line = self.readline()
232        while line:
233            lines.append(line)
234            total += len(line)
235            if 0 < sizehint <= total:
236                break
237            line = self.readline()
238        return lines
239
240    def __iter__(self):
241        return self
242
243    def next(self):
244        r = self.readline()
245        if not r:
246            raise StopIteration
247        return r
248
249    def add_data(self, data):
250        if not data:
251            self._eof = True
252        else:
253            self._bufList.append(data)
254            self._avail += len(data)
255
256class MultiplexedInputStream(InputStream):
257    """
258    A version of InputStream meant to be used with MultiplexedConnections.
259    Assumes the MultiplexedConnection (the producer) and the Request
260    (the consumer) are running in different threads.
261    """
262    def __init__(self, conn):
263        super(MultiplexedInputStream, self).__init__(conn)
264
265        # Arbitrates access to this InputStream (it's used simultaneously
266        # by a Request and its owning Connection object).
267        lock = threading.RLock()
268
269        # Notifies Request thread that there is new data available.
270        self._lock = threading.Condition(lock)
271
272    def _waitForData(self):
273        # Wait for notification from add_data().
274        self._lock.wait()
275
276    def read(self, n=-1):
277        self._lock.acquire()
278        try:
279            return super(MultiplexedInputStream, self).read(n)
280        finally:
281            self._lock.release()
282
283    def readline(self, length=None):
284        self._lock.acquire()
285        try:
286            return super(MultiplexedInputStream, self).readline(length)
287        finally:
288            self._lock.release()
289
290    def add_data(self, data):
291        self._lock.acquire()
292        try:
293            super(MultiplexedInputStream, self).add_data(data)
294            self._lock.notify()
295        finally:
296            self._lock.release()
297
298class OutputStream(object):
299    """
300    FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
301    write() or writelines() immediately result in Records being sent back
302    to the server. Buffering should be done in a higher level!
303    """
304    def __init__(self, conn, req, type, buffered=False):
305        self._conn = conn
306        self._req = req
307        self._type = type
308        self._buffered = buffered
309        self._bufList = [] # Used if buffered is True
310        self.dataWritten = False
311        self.closed = False
312
313    def _write(self, data):
314        length = len(data)
315        while length:
316            toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN)
317
318            rec = Record(self._type, self._req.requestId)
319            rec.contentLength = toWrite
320            rec.contentData = data[:toWrite]
321            self._conn.writeRecord(rec)
322
323            data = data[toWrite:]
324            length -= toWrite
325
326    def write(self, data):
327        assert not self.closed
328
329        if not data:
330            return
331
332        self.dataWritten = True
333
334        if self._buffered:
335            self._bufList.append(data)
336        else:
337            self._write(data)
338
339    def writelines(self, lines):
340        assert not self.closed
341
342        for line in lines:
343            self.write(line)
344
345    def flush(self):
346        # Only need to flush if this OutputStream is actually buffered.
347        if self._buffered:
348            data = ''.join(self._bufList)
349            self._bufList = []
350            self._write(data)
351
352    # Though available, the following should NOT be called by WSGI apps.
353    def close(self):
354        """Sends end-of-stream notification, if necessary."""
355        if not self.closed and self.dataWritten:
356            self.flush()
357            rec = Record(self._type, self._req.requestId)
358            self._conn.writeRecord(rec)
359            self.closed = True
360
361class TeeOutputStream(object):
362    """
363    Simple wrapper around two or more output file-like objects that copies
364    written data to all streams.
365    """
366    def __init__(self, streamList):
367        self._streamList = streamList
368
369    def write(self, data):
370        for f in self._streamList:
371            f.write(data)
372
373    def writelines(self, lines):
374        for line in lines:
375            self.write(line)
376
377    def flush(self):
378        for f in self._streamList:
379            f.flush()
380
381class StdoutWrapper(object):
382    """
383    Wrapper for sys.stdout so we know if data has actually been written.
384    """
385    def __init__(self, stdout):
386        self._file = stdout
387        self.dataWritten = False
388
389    def write(self, data):
390        if data:
391            self.dataWritten = True
392        self._file.write(data)
393
394    def writelines(self, lines):
395        for line in lines:
396            self.write(line)
397
398    def __getattr__(self, name):
399        return getattr(self._file, name)
400
401def decode_pair(s, pos=0):
402    """
403    Decodes a name/value pair.
404
405    The number of bytes decoded as well as the name/value pair
406    are returned.
407    """
408    nameLength = ord(s[pos])
409    if nameLength & 128:
410        nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
411        pos += 4
412    else:
413        pos += 1
414
415    valueLength = ord(s[pos])
416    if valueLength & 128:
417        valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
418        pos += 4
419    else:
420        pos += 1
421
422    name = s[pos:pos+nameLength]
423    pos += nameLength
424    value = s[pos:pos+valueLength]
425    pos += valueLength
426
427    return (pos, (name, value))
428
429def encode_pair(name, value):
430    """
431    Encodes a name/value pair.
432
433    The encoded string is returned.
434    """
435    nameLength = len(name)
436    if nameLength < 128:
437        s = chr(nameLength)
438    else:
439        s = struct.pack('!L', nameLength | 0x80000000L)
440
441    valueLength = len(value)
442    if valueLength < 128:
443        s += chr(valueLength)
444    else:
445        s += struct.pack('!L', valueLength | 0x80000000L)
446
447    return s + name + value
448
449class Record(object):
450    """
451    A FastCGI Record.
452
453    Used for encoding/decoding records.
454    """
455    def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
456        self.version = FCGI_VERSION_1
457        self.type = type
458        self.requestId = requestId
459        self.contentLength = 0
460        self.paddingLength = 0
461        self.contentData = ''
462
463    def _recvall(sock, length):
464        """
465        Attempts to receive length bytes from a socket, blocking if necessary.
466        (Socket may be blocking or non-blocking.)
467        """
468        dataList = []
469        recvLen = 0
470        while length:
471            try:
472                data = sock.recv(length)
473            except socket.error as e:
474                if e.errno == errno.EAGAIN:
475                    select.select([sock], [], [])
476                    continue
477                else:
478                    raise
479            if not data: # EOF
480                break
481            dataList.append(data)
482            dataLen = len(data)
483            recvLen += dataLen
484            length -= dataLen
485        return ''.join(dataList), recvLen
486    _recvall = staticmethod(_recvall)
487
488    def read(self, sock):
489        """Read and decode a Record from a socket."""
490        try:
491            header, length = self._recvall(sock, FCGI_HEADER_LEN)
492        except:
493            raise EOFError
494
495        if length < FCGI_HEADER_LEN:
496            raise EOFError
497
498        self.version, self.type, self.requestId, self.contentLength, \
499                      self.paddingLength = struct.unpack(FCGI_Header, header)
500
501        if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
502                             'contentLength = %d' %
503                             (sock.fileno(), self.type, self.requestId,
504                              self.contentLength))
505
506        if self.contentLength:
507            try:
508                self.contentData, length = self._recvall(sock,
509                                                         self.contentLength)
510            except:
511                raise EOFError
512
513            if length < self.contentLength:
514                raise EOFError
515
516        if self.paddingLength:
517            try:
518                self._recvall(sock, self.paddingLength)
519            except:
520                raise EOFError
521
522    def _sendall(sock, data):
523        """
524        Writes data to a socket and does not return until all the data is sent.
525        """
526        length = len(data)
527        while length:
528            try:
529                sent = sock.send(data)
530            except socket.error as e:
531                if e.errno == errno.EAGAIN:
532                    select.select([], [sock], [])
533                    continue
534                else:
535                    raise
536            data = data[sent:]
537            length -= sent
538    _sendall = staticmethod(_sendall)
539
540    def write(self, sock):
541        """Encode and write a Record to a socket."""
542        self.paddingLength = -self.contentLength & 7
543
544        if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
545                             'contentLength = %d' %
546                             (sock.fileno(), self.type, self.requestId,
547                              self.contentLength))
548
549        header = struct.pack(FCGI_Header, self.version, self.type,
550                             self.requestId, self.contentLength,
551                             self.paddingLength)
552        self._sendall(sock, header)
553        if self.contentLength:
554            self._sendall(sock, self.contentData)
555        if self.paddingLength:
556            self._sendall(sock, '\x00'*self.paddingLength)
557
558class Request(object):
559    """
560    Represents a single FastCGI request.
561
562    These objects are passed to your handler and is the main interface
563    between your handler and the fcgi module. The methods should not
564    be called by your handler. However, server, params, stdin, stdout,
565    stderr, and data are free for your handler's use.
566    """
567    def __init__(self, conn, inputStreamClass):
568        self._conn = conn
569
570        self.server = conn.server
571        self.params = {}
572        self.stdin = inputStreamClass(conn)
573        self.stdout = OutputStream(conn, self, FCGI_STDOUT)
574        self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
575        self.data = inputStreamClass(conn)
576
577    def run(self):
578        """Runs the handler, flushes the streams, and ends the request."""
579        try:
580            protocolStatus, appStatus = self.server.handler(self)
581        except:
582            traceback.print_exc(file=self.stderr)
583            self.stderr.flush()
584            if not self.stdout.dataWritten:
585                self.server.error(self)
586
587            protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
588
589        if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
590                             (protocolStatus, appStatus))
591
592        self._flush()
593        self._end(appStatus, protocolStatus)
594
595    def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
596        self._conn.end_request(self, appStatus, protocolStatus)
597
598    def _flush(self):
599        self.stdout.close()
600        self.stderr.close()
601
602class CGIRequest(Request):
603    """A normal CGI request disguised as a FastCGI request."""
604    def __init__(self, server):
605        # These are normally filled in by Connection.
606        self.requestId = 1
607        self.role = FCGI_RESPONDER
608        self.flags = 0
609        self.aborted = False
610
611        self.server = server
612        self.params = dict(os.environ)
613        self.stdin = sys.stdin
614        self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
615        self.stderr = sys.stderr
616        self.data = StringIO.StringIO()
617
618    def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
619        sys.exit(appStatus)
620
621    def _flush(self):
622        # Not buffered, do nothing.
623        pass
624
625class Connection(object):
626    """
627    A Connection with the web server.
628
629    Each Connection is associated with a single socket (which is
630    connected to the web server) and is responsible for handling all
631    the FastCGI message processing for that socket.
632    """
633    _multiplexed = False
634    _inputStreamClass = InputStream
635
636    def __init__(self, sock, addr, server):
637        self._sock = sock
638        self._addr = addr
639        self.server = server
640
641        # Active Requests for this Connection, mapped by request ID.
642        self._requests = {}
643
644    def _cleanupSocket(self):
645        """Close the Connection's socket."""
646        try:
647            self._sock.shutdown(socket.SHUT_WR)
648        except:
649            return
650        try:
651            while True:
652                r, w, e = select.select([self._sock], [], [])
653                if not r or not self._sock.recv(1024):
654                    break
655        except:
656            pass
657        self._sock.close()
658
659    def run(self):
660        """Begin processing data from the socket."""
661        self._keepGoing = True
662        while self._keepGoing:
663            try:
664                self.process_input()
665            except EOFError:
666                break
667            except (select.error, socket.error), e:
668                if e.errno == errno.EBADF: # Socket was closed by Request.
669                    break
670                raise
671
672        self._cleanupSocket()
673
674    def process_input(self):
675        """Attempt to read a single Record from the socket and process it."""
676        # Currently, any children Request threads notify this Connection
677        # that it is no longer needed by closing the Connection's socket.
678        # We need to put a timeout on select, otherwise we might get
679        # stuck in it indefinitely... (I don't like this solution.)
680        while self._keepGoing:
681            try:
682                r, w, e = select.select([self._sock], [], [], 1.0)
683            except ValueError:
684                # Sigh. ValueError gets thrown sometimes when passing select
685                # a closed socket.
686                raise EOFError
687            if r: break
688        if not self._keepGoing:
689            return
690        rec = Record()
691        rec.read(self._sock)
692
693        if rec.type == FCGI_GET_VALUES:
694            self._do_get_values(rec)
695        elif rec.type == FCGI_BEGIN_REQUEST:
696            self._do_begin_request(rec)
697        elif rec.type == FCGI_ABORT_REQUEST:
698            self._do_abort_request(rec)
699        elif rec.type == FCGI_PARAMS:
700            self._do_params(rec)
701        elif rec.type == FCGI_STDIN:
702            self._do_stdin(rec)
703        elif rec.type == FCGI_DATA:
704            self._do_data(rec)
705        elif rec.requestId == FCGI_NULL_REQUEST_ID:
706            self._do_unknown_type(rec)
707        else:
708            # Need to complain about this.
709            pass
710
711    def writeRecord(self, rec):
712        """
713        Write a Record to the socket.
714        """
715        rec.write(self._sock)
716
717    def end_request(self, req, appStatus=0L,
718                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
719        """
720        End a Request.
721
722        Called by Request objects. An FCGI_END_REQUEST Record is
723        sent to the web server. If the web server no longer requires
724        the connection, the socket is closed, thereby ending this
725        Connection (run() returns).
726        """
727        rec = Record(FCGI_END_REQUEST, req.requestId)
728        rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
729                                      protocolStatus)
730        rec.contentLength = FCGI_EndRequestBody_LEN
731        self.writeRecord(rec)
732
733        if remove:
734            del self._requests[req.requestId]
735
736        if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
737
738        if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
739            self._cleanupSocket()
740            self._keepGoing = False
741
742    def _do_get_values(self, inrec):
743        """Handle an FCGI_GET_VALUES request from the web server."""
744        outrec = Record(FCGI_GET_VALUES_RESULT)
745
746        pos = 0
747        while pos < inrec.contentLength:
748            pos, (name, value) = decode_pair(inrec.contentData, pos)
749            cap = self.server.capability.get(name)
750            if cap is not None:
751                outrec.contentData += encode_pair(name, str(cap))
752
753        outrec.contentLength = len(outrec.contentData)
754        self.writeRecord(outrec)
755
756    def _do_begin_request(self, inrec):
757        """Handle an FCGI_BEGIN_REQUEST from the web server."""
758        role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
759
760        req = self.server.request_class(self, self._inputStreamClass)
761        req.requestId, req.role, req.flags = inrec.requestId, role, flags
762        req.aborted = False
763
764        if not self._multiplexed and self._requests:
765            # Can't multiplex requests.
766            self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
767        else:
768            self._requests[inrec.requestId] = req
769
770    def _do_abort_request(self, inrec):
771        """
772        Handle an FCGI_ABORT_REQUEST from the web server.
773
774        We just mark a flag in the associated Request.
775        """
776        req = self._requests.get(inrec.requestId)
777        if req is not None:
778            req.aborted = True
779
780    def _start_request(self, req):
781        """Run the request."""
782        # Not multiplexed, so run it inline.
783        req.run()
784
785    def _do_params(self, inrec):
786        """
787        Handle an FCGI_PARAMS Record.
788
789        If the last FCGI_PARAMS Record is received, start the request.
790        """
791        req = self._requests.get(inrec.requestId)
792        if req is not None:
793            if inrec.contentLength:
794                pos = 0
795                while pos < inrec.contentLength:
796                    pos, (name, value) = decode_pair(inrec.contentData, pos)
797                    req.params[name] = value
798            else:
799                self._start_request(req)
800
801    def _do_stdin(self, inrec):
802        """Handle the FCGI_STDIN stream."""
803        req = self._requests.get(inrec.requestId)
804        if req is not None:
805            req.stdin.add_data(inrec.contentData)
806
807    def _do_data(self, inrec):
808        """Handle the FCGI_DATA stream."""
809        req = self._requests.get(inrec.requestId)
810        if req is not None:
811            req.data.add_data(inrec.contentData)
812
813    def _do_unknown_type(self, inrec):
814        """Handle an unknown request type. Respond accordingly."""
815        outrec = Record(FCGI_UNKNOWN_TYPE)
816        outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
817        outrec.contentLength = FCGI_UnknownTypeBody_LEN
818        self.writeRecord(outrec)
819
820class MultiplexedConnection(Connection):
821    """
822    A version of Connection capable of handling multiple requests
823    simultaneously.
824    """
825    _multiplexed = True
826    _inputStreamClass = MultiplexedInputStream
827
828    def __init__(self, sock, addr, server):
829        super(MultiplexedConnection, self).__init__(sock, addr, server)
830
831        # Used to arbitrate access to self._requests.
832        lock = threading.RLock()
833
834        # Notification is posted everytime a request completes, allowing us
835        # to quit cleanly.
836        self._lock = threading.Condition(lock)
837
838    def _cleanupSocket(self):
839        # Wait for any outstanding requests before closing the socket.
840        self._lock.acquire()
841        while self._requests:
842            self._lock.wait()
843        self._lock.release()
844
845        super(MultiplexedConnection, self)._cleanupSocket()
846
847    def writeRecord(self, rec):
848        # Must use locking to prevent intermingling of Records from different
849        # threads.
850        self._lock.acquire()
851        try:
852            # Probably faster than calling super. ;)
853            rec.write(self._sock)
854        finally:
855            self._lock.release()
856
857    def end_request(self, req, appStatus=0L,
858                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
859        self._lock.acquire()
860        try:
861            super(MultiplexedConnection, self).end_request(req, appStatus,
862                                                           protocolStatus,
863                                                           remove)
864            self._lock.notify()
865        finally:
866            self._lock.release()
867
868    def _do_begin_request(self, inrec):
869        self._lock.acquire()
870        try:
871            super(MultiplexedConnection, self)._do_begin_request(inrec)
872        finally:
873            self._lock.release()
874
875    def _do_abort_request(self, inrec):
876        self._lock.acquire()
877        try:
878            super(MultiplexedConnection, self)._do_abort_request(inrec)
879        finally:
880            self._lock.release()
881
882    def _start_request(self, req):
883        thread.start_new_thread(req.run, ())
884
885    def _do_params(self, inrec):
886        self._lock.acquire()
887        try:
888            super(MultiplexedConnection, self)._do_params(inrec)
889        finally:
890            self._lock.release()
891
892    def _do_stdin(self, inrec):
893        self._lock.acquire()
894        try:
895            super(MultiplexedConnection, self)._do_stdin(inrec)
896        finally:
897            self._lock.release()
898
899    def _do_data(self, inrec):
900        self._lock.acquire()
901        try:
902            super(MultiplexedConnection, self)._do_data(inrec)
903        finally:
904            self._lock.release()
905
906class Server(object):
907    """
908    The FastCGI server.
909
910    Waits for connections from the web server, processing each
911    request.
912
913    If run in a normal CGI context, it will instead instantiate a
914    CGIRequest and run the handler through there.
915    """
916    request_class = Request
917    cgirequest_class = CGIRequest
918
919    # Limits the size of the InputStream's string buffer to this size + the
920    # server's maximum Record size. Since the InputStream is not seekable,
921    # we throw away already-read data once this certain amount has been read.
922    inputStreamShrinkThreshold = 102400 - 8192
923
924    def __init__(self, handler=None, maxwrite=8192, bindAddress=None,
925                 umask=None, multiplexed=False):
926        """
927        handler, if present, must reference a function or method that
928        takes one argument: a Request object. If handler is not
929        specified at creation time, Server *must* be subclassed.
930        (The handler method below is abstract.)
931
932        maxwrite is the maximum number of bytes (per Record) to write
933        to the server. I've noticed mod_fastcgi has a relatively small
934        receive buffer (8K or so).
935
936        bindAddress, if present, must either be a string or a 2-tuple. If
937        present, run() will open its own listening socket. You would use
938        this if you wanted to run your application as an 'external' FastCGI
939        app. (i.e. the webserver would no longer be responsible for starting
940        your app) If a string, it will be interpreted as a filename and a UNIX
941        socket will be opened. If a tuple, the first element, a string,
942        is the interface name/IP to bind to, and the second element (an int)
943        is the port number.
944
945        Set multiplexed to True if you want to handle multiple requests
946        per connection. Some FastCGI backends (namely mod_fastcgi) don't
947        multiplex requests at all, so by default this is off (which saves
948        on thread creation/locking overhead). If threads aren't available,
949        this keyword is ignored; it's not possible to multiplex requests
950        at all.
951        """
952        if handler is not None:
953            self.handler = handler
954        self.maxwrite = maxwrite
955        if thread_available:
956            try:
957                import resource
958                # Attempt to glean the maximum number of connections
959                # from the OS.
960                maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
961            except ImportError:
962                maxConns = 100 # Just some made up number.
963            maxReqs = maxConns
964            if multiplexed:
965                self._connectionClass = MultiplexedConnection
966                maxReqs *= 5 # Another made up number.
967            else:
968                self._connectionClass = Connection
969            self.capability = {
970                FCGI_MAX_CONNS: maxConns,
971                FCGI_MAX_REQS: maxReqs,
972                FCGI_MPXS_CONNS: multiplexed and 1 or 0
973                }
974        else:
975            self._connectionClass = Connection
976            self.capability = {
977                # If threads aren't available, these are pretty much correct.
978                FCGI_MAX_CONNS: 1,
979                FCGI_MAX_REQS: 1,
980                FCGI_MPXS_CONNS: 0
981                }
982        self._bindAddress = bindAddress
983        self._umask = umask
984
985    def _setupSocket(self):
986        if self._bindAddress is None: # Run as a normal FastCGI?
987            isFCGI = True
988
989            sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
990                                 socket.SOCK_STREAM)
991            try:
992                sock.getpeername()
993            except socket.error as e:
994                if e.errno == errno.ENOTSOCK:
995                    # Not a socket, assume CGI context.
996                    isFCGI = False
997                elif e.errno != errno.ENOTCONN:
998                    raise
999
1000            # FastCGI/CGI discrimination is broken on Mac OS X.
1001            # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
1002            # if you want to run your app as a simple CGI. (You can do
1003            # this with Apache's mod_env [not loaded by default in OS X
1004            # client, ha ha] and the SetEnv directive.)
1005            if not isFCGI or \
1006               os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
1007                req = self.cgirequest_class(self)
1008                req.run()
1009                sys.exit(0)
1010        else:
1011            # Run as a server
1012            oldUmask = None
1013            if type(self._bindAddress) is str:
1014                # Unix socket
1015                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1016                try:
1017                    os.unlink(self._bindAddress)
1018                except OSError:
1019                    pass
1020                if self._umask is not None:
1021                    oldUmask = os.umask(self._umask)
1022            else:
1023                # INET socket
1024                assert type(self._bindAddress) is tuple
1025                assert len(self._bindAddress) == 2
1026                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1027                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1028
1029            sock.bind(self._bindAddress)
1030            sock.listen(socket.SOMAXCONN)
1031
1032            if oldUmask is not None:
1033                os.umask(oldUmask)
1034
1035        return sock
1036
1037    def _cleanupSocket(self, sock):
1038        """Closes the main socket."""
1039        sock.close()
1040
1041    def _installSignalHandlers(self):
1042        self._oldSIGs = [(x,signal.getsignal(x)) for x in
1043                         (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
1044        signal.signal(signal.SIGHUP, self._hupHandler)
1045        signal.signal(signal.SIGINT, self._intHandler)
1046        signal.signal(signal.SIGTERM, self._intHandler)
1047
1048    def _restoreSignalHandlers(self):
1049        for signum,handler in self._oldSIGs:
1050            signal.signal(signum, handler)
1051
1052    def _hupHandler(self, signum, frame):
1053        self._hupReceived = True
1054        self._keepGoing = False
1055
1056    def _intHandler(self, signum, frame):
1057        self._keepGoing = False
1058
1059    def run(self, timeout=1.0):
1060        """
1061        The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
1062        SIGHUP was received, False otherwise.
1063        """
1064        web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
1065        if web_server_addrs is not None:
1066            web_server_addrs = map(lambda x: x.strip(),
1067                                   web_server_addrs.split(','))
1068
1069        sock = self._setupSocket()
1070
1071        self._keepGoing = True
1072        self._hupReceived = False
1073
1074        # Install signal handlers.
1075        self._installSignalHandlers()
1076
1077        while self._keepGoing:
1078            try:
1079                r, w, e = select.select([sock], [], [], timeout)
1080            except select.error, e:
1081                if e.errno == errno.EINTR:
1082                    continue
1083                raise
1084
1085            if r:
1086                try:
1087                    clientSock, addr = sock.accept()
1088                except socket.error as e:
1089                    if e.errno in (errno.EINTR, errno.EAGAIN):
1090                        continue
1091                    raise
1092
1093                if web_server_addrs and \
1094                       (len(addr) != 2 or addr[0] not in web_server_addrs):
1095                    clientSock.close()
1096                    continue
1097
1098                # Instantiate a new Connection and begin processing FastCGI
1099                # messages (either in a new thread or this thread).
1100                conn = self._connectionClass(clientSock, addr, self)
1101                thread.start_new_thread(conn.run, ())
1102
1103            self._mainloopPeriodic()
1104
1105        # Restore signal handlers.
1106        self._restoreSignalHandlers()
1107
1108        self._cleanupSocket(sock)
1109
1110        return self._hupReceived
1111
1112    def _mainloopPeriodic(self):
1113        """
1114        Called with just about each iteration of the main loop. Meant to
1115        be overridden.
1116        """
1117        pass
1118
1119    def _exit(self, reload=False):
1120        """
1121        Protected convenience method for subclasses to force an exit. Not
1122        really thread-safe, which is why it isn't public.
1123        """
1124        if self._keepGoing:
1125            self._keepGoing = False
1126            self._hupReceived = reload
1127
1128    def handler(self, req):
1129        """
1130        Default handler, which just raises an exception. Unless a handler
1131        is passed at initialization time, this must be implemented by
1132        a subclass.
1133        """
1134        raise NotImplementedError, self.__class__.__name__ + '.handler'
1135
1136    def error(self, req):
1137        """
1138        Called by Request if an exception occurs within the handler. May and
1139        should be overridden.
1140        """
1141        import cgitb
1142        req.stdout.write('Content-Type: text/html\r\n\r\n' +
1143                         cgitb.html(sys.exc_info()))
1144
1145class WSGIServer(Server):
1146    """
1147    FastCGI server that supports the Web Server Gateway Interface. See
1148    <http://www.python.org/peps/pep-0333.html>.
1149    """
1150    def __init__(self, application, environ=None,
1151                 multithreaded=True, **kw):
1152        """
1153        environ, if present, must be a dictionary-like object. Its
1154        contents will be copied into application's environ. Useful
1155        for passing application-specific variables.
1156
1157        Set multithreaded to False if your application is not MT-safe.
1158        """
1159        if kw.has_key('handler'):
1160            del kw['handler'] # Doesn't make sense to let this through
1161        super(WSGIServer, self).__init__(**kw)
1162
1163        if environ is None:
1164            environ = {}
1165
1166        self.application = application
1167        self.environ = environ
1168        self.multithreaded = multithreaded
1169
1170        # Used to force single-threadedness
1171        self._app_lock = thread.allocate_lock()
1172
1173    def handler(self, req):
1174        """Special handler for WSGI."""
1175        if req.role != FCGI_RESPONDER:
1176            return FCGI_UNKNOWN_ROLE, 0
1177
1178        # Mostly taken from example CGI gateway.
1179        environ = req.params
1180        environ.update(self.environ)
1181
1182        environ['wsgi.version'] = (1,0)
1183        environ['wsgi.input'] = req.stdin
1184        if self._bindAddress is None:
1185            stderr = req.stderr
1186        else:
1187            stderr = TeeOutputStream((sys.stderr, req.stderr))
1188        environ['wsgi.errors'] = stderr
1189        environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
1190                                      thread_available and self.multithreaded
1191        # Rationale for the following: If started by the web server
1192        # (self._bindAddress is None) in either FastCGI or CGI mode, the
1193        # possibility of being spawned multiple times simultaneously is quite
1194        # real. And, if started as an external server, multiple copies may be
1195        # spawned for load-balancing/redundancy. (Though I don't think
1196        # mod_fastcgi supports this?)
1197        environ['wsgi.multiprocess'] = True
1198        environ['wsgi.run_once'] = isinstance(req, CGIRequest)
1199
1200        if environ.get('HTTPS', 'off') in ('on', '1'):
1201            environ['wsgi.url_scheme'] = 'https'
1202        else:
1203            environ['wsgi.url_scheme'] = 'http'
1204
1205        self._sanitizeEnv(environ)
1206
1207        headers_set = []
1208        headers_sent = []
1209        result = None
1210
1211        def write(data):
1212            assert type(data) is str, 'write() argument must be string'
1213            assert headers_set, 'write() before start_response()'
1214
1215            if not headers_sent:
1216                status, responseHeaders = headers_sent[:] = headers_set
1217                found = False
1218                for header,value in responseHeaders:
1219                    if header.lower() == 'content-length':
1220                        found = True
1221                        break
1222                if not found and result is not None:
1223                    try:
1224                        if len(result) == 1:
1225                            responseHeaders.append(('Content-Length',
1226                                                    str(len(data))))
1227                    except:
1228                        pass
1229                s = 'Status: %s\r\n' % status
1230                for header in responseHeaders:
1231                    s += '%s: %s\r\n' % header
1232                s += '\r\n'
1233                req.stdout.write(s)
1234
1235            req.stdout.write(data)
1236            req.stdout.flush()
1237
1238        def start_response(status, response_headers, exc_info=None):
1239            if exc_info:
1240                try:
1241                    if headers_sent:
1242                        # Re-raise if too late
1243                        raise exc_info[0], exc_info[1], exc_info[2]
1244                finally:
1245                    exc_info = None # avoid dangling circular ref
1246            else:
1247                assert not headers_set, 'Headers already set!'
1248
1249            assert type(status) is str, 'Status must be a string'
1250            assert len(status) >= 4, 'Status must be at least 4 characters'
1251            assert int(status[:3]), 'Status must begin with 3-digit code'
1252            assert status[3] == ' ', 'Status must have a space after code'
1253            assert type(response_headers) is list, 'Headers must be a list'
1254            if __debug__:
1255                for name,val in response_headers:
1256                    assert type(name) is str, 'Header names must be strings'
1257                    assert type(val) is str, 'Header values must be strings'
1258
1259            headers_set[:] = [status, response_headers]
1260            return write
1261
1262        if not self.multithreaded:
1263            self._app_lock.acquire()
1264        try:
1265            try:
1266                result = self.application(environ, start_response)
1267                try:
1268                    for data in result:
1269                        if data:
1270                            write(data)
1271                    if not headers_sent:
1272                        write('') # in case body was empty
1273                finally:
1274                    if hasattr(result, 'close'):
1275                        result.close()
1276            except socket.error as e:
1277                if e.errno != errno.EPIPE:
1278                    raise # Don't let EPIPE propagate beyond server
1279        finally:
1280            if not self.multithreaded:
1281                self._app_lock.release()
1282
1283        return FCGI_REQUEST_COMPLETE, 0
1284
1285    def _sanitizeEnv(self, environ):
1286        """Ensure certain values are present, if required by WSGI."""
1287        if not environ.has_key('SCRIPT_NAME'):
1288            environ['SCRIPT_NAME'] = ''
1289        if not environ.has_key('PATH_INFO'):
1290            environ['PATH_INFO'] = ''
1291
1292        # If any of these are missing, it probably signifies a broken
1293        # server...
1294        for name,default in [('REQUEST_METHOD', 'GET'),
1295                             ('SERVER_NAME', 'localhost'),
1296                             ('SERVER_PORT', '80'),
1297                             ('SERVER_PROTOCOL', 'HTTP/1.0')]:
1298            if not environ.has_key(name):
1299                environ['wsgi.errors'].write('%s: missing FastCGI param %s '
1300                                             'required by WSGI!\n' %
1301                                             (self.__class__.__name__, name))
1302                environ[name] = default
1303
1304if __name__ == '__main__':
1305    def test_app(environ, start_response):
1306        """Probably not the most efficient example."""
1307        import cgi
1308        start_response('200 OK', [('Content-Type', 'text/html')])
1309        yield '<html><head><title>Hello World!</title></head>\n' \
1310              '<body>\n' \
1311              '<p>Hello World!</p>\n' \
1312              '<table border="1">'
1313        names = environ.keys()
1314        names.sort()
1315        for name in names:
1316            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
1317                name, cgi.escape(`environ[name]`))
1318
1319        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
1320                                keep_blank_values=1)
1321        if form.list:
1322            yield '<tr><th colspan="2">Form data</th></tr>'
1323
1324        for field in form.list:
1325            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
1326                field.name, field.value)
1327
1328        yield '</table>\n' \
1329              '</body></html>\n'
1330
1331    WSGIServer(test_app).run()
Note: See TracBrowser for help on using the repository browser.