1 | # Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com> |
---|
2 | # All rights reserved. |
---|
3 | # |
---|
4 | # Redistribution and use in source and binary forms, with or without |
---|
5 | # modification, are permitted provided that the following conditions |
---|
6 | # are met: |
---|
7 | # 1. Redistributions of source code must retain the above copyright |
---|
8 | # notice, this list of conditions and the following disclaimer. |
---|
9 | # 2. Redistributions in binary form must reproduce the above copyright |
---|
10 | # notice, this list of conditions and the following disclaimer in the |
---|
11 | # documentation and/or other materials provided with the distribution. |
---|
12 | # |
---|
13 | # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND |
---|
14 | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
---|
15 | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
---|
16 | # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
---|
17 | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
---|
18 | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
---|
19 | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
---|
20 | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
---|
21 | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
---|
22 | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
---|
23 | # SUCH DAMAGE. |
---|
24 | # |
---|
25 | # $Id$ |
---|
26 | |
---|
27 | """ |
---|
28 | fcgi - a FastCGI/WSGI gateway. |
---|
29 | |
---|
30 | For more information about FastCGI, see <http://www.fastcgi.com/>. |
---|
31 | |
---|
32 | For more information about the Web Server Gateway Interface, see |
---|
33 | <http://www.python.org/peps/pep-0333.html>. |
---|
34 | |
---|
35 | Example usage: |
---|
36 | |
---|
37 | #!/usr/bin/env python |
---|
38 | from myapplication import app # Assume app is your WSGI application object |
---|
39 | from fcgi import WSGIServer |
---|
40 | WSGIServer(app).run() |
---|
41 | |
---|
42 | See the documentation for WSGIServer/Server for more information. |
---|
43 | |
---|
44 | On most platforms, fcgi will fallback to regular CGI behavior if run in a |
---|
45 | non-FastCGI context. If you want to force CGI behavior, set the environment |
---|
46 | variable FCGI_FORCE_CGI to "Y" or "y". |
---|
47 | """ |
---|
48 | |
---|
49 | __author__ = 'Allan Saddi <allan@saddi.com>' |
---|
50 | __version__ = '$Revision$' |
---|
51 | |
---|
52 | import sys |
---|
53 | import os |
---|
54 | import signal |
---|
55 | import struct |
---|
56 | import cStringIO as StringIO |
---|
57 | import select |
---|
58 | import socket |
---|
59 | import errno |
---|
60 | import traceback |
---|
61 | |
---|
62 | try: |
---|
63 | import thread |
---|
64 | import threading |
---|
65 | thread_available = True |
---|
66 | except ImportError: |
---|
67 | import dummy_thread as thread |
---|
68 | import dummy_threading as threading |
---|
69 | thread_available = False |
---|
70 | |
---|
71 | # Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case. |
---|
72 | if not hasattr(socket, 'SHUT_WR'): |
---|
73 | socket.SHUT_WR = 1 |
---|
74 | |
---|
75 | __all__ = ['WSGIServer'] |
---|
76 | |
---|
77 | # Constants from the spec. |
---|
78 | FCGI_LISTENSOCK_FILENO = 0 |
---|
79 | |
---|
80 | FCGI_HEADER_LEN = 8 |
---|
81 | |
---|
82 | FCGI_VERSION_1 = 1 |
---|
83 | |
---|
84 | FCGI_BEGIN_REQUEST = 1 |
---|
85 | FCGI_ABORT_REQUEST = 2 |
---|
86 | FCGI_END_REQUEST = 3 |
---|
87 | FCGI_PARAMS = 4 |
---|
88 | FCGI_STDIN = 5 |
---|
89 | FCGI_STDOUT = 6 |
---|
90 | FCGI_STDERR = 7 |
---|
91 | FCGI_DATA = 8 |
---|
92 | FCGI_GET_VALUES = 9 |
---|
93 | FCGI_GET_VALUES_RESULT = 10 |
---|
94 | FCGI_UNKNOWN_TYPE = 11 |
---|
95 | FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE |
---|
96 | |
---|
97 | FCGI_NULL_REQUEST_ID = 0 |
---|
98 | |
---|
99 | FCGI_KEEP_CONN = 1 |
---|
100 | |
---|
101 | FCGI_RESPONDER = 1 |
---|
102 | FCGI_AUTHORIZER = 2 |
---|
103 | FCGI_FILTER = 3 |
---|
104 | |
---|
105 | FCGI_REQUEST_COMPLETE = 0 |
---|
106 | FCGI_CANT_MPX_CONN = 1 |
---|
107 | FCGI_OVERLOADED = 2 |
---|
108 | FCGI_UNKNOWN_ROLE = 3 |
---|
109 | |
---|
110 | FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' |
---|
111 | FCGI_MAX_REQS = 'FCGI_MAX_REQS' |
---|
112 | FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' |
---|
113 | |
---|
114 | FCGI_Header = '!BBHHBx' |
---|
115 | FCGI_BeginRequestBody = '!HB5x' |
---|
116 | FCGI_EndRequestBody = '!LB3x' |
---|
117 | FCGI_UnknownTypeBody = '!B7x' |
---|
118 | |
---|
119 | FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) |
---|
120 | FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) |
---|
121 | |
---|
122 | if __debug__: |
---|
123 | import time |
---|
124 | |
---|
125 | # Set non-zero to write debug output to a file. |
---|
126 | DEBUG = 0 |
---|
127 | DEBUGLOG = '/tmp/fcgi.log' |
---|
128 | |
---|
129 | def _debug(level, msg): |
---|
130 | if DEBUG < level: |
---|
131 | return |
---|
132 | |
---|
133 | try: |
---|
134 | f = open(DEBUGLOG, 'a') |
---|
135 | f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) |
---|
136 | f.close() |
---|
137 | except: |
---|
138 | pass |
---|
139 | |
---|
140 | class InputStream(object): |
---|
141 | """ |
---|
142 | File-like object representing FastCGI input streams (FCGI_STDIN and |
---|
143 | FCGI_DATA). Supports the minimum methods required by WSGI spec. |
---|
144 | """ |
---|
145 | def __init__(self, conn): |
---|
146 | self._conn = conn |
---|
147 | |
---|
148 | # See Server. |
---|
149 | self._shrinkThreshold = conn.server.inputStreamShrinkThreshold |
---|
150 | |
---|
151 | self._buf = '' |
---|
152 | self._bufList = [] |
---|
153 | self._pos = 0 # Current read position. |
---|
154 | self._avail = 0 # Number of bytes currently available. |
---|
155 | |
---|
156 | self._eof = False # True when server has sent EOF notification. |
---|
157 | |
---|
158 | def _shrinkBuffer(self): |
---|
159 | """Gets rid of already read data (since we can't rewind).""" |
---|
160 | if self._pos >= self._shrinkThreshold: |
---|
161 | self._buf = self._buf[self._pos:] |
---|
162 | self._avail -= self._pos |
---|
163 | self._pos = 0 |
---|
164 | |
---|
165 | assert self._avail >= 0 |
---|
166 | |
---|
167 | def _waitForData(self): |
---|
168 | """Waits for more data to become available.""" |
---|
169 | self._conn.process_input() |
---|
170 | |
---|
171 | def read(self, n=-1): |
---|
172 | if self._pos == self._avail and self._eof: |
---|
173 | return '' |
---|
174 | while True: |
---|
175 | if n < 0 or (self._avail - self._pos) < n: |
---|
176 | # Not enough data available. |
---|
177 | if self._eof: |
---|
178 | # And there's no more coming. |
---|
179 | newPos = self._avail |
---|
180 | break |
---|
181 | else: |
---|
182 | # Wait for more data. |
---|
183 | self._waitForData() |
---|
184 | continue |
---|
185 | else: |
---|
186 | newPos = self._pos + n |
---|
187 | break |
---|
188 | # Merge buffer list, if necessary. |
---|
189 | if self._bufList: |
---|
190 | self._buf += ''.join(self._bufList) |
---|
191 | self._bufList = [] |
---|
192 | r = self._buf[self._pos:newPos] |
---|
193 | self._pos = newPos |
---|
194 | self._shrinkBuffer() |
---|
195 | return r |
---|
196 | |
---|
197 | def readline(self, length=None): |
---|
198 | if self._pos == self._avail and self._eof: |
---|
199 | return '' |
---|
200 | while True: |
---|
201 | # Unfortunately, we need to merge the buffer list early. |
---|
202 | if self._bufList: |
---|
203 | self._buf += ''.join(self._bufList) |
---|
204 | self._bufList = [] |
---|
205 | # Find newline. |
---|
206 | i = self._buf.find('\n', self._pos) |
---|
207 | if i < 0: |
---|
208 | # Not found? |
---|
209 | if self._eof: |
---|
210 | # No more data coming. |
---|
211 | newPos = self._avail |
---|
212 | break |
---|
213 | else: |
---|
214 | # Wait for more to come. |
---|
215 | self._waitForData() |
---|
216 | continue |
---|
217 | else: |
---|
218 | newPos = i + 1 |
---|
219 | break |
---|
220 | if length is not None: |
---|
221 | if self._pos + length < newPos: |
---|
222 | newPos = self._pos + length |
---|
223 | r = self._buf[self._pos:newPos] |
---|
224 | self._pos = newPos |
---|
225 | self._shrinkBuffer() |
---|
226 | return r |
---|
227 | |
---|
228 | def readlines(self, sizehint=0): |
---|
229 | total = 0 |
---|
230 | lines = [] |
---|
231 | line = self.readline() |
---|
232 | while line: |
---|
233 | lines.append(line) |
---|
234 | total += len(line) |
---|
235 | if 0 < sizehint <= total: |
---|
236 | break |
---|
237 | line = self.readline() |
---|
238 | return lines |
---|
239 | |
---|
240 | def __iter__(self): |
---|
241 | return self |
---|
242 | |
---|
243 | def next(self): |
---|
244 | r = self.readline() |
---|
245 | if not r: |
---|
246 | raise StopIteration |
---|
247 | return r |
---|
248 | |
---|
249 | def add_data(self, data): |
---|
250 | if not data: |
---|
251 | self._eof = True |
---|
252 | else: |
---|
253 | self._bufList.append(data) |
---|
254 | self._avail += len(data) |
---|
255 | |
---|
256 | class MultiplexedInputStream(InputStream): |
---|
257 | """ |
---|
258 | A version of InputStream meant to be used with MultiplexedConnections. |
---|
259 | Assumes the MultiplexedConnection (the producer) and the Request |
---|
260 | (the consumer) are running in different threads. |
---|
261 | """ |
---|
262 | def __init__(self, conn): |
---|
263 | super(MultiplexedInputStream, self).__init__(conn) |
---|
264 | |
---|
265 | # Arbitrates access to this InputStream (it's used simultaneously |
---|
266 | # by a Request and its owning Connection object). |
---|
267 | lock = threading.RLock() |
---|
268 | |
---|
269 | # Notifies Request thread that there is new data available. |
---|
270 | self._lock = threading.Condition(lock) |
---|
271 | |
---|
272 | def _waitForData(self): |
---|
273 | # Wait for notification from add_data(). |
---|
274 | self._lock.wait() |
---|
275 | |
---|
276 | def read(self, n=-1): |
---|
277 | self._lock.acquire() |
---|
278 | try: |
---|
279 | return super(MultiplexedInputStream, self).read(n) |
---|
280 | finally: |
---|
281 | self._lock.release() |
---|
282 | |
---|
283 | def readline(self, length=None): |
---|
284 | self._lock.acquire() |
---|
285 | try: |
---|
286 | return super(MultiplexedInputStream, self).readline(length) |
---|
287 | finally: |
---|
288 | self._lock.release() |
---|
289 | |
---|
290 | def add_data(self, data): |
---|
291 | self._lock.acquire() |
---|
292 | try: |
---|
293 | super(MultiplexedInputStream, self).add_data(data) |
---|
294 | self._lock.notify() |
---|
295 | finally: |
---|
296 | self._lock.release() |
---|
297 | |
---|
298 | class OutputStream(object): |
---|
299 | """ |
---|
300 | FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to |
---|
301 | write() or writelines() immediately result in Records being sent back |
---|
302 | to the server. Buffering should be done in a higher level! |
---|
303 | """ |
---|
304 | def __init__(self, conn, req, type, buffered=False): |
---|
305 | self._conn = conn |
---|
306 | self._req = req |
---|
307 | self._type = type |
---|
308 | self._buffered = buffered |
---|
309 | self._bufList = [] # Used if buffered is True |
---|
310 | self.dataWritten = False |
---|
311 | self.closed = False |
---|
312 | |
---|
313 | def _write(self, data): |
---|
314 | length = len(data) |
---|
315 | while length: |
---|
316 | toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) |
---|
317 | |
---|
318 | rec = Record(self._type, self._req.requestId) |
---|
319 | rec.contentLength = toWrite |
---|
320 | rec.contentData = data[:toWrite] |
---|
321 | self._conn.writeRecord(rec) |
---|
322 | |
---|
323 | data = data[toWrite:] |
---|
324 | length -= toWrite |
---|
325 | |
---|
326 | def write(self, data): |
---|
327 | assert not self.closed |
---|
328 | |
---|
329 | if not data: |
---|
330 | return |
---|
331 | |
---|
332 | self.dataWritten = True |
---|
333 | |
---|
334 | if self._buffered: |
---|
335 | self._bufList.append(data) |
---|
336 | else: |
---|
337 | self._write(data) |
---|
338 | |
---|
339 | def writelines(self, lines): |
---|
340 | assert not self.closed |
---|
341 | |
---|
342 | for line in lines: |
---|
343 | self.write(line) |
---|
344 | |
---|
345 | def flush(self): |
---|
346 | # Only need to flush if this OutputStream is actually buffered. |
---|
347 | if self._buffered: |
---|
348 | data = ''.join(self._bufList) |
---|
349 | self._bufList = [] |
---|
350 | self._write(data) |
---|
351 | |
---|
352 | # Though available, the following should NOT be called by WSGI apps. |
---|
353 | def close(self): |
---|
354 | """Sends end-of-stream notification, if necessary.""" |
---|
355 | if not self.closed and self.dataWritten: |
---|
356 | self.flush() |
---|
357 | rec = Record(self._type, self._req.requestId) |
---|
358 | self._conn.writeRecord(rec) |
---|
359 | self.closed = True |
---|
360 | |
---|
361 | class TeeOutputStream(object): |
---|
362 | """ |
---|
363 | Simple wrapper around two or more output file-like objects that copies |
---|
364 | written data to all streams. |
---|
365 | """ |
---|
366 | def __init__(self, streamList): |
---|
367 | self._streamList = streamList |
---|
368 | |
---|
369 | def write(self, data): |
---|
370 | for f in self._streamList: |
---|
371 | f.write(data) |
---|
372 | |
---|
373 | def writelines(self, lines): |
---|
374 | for line in lines: |
---|
375 | self.write(line) |
---|
376 | |
---|
377 | def flush(self): |
---|
378 | for f in self._streamList: |
---|
379 | f.flush() |
---|
380 | |
---|
381 | class StdoutWrapper(object): |
---|
382 | """ |
---|
383 | Wrapper for sys.stdout so we know if data has actually been written. |
---|
384 | """ |
---|
385 | def __init__(self, stdout): |
---|
386 | self._file = stdout |
---|
387 | self.dataWritten = False |
---|
388 | |
---|
389 | def write(self, data): |
---|
390 | if data: |
---|
391 | self.dataWritten = True |
---|
392 | self._file.write(data) |
---|
393 | |
---|
394 | def writelines(self, lines): |
---|
395 | for line in lines: |
---|
396 | self.write(line) |
---|
397 | |
---|
398 | def __getattr__(self, name): |
---|
399 | return getattr(self._file, name) |
---|
400 | |
---|
401 | def decode_pair(s, pos=0): |
---|
402 | """ |
---|
403 | Decodes a name/value pair. |
---|
404 | |
---|
405 | The number of bytes decoded as well as the name/value pair |
---|
406 | are returned. |
---|
407 | """ |
---|
408 | nameLength = ord(s[pos]) |
---|
409 | if nameLength & 128: |
---|
410 | nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff |
---|
411 | pos += 4 |
---|
412 | else: |
---|
413 | pos += 1 |
---|
414 | |
---|
415 | valueLength = ord(s[pos]) |
---|
416 | if valueLength & 128: |
---|
417 | valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff |
---|
418 | pos += 4 |
---|
419 | else: |
---|
420 | pos += 1 |
---|
421 | |
---|
422 | name = s[pos:pos+nameLength] |
---|
423 | pos += nameLength |
---|
424 | value = s[pos:pos+valueLength] |
---|
425 | pos += valueLength |
---|
426 | |
---|
427 | return (pos, (name, value)) |
---|
428 | |
---|
429 | def encode_pair(name, value): |
---|
430 | """ |
---|
431 | Encodes a name/value pair. |
---|
432 | |
---|
433 | The encoded string is returned. |
---|
434 | """ |
---|
435 | nameLength = len(name) |
---|
436 | if nameLength < 128: |
---|
437 | s = chr(nameLength) |
---|
438 | else: |
---|
439 | s = struct.pack('!L', nameLength | 0x80000000L) |
---|
440 | |
---|
441 | valueLength = len(value) |
---|
442 | if valueLength < 128: |
---|
443 | s += chr(valueLength) |
---|
444 | else: |
---|
445 | s += struct.pack('!L', valueLength | 0x80000000L) |
---|
446 | |
---|
447 | return s + name + value |
---|
448 | |
---|
449 | class Record(object): |
---|
450 | """ |
---|
451 | A FastCGI Record. |
---|
452 | |
---|
453 | Used for encoding/decoding records. |
---|
454 | """ |
---|
455 | def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): |
---|
456 | self.version = FCGI_VERSION_1 |
---|
457 | self.type = type |
---|
458 | self.requestId = requestId |
---|
459 | self.contentLength = 0 |
---|
460 | self.paddingLength = 0 |
---|
461 | self.contentData = '' |
---|
462 | |
---|
463 | def _recvall(sock, length): |
---|
464 | """ |
---|
465 | Attempts to receive length bytes from a socket, blocking if necessary. |
---|
466 | (Socket may be blocking or non-blocking.) |
---|
467 | """ |
---|
468 | dataList = [] |
---|
469 | recvLen = 0 |
---|
470 | while length: |
---|
471 | try: |
---|
472 | data = sock.recv(length) |
---|
473 | except socket.error as e: |
---|
474 | if e.errno == errno.EAGAIN: |
---|
475 | select.select([sock], [], []) |
---|
476 | continue |
---|
477 | else: |
---|
478 | raise |
---|
479 | if not data: # EOF |
---|
480 | break |
---|
481 | dataList.append(data) |
---|
482 | dataLen = len(data) |
---|
483 | recvLen += dataLen |
---|
484 | length -= dataLen |
---|
485 | return ''.join(dataList), recvLen |
---|
486 | _recvall = staticmethod(_recvall) |
---|
487 | |
---|
488 | def read(self, sock): |
---|
489 | """Read and decode a Record from a socket.""" |
---|
490 | try: |
---|
491 | header, length = self._recvall(sock, FCGI_HEADER_LEN) |
---|
492 | except: |
---|
493 | raise EOFError |
---|
494 | |
---|
495 | if length < FCGI_HEADER_LEN: |
---|
496 | raise EOFError |
---|
497 | |
---|
498 | self.version, self.type, self.requestId, self.contentLength, \ |
---|
499 | self.paddingLength = struct.unpack(FCGI_Header, header) |
---|
500 | |
---|
501 | if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' |
---|
502 | 'contentLength = %d' % |
---|
503 | (sock.fileno(), self.type, self.requestId, |
---|
504 | self.contentLength)) |
---|
505 | |
---|
506 | if self.contentLength: |
---|
507 | try: |
---|
508 | self.contentData, length = self._recvall(sock, |
---|
509 | self.contentLength) |
---|
510 | except: |
---|
511 | raise EOFError |
---|
512 | |
---|
513 | if length < self.contentLength: |
---|
514 | raise EOFError |
---|
515 | |
---|
516 | if self.paddingLength: |
---|
517 | try: |
---|
518 | self._recvall(sock, self.paddingLength) |
---|
519 | except: |
---|
520 | raise EOFError |
---|
521 | |
---|
522 | def _sendall(sock, data): |
---|
523 | """ |
---|
524 | Writes data to a socket and does not return until all the data is sent. |
---|
525 | """ |
---|
526 | length = len(data) |
---|
527 | while length: |
---|
528 | try: |
---|
529 | sent = sock.send(data) |
---|
530 | except socket.error as e: |
---|
531 | if e.errno == errno.EAGAIN: |
---|
532 | select.select([], [sock], []) |
---|
533 | continue |
---|
534 | else: |
---|
535 | raise |
---|
536 | data = data[sent:] |
---|
537 | length -= sent |
---|
538 | _sendall = staticmethod(_sendall) |
---|
539 | |
---|
540 | def write(self, sock): |
---|
541 | """Encode and write a Record to a socket.""" |
---|
542 | self.paddingLength = -self.contentLength & 7 |
---|
543 | |
---|
544 | if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' |
---|
545 | 'contentLength = %d' % |
---|
546 | (sock.fileno(), self.type, self.requestId, |
---|
547 | self.contentLength)) |
---|
548 | |
---|
549 | header = struct.pack(FCGI_Header, self.version, self.type, |
---|
550 | self.requestId, self.contentLength, |
---|
551 | self.paddingLength) |
---|
552 | self._sendall(sock, header) |
---|
553 | if self.contentLength: |
---|
554 | self._sendall(sock, self.contentData) |
---|
555 | if self.paddingLength: |
---|
556 | self._sendall(sock, '\x00'*self.paddingLength) |
---|
557 | |
---|
558 | class Request(object): |
---|
559 | """ |
---|
560 | Represents a single FastCGI request. |
---|
561 | |
---|
562 | These objects are passed to your handler and is the main interface |
---|
563 | between your handler and the fcgi module. The methods should not |
---|
564 | be called by your handler. However, server, params, stdin, stdout, |
---|
565 | stderr, and data are free for your handler's use. |
---|
566 | """ |
---|
567 | def __init__(self, conn, inputStreamClass): |
---|
568 | self._conn = conn |
---|
569 | |
---|
570 | self.server = conn.server |
---|
571 | self.params = {} |
---|
572 | self.stdin = inputStreamClass(conn) |
---|
573 | self.stdout = OutputStream(conn, self, FCGI_STDOUT) |
---|
574 | self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) |
---|
575 | self.data = inputStreamClass(conn) |
---|
576 | |
---|
577 | def run(self): |
---|
578 | """Runs the handler, flushes the streams, and ends the request.""" |
---|
579 | try: |
---|
580 | protocolStatus, appStatus = self.server.handler(self) |
---|
581 | except: |
---|
582 | traceback.print_exc(file=self.stderr) |
---|
583 | self.stderr.flush() |
---|
584 | if not self.stdout.dataWritten: |
---|
585 | self.server.error(self) |
---|
586 | |
---|
587 | protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 |
---|
588 | |
---|
589 | if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % |
---|
590 | (protocolStatus, appStatus)) |
---|
591 | |
---|
592 | self._flush() |
---|
593 | self._end(appStatus, protocolStatus) |
---|
594 | |
---|
595 | def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): |
---|
596 | self._conn.end_request(self, appStatus, protocolStatus) |
---|
597 | |
---|
598 | def _flush(self): |
---|
599 | self.stdout.close() |
---|
600 | self.stderr.close() |
---|
601 | |
---|
602 | class CGIRequest(Request): |
---|
603 | """A normal CGI request disguised as a FastCGI request.""" |
---|
604 | def __init__(self, server): |
---|
605 | # These are normally filled in by Connection. |
---|
606 | self.requestId = 1 |
---|
607 | self.role = FCGI_RESPONDER |
---|
608 | self.flags = 0 |
---|
609 | self.aborted = False |
---|
610 | |
---|
611 | self.server = server |
---|
612 | self.params = dict(os.environ) |
---|
613 | self.stdin = sys.stdin |
---|
614 | self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! |
---|
615 | self.stderr = sys.stderr |
---|
616 | self.data = StringIO.StringIO() |
---|
617 | |
---|
618 | def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): |
---|
619 | sys.exit(appStatus) |
---|
620 | |
---|
621 | def _flush(self): |
---|
622 | # Not buffered, do nothing. |
---|
623 | pass |
---|
624 | |
---|
625 | class Connection(object): |
---|
626 | """ |
---|
627 | A Connection with the web server. |
---|
628 | |
---|
629 | Each Connection is associated with a single socket (which is |
---|
630 | connected to the web server) and is responsible for handling all |
---|
631 | the FastCGI message processing for that socket. |
---|
632 | """ |
---|
633 | _multiplexed = False |
---|
634 | _inputStreamClass = InputStream |
---|
635 | |
---|
636 | def __init__(self, sock, addr, server): |
---|
637 | self._sock = sock |
---|
638 | self._addr = addr |
---|
639 | self.server = server |
---|
640 | |
---|
641 | # Active Requests for this Connection, mapped by request ID. |
---|
642 | self._requests = {} |
---|
643 | |
---|
644 | def _cleanupSocket(self): |
---|
645 | """Close the Connection's socket.""" |
---|
646 | try: |
---|
647 | self._sock.shutdown(socket.SHUT_WR) |
---|
648 | except: |
---|
649 | return |
---|
650 | try: |
---|
651 | while True: |
---|
652 | r, w, e = select.select([self._sock], [], []) |
---|
653 | if not r or not self._sock.recv(1024): |
---|
654 | break |
---|
655 | except: |
---|
656 | pass |
---|
657 | self._sock.close() |
---|
658 | |
---|
659 | def run(self): |
---|
660 | """Begin processing data from the socket.""" |
---|
661 | self._keepGoing = True |
---|
662 | while self._keepGoing: |
---|
663 | try: |
---|
664 | self.process_input() |
---|
665 | except EOFError: |
---|
666 | break |
---|
667 | except (select.error, socket.error), e: |
---|
668 | if e.errno == errno.EBADF: # Socket was closed by Request. |
---|
669 | break |
---|
670 | raise |
---|
671 | |
---|
672 | self._cleanupSocket() |
---|
673 | |
---|
674 | def process_input(self): |
---|
675 | """Attempt to read a single Record from the socket and process it.""" |
---|
676 | # Currently, any children Request threads notify this Connection |
---|
677 | # that it is no longer needed by closing the Connection's socket. |
---|
678 | # We need to put a timeout on select, otherwise we might get |
---|
679 | # stuck in it indefinitely... (I don't like this solution.) |
---|
680 | while self._keepGoing: |
---|
681 | try: |
---|
682 | r, w, e = select.select([self._sock], [], [], 1.0) |
---|
683 | except ValueError: |
---|
684 | # Sigh. ValueError gets thrown sometimes when passing select |
---|
685 | # a closed socket. |
---|
686 | raise EOFError |
---|
687 | if r: break |
---|
688 | if not self._keepGoing: |
---|
689 | return |
---|
690 | rec = Record() |
---|
691 | rec.read(self._sock) |
---|
692 | |
---|
693 | if rec.type == FCGI_GET_VALUES: |
---|
694 | self._do_get_values(rec) |
---|
695 | elif rec.type == FCGI_BEGIN_REQUEST: |
---|
696 | self._do_begin_request(rec) |
---|
697 | elif rec.type == FCGI_ABORT_REQUEST: |
---|
698 | self._do_abort_request(rec) |
---|
699 | elif rec.type == FCGI_PARAMS: |
---|
700 | self._do_params(rec) |
---|
701 | elif rec.type == FCGI_STDIN: |
---|
702 | self._do_stdin(rec) |
---|
703 | elif rec.type == FCGI_DATA: |
---|
704 | self._do_data(rec) |
---|
705 | elif rec.requestId == FCGI_NULL_REQUEST_ID: |
---|
706 | self._do_unknown_type(rec) |
---|
707 | else: |
---|
708 | # Need to complain about this. |
---|
709 | pass |
---|
710 | |
---|
711 | def writeRecord(self, rec): |
---|
712 | """ |
---|
713 | Write a Record to the socket. |
---|
714 | """ |
---|
715 | rec.write(self._sock) |
---|
716 | |
---|
717 | def end_request(self, req, appStatus=0L, |
---|
718 | protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): |
---|
719 | """ |
---|
720 | End a Request. |
---|
721 | |
---|
722 | Called by Request objects. An FCGI_END_REQUEST Record is |
---|
723 | sent to the web server. If the web server no longer requires |
---|
724 | the connection, the socket is closed, thereby ending this |
---|
725 | Connection (run() returns). |
---|
726 | """ |
---|
727 | rec = Record(FCGI_END_REQUEST, req.requestId) |
---|
728 | rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, |
---|
729 | protocolStatus) |
---|
730 | rec.contentLength = FCGI_EndRequestBody_LEN |
---|
731 | self.writeRecord(rec) |
---|
732 | |
---|
733 | if remove: |
---|
734 | del self._requests[req.requestId] |
---|
735 | |
---|
736 | if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) |
---|
737 | |
---|
738 | if not (req.flags & FCGI_KEEP_CONN) and not self._requests: |
---|
739 | self._cleanupSocket() |
---|
740 | self._keepGoing = False |
---|
741 | |
---|
742 | def _do_get_values(self, inrec): |
---|
743 | """Handle an FCGI_GET_VALUES request from the web server.""" |
---|
744 | outrec = Record(FCGI_GET_VALUES_RESULT) |
---|
745 | |
---|
746 | pos = 0 |
---|
747 | while pos < inrec.contentLength: |
---|
748 | pos, (name, value) = decode_pair(inrec.contentData, pos) |
---|
749 | cap = self.server.capability.get(name) |
---|
750 | if cap is not None: |
---|
751 | outrec.contentData += encode_pair(name, str(cap)) |
---|
752 | |
---|
753 | outrec.contentLength = len(outrec.contentData) |
---|
754 | self.writeRecord(outrec) |
---|
755 | |
---|
756 | def _do_begin_request(self, inrec): |
---|
757 | """Handle an FCGI_BEGIN_REQUEST from the web server.""" |
---|
758 | role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) |
---|
759 | |
---|
760 | req = self.server.request_class(self, self._inputStreamClass) |
---|
761 | req.requestId, req.role, req.flags = inrec.requestId, role, flags |
---|
762 | req.aborted = False |
---|
763 | |
---|
764 | if not self._multiplexed and self._requests: |
---|
765 | # Can't multiplex requests. |
---|
766 | self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) |
---|
767 | else: |
---|
768 | self._requests[inrec.requestId] = req |
---|
769 | |
---|
770 | def _do_abort_request(self, inrec): |
---|
771 | """ |
---|
772 | Handle an FCGI_ABORT_REQUEST from the web server. |
---|
773 | |
---|
774 | We just mark a flag in the associated Request. |
---|
775 | """ |
---|
776 | req = self._requests.get(inrec.requestId) |
---|
777 | if req is not None: |
---|
778 | req.aborted = True |
---|
779 | |
---|
780 | def _start_request(self, req): |
---|
781 | """Run the request.""" |
---|
782 | # Not multiplexed, so run it inline. |
---|
783 | req.run() |
---|
784 | |
---|
785 | def _do_params(self, inrec): |
---|
786 | """ |
---|
787 | Handle an FCGI_PARAMS Record. |
---|
788 | |
---|
789 | If the last FCGI_PARAMS Record is received, start the request. |
---|
790 | """ |
---|
791 | req = self._requests.get(inrec.requestId) |
---|
792 | if req is not None: |
---|
793 | if inrec.contentLength: |
---|
794 | pos = 0 |
---|
795 | while pos < inrec.contentLength: |
---|
796 | pos, (name, value) = decode_pair(inrec.contentData, pos) |
---|
797 | req.params[name] = value |
---|
798 | else: |
---|
799 | self._start_request(req) |
---|
800 | |
---|
801 | def _do_stdin(self, inrec): |
---|
802 | """Handle the FCGI_STDIN stream.""" |
---|
803 | req = self._requests.get(inrec.requestId) |
---|
804 | if req is not None: |
---|
805 | req.stdin.add_data(inrec.contentData) |
---|
806 | |
---|
807 | def _do_data(self, inrec): |
---|
808 | """Handle the FCGI_DATA stream.""" |
---|
809 | req = self._requests.get(inrec.requestId) |
---|
810 | if req is not None: |
---|
811 | req.data.add_data(inrec.contentData) |
---|
812 | |
---|
813 | def _do_unknown_type(self, inrec): |
---|
814 | """Handle an unknown request type. Respond accordingly.""" |
---|
815 | outrec = Record(FCGI_UNKNOWN_TYPE) |
---|
816 | outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) |
---|
817 | outrec.contentLength = FCGI_UnknownTypeBody_LEN |
---|
818 | self.writeRecord(outrec) |
---|
819 | |
---|
820 | class MultiplexedConnection(Connection): |
---|
821 | """ |
---|
822 | A version of Connection capable of handling multiple requests |
---|
823 | simultaneously. |
---|
824 | """ |
---|
825 | _multiplexed = True |
---|
826 | _inputStreamClass = MultiplexedInputStream |
---|
827 | |
---|
828 | def __init__(self, sock, addr, server): |
---|
829 | super(MultiplexedConnection, self).__init__(sock, addr, server) |
---|
830 | |
---|
831 | # Used to arbitrate access to self._requests. |
---|
832 | lock = threading.RLock() |
---|
833 | |
---|
834 | # Notification is posted everytime a request completes, allowing us |
---|
835 | # to quit cleanly. |
---|
836 | self._lock = threading.Condition(lock) |
---|
837 | |
---|
838 | def _cleanupSocket(self): |
---|
839 | # Wait for any outstanding requests before closing the socket. |
---|
840 | self._lock.acquire() |
---|
841 | while self._requests: |
---|
842 | self._lock.wait() |
---|
843 | self._lock.release() |
---|
844 | |
---|
845 | super(MultiplexedConnection, self)._cleanupSocket() |
---|
846 | |
---|
847 | def writeRecord(self, rec): |
---|
848 | # Must use locking to prevent intermingling of Records from different |
---|
849 | # threads. |
---|
850 | self._lock.acquire() |
---|
851 | try: |
---|
852 | # Probably faster than calling super. ;) |
---|
853 | rec.write(self._sock) |
---|
854 | finally: |
---|
855 | self._lock.release() |
---|
856 | |
---|
857 | def end_request(self, req, appStatus=0L, |
---|
858 | protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): |
---|
859 | self._lock.acquire() |
---|
860 | try: |
---|
861 | super(MultiplexedConnection, self).end_request(req, appStatus, |
---|
862 | protocolStatus, |
---|
863 | remove) |
---|
864 | self._lock.notify() |
---|
865 | finally: |
---|
866 | self._lock.release() |
---|
867 | |
---|
868 | def _do_begin_request(self, inrec): |
---|
869 | self._lock.acquire() |
---|
870 | try: |
---|
871 | super(MultiplexedConnection, self)._do_begin_request(inrec) |
---|
872 | finally: |
---|
873 | self._lock.release() |
---|
874 | |
---|
875 | def _do_abort_request(self, inrec): |
---|
876 | self._lock.acquire() |
---|
877 | try: |
---|
878 | super(MultiplexedConnection, self)._do_abort_request(inrec) |
---|
879 | finally: |
---|
880 | self._lock.release() |
---|
881 | |
---|
882 | def _start_request(self, req): |
---|
883 | thread.start_new_thread(req.run, ()) |
---|
884 | |
---|
885 | def _do_params(self, inrec): |
---|
886 | self._lock.acquire() |
---|
887 | try: |
---|
888 | super(MultiplexedConnection, self)._do_params(inrec) |
---|
889 | finally: |
---|
890 | self._lock.release() |
---|
891 | |
---|
892 | def _do_stdin(self, inrec): |
---|
893 | self._lock.acquire() |
---|
894 | try: |
---|
895 | super(MultiplexedConnection, self)._do_stdin(inrec) |
---|
896 | finally: |
---|
897 | self._lock.release() |
---|
898 | |
---|
899 | def _do_data(self, inrec): |
---|
900 | self._lock.acquire() |
---|
901 | try: |
---|
902 | super(MultiplexedConnection, self)._do_data(inrec) |
---|
903 | finally: |
---|
904 | self._lock.release() |
---|
905 | |
---|
906 | class Server(object): |
---|
907 | """ |
---|
908 | The FastCGI server. |
---|
909 | |
---|
910 | Waits for connections from the web server, processing each |
---|
911 | request. |
---|
912 | |
---|
913 | If run in a normal CGI context, it will instead instantiate a |
---|
914 | CGIRequest and run the handler through there. |
---|
915 | """ |
---|
916 | request_class = Request |
---|
917 | cgirequest_class = CGIRequest |
---|
918 | |
---|
919 | # Limits the size of the InputStream's string buffer to this size + the |
---|
920 | # server's maximum Record size. Since the InputStream is not seekable, |
---|
921 | # we throw away already-read data once this certain amount has been read. |
---|
922 | inputStreamShrinkThreshold = 102400 - 8192 |
---|
923 | |
---|
924 | def __init__(self, handler=None, maxwrite=8192, bindAddress=None, |
---|
925 | umask=None, multiplexed=False): |
---|
926 | """ |
---|
927 | handler, if present, must reference a function or method that |
---|
928 | takes one argument: a Request object. If handler is not |
---|
929 | specified at creation time, Server *must* be subclassed. |
---|
930 | (The handler method below is abstract.) |
---|
931 | |
---|
932 | maxwrite is the maximum number of bytes (per Record) to write |
---|
933 | to the server. I've noticed mod_fastcgi has a relatively small |
---|
934 | receive buffer (8K or so). |
---|
935 | |
---|
936 | bindAddress, if present, must either be a string or a 2-tuple. If |
---|
937 | present, run() will open its own listening socket. You would use |
---|
938 | this if you wanted to run your application as an 'external' FastCGI |
---|
939 | app. (i.e. the webserver would no longer be responsible for starting |
---|
940 | your app) If a string, it will be interpreted as a filename and a UNIX |
---|
941 | socket will be opened. If a tuple, the first element, a string, |
---|
942 | is the interface name/IP to bind to, and the second element (an int) |
---|
943 | is the port number. |
---|
944 | |
---|
945 | Set multiplexed to True if you want to handle multiple requests |
---|
946 | per connection. Some FastCGI backends (namely mod_fastcgi) don't |
---|
947 | multiplex requests at all, so by default this is off (which saves |
---|
948 | on thread creation/locking overhead). If threads aren't available, |
---|
949 | this keyword is ignored; it's not possible to multiplex requests |
---|
950 | at all. |
---|
951 | """ |
---|
952 | if handler is not None: |
---|
953 | self.handler = handler |
---|
954 | self.maxwrite = maxwrite |
---|
955 | if thread_available: |
---|
956 | try: |
---|
957 | import resource |
---|
958 | # Attempt to glean the maximum number of connections |
---|
959 | # from the OS. |
---|
960 | maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] |
---|
961 | except ImportError: |
---|
962 | maxConns = 100 # Just some made up number. |
---|
963 | maxReqs = maxConns |
---|
964 | if multiplexed: |
---|
965 | self._connectionClass = MultiplexedConnection |
---|
966 | maxReqs *= 5 # Another made up number. |
---|
967 | else: |
---|
968 | self._connectionClass = Connection |
---|
969 | self.capability = { |
---|
970 | FCGI_MAX_CONNS: maxConns, |
---|
971 | FCGI_MAX_REQS: maxReqs, |
---|
972 | FCGI_MPXS_CONNS: multiplexed and 1 or 0 |
---|
973 | } |
---|
974 | else: |
---|
975 | self._connectionClass = Connection |
---|
976 | self.capability = { |
---|
977 | # If threads aren't available, these are pretty much correct. |
---|
978 | FCGI_MAX_CONNS: 1, |
---|
979 | FCGI_MAX_REQS: 1, |
---|
980 | FCGI_MPXS_CONNS: 0 |
---|
981 | } |
---|
982 | self._bindAddress = bindAddress |
---|
983 | self._umask = umask |
---|
984 | |
---|
985 | def _setupSocket(self): |
---|
986 | if self._bindAddress is None: # Run as a normal FastCGI? |
---|
987 | isFCGI = True |
---|
988 | |
---|
989 | sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, |
---|
990 | socket.SOCK_STREAM) |
---|
991 | try: |
---|
992 | sock.getpeername() |
---|
993 | except socket.error as e: |
---|
994 | if e.errno == errno.ENOTSOCK: |
---|
995 | # Not a socket, assume CGI context. |
---|
996 | isFCGI = False |
---|
997 | elif e.errno != errno.ENOTCONN: |
---|
998 | raise |
---|
999 | |
---|
1000 | # FastCGI/CGI discrimination is broken on Mac OS X. |
---|
1001 | # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" |
---|
1002 | # if you want to run your app as a simple CGI. (You can do |
---|
1003 | # this with Apache's mod_env [not loaded by default in OS X |
---|
1004 | # client, ha ha] and the SetEnv directive.) |
---|
1005 | if not isFCGI or \ |
---|
1006 | os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'): |
---|
1007 | req = self.cgirequest_class(self) |
---|
1008 | req.run() |
---|
1009 | sys.exit(0) |
---|
1010 | else: |
---|
1011 | # Run as a server |
---|
1012 | oldUmask = None |
---|
1013 | if type(self._bindAddress) is str: |
---|
1014 | # Unix socket |
---|
1015 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
---|
1016 | try: |
---|
1017 | os.unlink(self._bindAddress) |
---|
1018 | except OSError: |
---|
1019 | pass |
---|
1020 | if self._umask is not None: |
---|
1021 | oldUmask = os.umask(self._umask) |
---|
1022 | else: |
---|
1023 | # INET socket |
---|
1024 | assert type(self._bindAddress) is tuple |
---|
1025 | assert len(self._bindAddress) == 2 |
---|
1026 | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
---|
1027 | sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
---|
1028 | |
---|
1029 | sock.bind(self._bindAddress) |
---|
1030 | sock.listen(socket.SOMAXCONN) |
---|
1031 | |
---|
1032 | if oldUmask is not None: |
---|
1033 | os.umask(oldUmask) |
---|
1034 | |
---|
1035 | return sock |
---|
1036 | |
---|
1037 | def _cleanupSocket(self, sock): |
---|
1038 | """Closes the main socket.""" |
---|
1039 | sock.close() |
---|
1040 | |
---|
1041 | def _installSignalHandlers(self): |
---|
1042 | self._oldSIGs = [(x,signal.getsignal(x)) for x in |
---|
1043 | (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] |
---|
1044 | signal.signal(signal.SIGHUP, self._hupHandler) |
---|
1045 | signal.signal(signal.SIGINT, self._intHandler) |
---|
1046 | signal.signal(signal.SIGTERM, self._intHandler) |
---|
1047 | |
---|
1048 | def _restoreSignalHandlers(self): |
---|
1049 | for signum,handler in self._oldSIGs: |
---|
1050 | signal.signal(signum, handler) |
---|
1051 | |
---|
1052 | def _hupHandler(self, signum, frame): |
---|
1053 | self._hupReceived = True |
---|
1054 | self._keepGoing = False |
---|
1055 | |
---|
1056 | def _intHandler(self, signum, frame): |
---|
1057 | self._keepGoing = False |
---|
1058 | |
---|
1059 | def run(self, timeout=1.0): |
---|
1060 | """ |
---|
1061 | The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if |
---|
1062 | SIGHUP was received, False otherwise. |
---|
1063 | """ |
---|
1064 | web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS') |
---|
1065 | if web_server_addrs is not None: |
---|
1066 | web_server_addrs = map(lambda x: x.strip(), |
---|
1067 | web_server_addrs.split(',')) |
---|
1068 | |
---|
1069 | sock = self._setupSocket() |
---|
1070 | |
---|
1071 | self._keepGoing = True |
---|
1072 | self._hupReceived = False |
---|
1073 | |
---|
1074 | # Install signal handlers. |
---|
1075 | self._installSignalHandlers() |
---|
1076 | |
---|
1077 | while self._keepGoing: |
---|
1078 | try: |
---|
1079 | r, w, e = select.select([sock], [], [], timeout) |
---|
1080 | except select.error, e: |
---|
1081 | if e.errno == errno.EINTR: |
---|
1082 | continue |
---|
1083 | raise |
---|
1084 | |
---|
1085 | if r: |
---|
1086 | try: |
---|
1087 | clientSock, addr = sock.accept() |
---|
1088 | except socket.error as e: |
---|
1089 | if e.errno in (errno.EINTR, errno.EAGAIN): |
---|
1090 | continue |
---|
1091 | raise |
---|
1092 | |
---|
1093 | if web_server_addrs and \ |
---|
1094 | (len(addr) != 2 or addr[0] not in web_server_addrs): |
---|
1095 | clientSock.close() |
---|
1096 | continue |
---|
1097 | |
---|
1098 | # Instantiate a new Connection and begin processing FastCGI |
---|
1099 | # messages (either in a new thread or this thread). |
---|
1100 | conn = self._connectionClass(clientSock, addr, self) |
---|
1101 | thread.start_new_thread(conn.run, ()) |
---|
1102 | |
---|
1103 | self._mainloopPeriodic() |
---|
1104 | |
---|
1105 | # Restore signal handlers. |
---|
1106 | self._restoreSignalHandlers() |
---|
1107 | |
---|
1108 | self._cleanupSocket(sock) |
---|
1109 | |
---|
1110 | return self._hupReceived |
---|
1111 | |
---|
1112 | def _mainloopPeriodic(self): |
---|
1113 | """ |
---|
1114 | Called with just about each iteration of the main loop. Meant to |
---|
1115 | be overridden. |
---|
1116 | """ |
---|
1117 | pass |
---|
1118 | |
---|
1119 | def _exit(self, reload=False): |
---|
1120 | """ |
---|
1121 | Protected convenience method for subclasses to force an exit. Not |
---|
1122 | really thread-safe, which is why it isn't public. |
---|
1123 | """ |
---|
1124 | if self._keepGoing: |
---|
1125 | self._keepGoing = False |
---|
1126 | self._hupReceived = reload |
---|
1127 | |
---|
1128 | def handler(self, req): |
---|
1129 | """ |
---|
1130 | Default handler, which just raises an exception. Unless a handler |
---|
1131 | is passed at initialization time, this must be implemented by |
---|
1132 | a subclass. |
---|
1133 | """ |
---|
1134 | raise NotImplementedError, self.__class__.__name__ + '.handler' |
---|
1135 | |
---|
1136 | def error(self, req): |
---|
1137 | """ |
---|
1138 | Called by Request if an exception occurs within the handler. May and |
---|
1139 | should be overridden. |
---|
1140 | """ |
---|
1141 | import cgitb |
---|
1142 | req.stdout.write('Content-Type: text/html\r\n\r\n' + |
---|
1143 | cgitb.html(sys.exc_info())) |
---|
1144 | |
---|
1145 | class WSGIServer(Server): |
---|
1146 | """ |
---|
1147 | FastCGI server that supports the Web Server Gateway Interface. See |
---|
1148 | <http://www.python.org/peps/pep-0333.html>. |
---|
1149 | """ |
---|
1150 | def __init__(self, application, environ=None, |
---|
1151 | multithreaded=True, **kw): |
---|
1152 | """ |
---|
1153 | environ, if present, must be a dictionary-like object. Its |
---|
1154 | contents will be copied into application's environ. Useful |
---|
1155 | for passing application-specific variables. |
---|
1156 | |
---|
1157 | Set multithreaded to False if your application is not MT-safe. |
---|
1158 | """ |
---|
1159 | if kw.has_key('handler'): |
---|
1160 | del kw['handler'] # Doesn't make sense to let this through |
---|
1161 | super(WSGIServer, self).__init__(**kw) |
---|
1162 | |
---|
1163 | if environ is None: |
---|
1164 | environ = {} |
---|
1165 | |
---|
1166 | self.application = application |
---|
1167 | self.environ = environ |
---|
1168 | self.multithreaded = multithreaded |
---|
1169 | |
---|
1170 | # Used to force single-threadedness |
---|
1171 | self._app_lock = thread.allocate_lock() |
---|
1172 | |
---|
1173 | def handler(self, req): |
---|
1174 | """Special handler for WSGI.""" |
---|
1175 | if req.role != FCGI_RESPONDER: |
---|
1176 | return FCGI_UNKNOWN_ROLE, 0 |
---|
1177 | |
---|
1178 | # Mostly taken from example CGI gateway. |
---|
1179 | environ = req.params |
---|
1180 | environ.update(self.environ) |
---|
1181 | |
---|
1182 | environ['wsgi.version'] = (1,0) |
---|
1183 | environ['wsgi.input'] = req.stdin |
---|
1184 | if self._bindAddress is None: |
---|
1185 | stderr = req.stderr |
---|
1186 | else: |
---|
1187 | stderr = TeeOutputStream((sys.stderr, req.stderr)) |
---|
1188 | environ['wsgi.errors'] = stderr |
---|
1189 | environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \ |
---|
1190 | thread_available and self.multithreaded |
---|
1191 | # Rationale for the following: If started by the web server |
---|
1192 | # (self._bindAddress is None) in either FastCGI or CGI mode, the |
---|
1193 | # possibility of being spawned multiple times simultaneously is quite |
---|
1194 | # real. And, if started as an external server, multiple copies may be |
---|
1195 | # spawned for load-balancing/redundancy. (Though I don't think |
---|
1196 | # mod_fastcgi supports this?) |
---|
1197 | environ['wsgi.multiprocess'] = True |
---|
1198 | environ['wsgi.run_once'] = isinstance(req, CGIRequest) |
---|
1199 | |
---|
1200 | if environ.get('HTTPS', 'off') in ('on', '1'): |
---|
1201 | environ['wsgi.url_scheme'] = 'https' |
---|
1202 | else: |
---|
1203 | environ['wsgi.url_scheme'] = 'http' |
---|
1204 | |
---|
1205 | self._sanitizeEnv(environ) |
---|
1206 | |
---|
1207 | headers_set = [] |
---|
1208 | headers_sent = [] |
---|
1209 | result = None |
---|
1210 | |
---|
1211 | def write(data): |
---|
1212 | assert type(data) is str, 'write() argument must be string' |
---|
1213 | assert headers_set, 'write() before start_response()' |
---|
1214 | |
---|
1215 | if not headers_sent: |
---|
1216 | status, responseHeaders = headers_sent[:] = headers_set |
---|
1217 | found = False |
---|
1218 | for header,value in responseHeaders: |
---|
1219 | if header.lower() == 'content-length': |
---|
1220 | found = True |
---|
1221 | break |
---|
1222 | if not found and result is not None: |
---|
1223 | try: |
---|
1224 | if len(result) == 1: |
---|
1225 | responseHeaders.append(('Content-Length', |
---|
1226 | str(len(data)))) |
---|
1227 | except: |
---|
1228 | pass |
---|
1229 | s = 'Status: %s\r\n' % status |
---|
1230 | for header in responseHeaders: |
---|
1231 | s += '%s: %s\r\n' % header |
---|
1232 | s += '\r\n' |
---|
1233 | req.stdout.write(s) |
---|
1234 | |
---|
1235 | req.stdout.write(data) |
---|
1236 | req.stdout.flush() |
---|
1237 | |
---|
1238 | def start_response(status, response_headers, exc_info=None): |
---|
1239 | if exc_info: |
---|
1240 | try: |
---|
1241 | if headers_sent: |
---|
1242 | # Re-raise if too late |
---|
1243 | raise exc_info[0], exc_info[1], exc_info[2] |
---|
1244 | finally: |
---|
1245 | exc_info = None # avoid dangling circular ref |
---|
1246 | else: |
---|
1247 | assert not headers_set, 'Headers already set!' |
---|
1248 | |
---|
1249 | assert type(status) is str, 'Status must be a string' |
---|
1250 | assert len(status) >= 4, 'Status must be at least 4 characters' |
---|
1251 | assert int(status[:3]), 'Status must begin with 3-digit code' |
---|
1252 | assert status[3] == ' ', 'Status must have a space after code' |
---|
1253 | assert type(response_headers) is list, 'Headers must be a list' |
---|
1254 | if __debug__: |
---|
1255 | for name,val in response_headers: |
---|
1256 | assert type(name) is str, 'Header names must be strings' |
---|
1257 | assert type(val) is str, 'Header values must be strings' |
---|
1258 | |
---|
1259 | headers_set[:] = [status, response_headers] |
---|
1260 | return write |
---|
1261 | |
---|
1262 | if not self.multithreaded: |
---|
1263 | self._app_lock.acquire() |
---|
1264 | try: |
---|
1265 | try: |
---|
1266 | result = self.application(environ, start_response) |
---|
1267 | try: |
---|
1268 | for data in result: |
---|
1269 | if data: |
---|
1270 | write(data) |
---|
1271 | if not headers_sent: |
---|
1272 | write('') # in case body was empty |
---|
1273 | finally: |
---|
1274 | if hasattr(result, 'close'): |
---|
1275 | result.close() |
---|
1276 | except socket.error as e: |
---|
1277 | if e.errno != errno.EPIPE: |
---|
1278 | raise # Don't let EPIPE propagate beyond server |
---|
1279 | finally: |
---|
1280 | if not self.multithreaded: |
---|
1281 | self._app_lock.release() |
---|
1282 | |
---|
1283 | return FCGI_REQUEST_COMPLETE, 0 |
---|
1284 | |
---|
1285 | def _sanitizeEnv(self, environ): |
---|
1286 | """Ensure certain values are present, if required by WSGI.""" |
---|
1287 | if not environ.has_key('SCRIPT_NAME'): |
---|
1288 | environ['SCRIPT_NAME'] = '' |
---|
1289 | if not environ.has_key('PATH_INFO'): |
---|
1290 | environ['PATH_INFO'] = '' |
---|
1291 | |
---|
1292 | # If any of these are missing, it probably signifies a broken |
---|
1293 | # server... |
---|
1294 | for name,default in [('REQUEST_METHOD', 'GET'), |
---|
1295 | ('SERVER_NAME', 'localhost'), |
---|
1296 | ('SERVER_PORT', '80'), |
---|
1297 | ('SERVER_PROTOCOL', 'HTTP/1.0')]: |
---|
1298 | if not environ.has_key(name): |
---|
1299 | environ['wsgi.errors'].write('%s: missing FastCGI param %s ' |
---|
1300 | 'required by WSGI!\n' % |
---|
1301 | (self.__class__.__name__, name)) |
---|
1302 | environ[name] = default |
---|
1303 | |
---|
1304 | if __name__ == '__main__': |
---|
1305 | def test_app(environ, start_response): |
---|
1306 | """Probably not the most efficient example.""" |
---|
1307 | import cgi |
---|
1308 | start_response('200 OK', [('Content-Type', 'text/html')]) |
---|
1309 | yield '<html><head><title>Hello World!</title></head>\n' \ |
---|
1310 | '<body>\n' \ |
---|
1311 | '<p>Hello World!</p>\n' \ |
---|
1312 | '<table border="1">' |
---|
1313 | names = environ.keys() |
---|
1314 | names.sort() |
---|
1315 | for name in names: |
---|
1316 | yield '<tr><td>%s</td><td>%s</td></tr>\n' % ( |
---|
1317 | name, cgi.escape(`environ[name]`)) |
---|
1318 | |
---|
1319 | form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ, |
---|
1320 | keep_blank_values=1) |
---|
1321 | if form.list: |
---|
1322 | yield '<tr><th colspan="2">Form data</th></tr>' |
---|
1323 | |
---|
1324 | for field in form.list: |
---|
1325 | yield '<tr><td>%s</td><td>%s</td></tr>\n' % ( |
---|
1326 | field.name, field.value) |
---|
1327 | |
---|
1328 | yield '</table>\n' \ |
---|
1329 | '</body></html>\n' |
---|
1330 | |
---|
1331 | WSGIServer(test_app).run() |
---|