[PATCH 15 of 15] Threaded FastCGI module
vincent at ricardis.tudelft.nl
vincent at ricardis.tudelft.nl
Wed Aug 24 16:49:23 CDT 2005
Patch subject is complete summary.
# HG changeset patch
# User Vincent Wagelaar <vincent at ricardis.tudelft.nl>
# Node ID b48e6d0b232a38560668b1c8be43cd5ab4b44ee5
# Parent f859e9cba1b92000abe2c7628e0957918be5c32d
Threaded FastCGI module
diff -r f859e9cba1b9 -r b48e6d0b232a mercurial/thfcgi.py
--- /dev/null Mon Aug 22 08:22:29 2005
+++ b/mercurial/thfcgi.py Tue Aug 23 22:32:19 2005
@@ -0,0 +1,560 @@
+# -*- coding: iso-8859-1 -*-
+"""
+
+ thfcgi.py - FastCGI communication with thread support
+
+ Copyright Peter Åstrand <astrand at lysator.liu.se> 2001
+
+ Modified for MoinMoin by Oliver Graf <ograf at bitart.de> 2003
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+ $Id: thfcgi.py,v 1.1 2004/01/30 21:29:58 thomaswaldmann Exp $
+"""
+
+# TODO:
+#
+# Compare the number of bytes received on FCGI_STDIN with
+# CONTENT_LENGTH and abort the update if the two numbers are not equal.
+#
+
+# Imports
+import os
+import sys
+import select
+import string
+import socket
+import errno
+import cgi
+import thread
+from cStringIO import StringIO
+import struct
+
+# Maximum number of requests that can be handled
+# (these limits are reported to the server in reply to FCGI_GET_VALUES)
+FCGI_MAX_REQS = 50
+FCGI_MAX_CONNS = 50
+# Protocol version stored in every Record created by this module
+FCGI_VERSION_1 = 1
+# Can this application multiplex connections?
+FCGI_MPXS_CONNS = 0
+
+# Record types (the rec_type field of a record header)
+FCGI_BEGIN_REQUEST = 1
+FCGI_ABORT_REQUEST = 2
+FCGI_END_REQUEST = 3
+FCGI_PARAMS = 4
+FCGI_STDIN = 5
+FCGI_STDOUT = 6
+FCGI_STDERR = 7
+FCGI_DATA = 8
+FCGI_GET_VALUES = 9
+FCGI_GET_VALUES_RESULT = 10
+FCGI_UNKNOWN_TYPE = 11
+FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
+
+# Types of management records that Request knows how to answer; any
+# other management record type is answered with FCGI_UNKNOWN_TYPE.
+KNOWN_MANAGEMENT_TYPES = [FCGI_GET_VALUES]
+
+# A record carrying this request id is treated as a management record
+FCGI_NULL_REQUEST_ID = 0
+
+# Masks for flags component of FCGI_BEGIN_REQUEST
+FCGI_KEEP_CONN = 1
+
+# Values for role component of FCGI_BEGIN_REQUEST
+# (only FCGI_RESPONDER is accepted by Request._handle_begin_request)
+FCGI_RESPONDER = 1
+FCGI_AUTHORIZER = 2
+FCGI_FILTER = 3
+
+# Values for protocolStatus component of FCGI_END_REQUEST
+FCGI_REQUEST_COMPLETE = 0 # Request completed ok
+FCGI_CANT_MPX_CONN = 1 # This app cannot multiplex
+FCGI_OVERLOADED = 2 # Too busy
+FCGI_UNKNOWN_ROLE = 3 # Role value not known
+
+# Struct format types (all in network byte order, "x" is a pad byte)
+# BeginRequestBody: role (2 bytes), flags (1 byte), 5 pad bytes
+FCGI_BeginRequestBody = "!HB5x"
+# Record header: version, type, request id, content length,
+# padding length, 1 pad byte (8 bytes in total)
+FCGI_Record_header = "!BBHHBx"
+# UnknownTypeBody: the unrecognized record type, 7 pad bytes
+FCGI_UnknownTypeBody = "!B7x"
+# EndRequestBody: appStatus (4 bytes), protocolStatus (1 byte), 3 pad bytes
+FCGI_EndRequestBody = "!IB3x"
+
+class Record:
+ """Class representing FastCGI records"""
+
+ def __init__(self):
+ """Initialize FastCGI record"""
+ self.version = FCGI_VERSION_1
+ self.rec_type = FCGI_UNKNOWN_TYPE
+ self.req_id = FCGI_NULL_REQUEST_ID
+ self.content = ""
+
+ # Only in FCGI_BEGIN_REQUEST
+ self.role = None
+ self.flags = None
+ self.keep_conn = 0
+
+ # Only in FCGI_UNKNOWN_TYPE
+ self.unknownType = None
+
+ # Only in FCGI_END_REQUEST
+ self.appStatus = None
+ self.protocolStatus = None
+
+ def read_pair(self, data, pos):
+ """Read a FastCGI key-value pair from the server."""
+ namelen = struct.unpack("!B", data[pos])[0]
+ if namelen & 128:
+ # 4-byte name length
+ namelen = struct.unpack("!I", data[pos:pos+4])[0]
+ pos += 4
+ else:
+ pos += 1
+
+ valuelen = struct.unpack("!B", data[pos])[0]
+ if valuelen & 128:
+ # 4-byte value length
+ valuelen = struct.unpack("!I", data[pos:pos+4])[0]
+ pos += 4
+ else:
+ pos += 1
+
+ name = data[pos:pos+namelen]
+ pos += namelen
+ value = data[pos:pos+valuelen]
+ pos += valuelen
+
+ return (name, value, pos)
+
+ def write_pair(self, name, value):
+ """Write a FastCGI key-value pair to the server."""
+ namelen = len(name)
+ if namelen < 128:
+ data = struct.pack("!B", namelen)
+ else:
+ # 4-byte name length
+ data = struct.pack("!I", namelen)
+
+ valuelen = len(value)
+ if valuelen < 128:
+ data += struct.pack("!B", value)
+ else:
+ # 4-byte value length
+ data += struct.pack("!I", value)
+
+ return data + name + value
+
+ def readRecord(self, sock):
+ """Read a FastCGI record from the server."""
+ data = sock.recv(8)
+ if not data:
+ # No data recieved. This means EOF.
+ return None
+
+ fields = struct.unpack(FCGI_Record_header, data)
+ (self.version, self.rec_type, self.req_id,
+ contentLength, paddingLength) = fields
+
+ self.content = ""
+ while len(self.content) < contentLength:
+ data = sock.recv(contentLength - len(self.content))
+ self.content = self.content + data
+ if paddingLength != 0:
+ sock.recv(paddingLength)
+
+ # Parse the content information
+ if self.rec_type == FCGI_BEGIN_REQUEST:
+ (self.role, self.flags) = struct.unpack(FCGI_BeginRequestBody,
+ self.content)
+ self.keep_conn = self.flags & FCGI_KEEP_CONN
+
+ elif self.rec_type == FCGI_UNKNOWN_TYPE:
+ self.unknownType = struct.unpack(FCGI_UnknownTypeBody, self.content)
+
+ elif self.rec_type == FCGI_GET_VALUES or self.rec_type == FCGI_PARAMS:
+ self.values = {}
+ pos = 0
+ while pos < len(self.content):
+ name, value, pos = self.read_pair(self.content, pos)
+ self.values[name] = value
+ elif self.rec_type == FCGI_END_REQUEST:
+ (self.appStatus,
+ self.protocolStatus) = struct.unpack(FCGI_EndRequestBody,
+ self.content)
+
+ return 1
+
+ def writeRecord(self, sock):
+ """Write a FastCGI record to the server."""
+ content = self.content
+ if self.rec_type == FCGI_BEGIN_REQUEST:
+ content = struct.pack(FCGI_BeginRequestBody, self.role, self.flags)
+
+ elif self.rec_type == FCGI_UNKNOWN_TYPE:
+ content = struct.pack(FCGI_UnknownTypeBody, self.unknownType)
+
+ elif self.rec_type == FCGI_GET_VALUES or self.rec_type == FCGI_PARAMS:
+ content = ""
+ for i in self.values.keys():
+ content = content + self.write_pair(i, self.values[i])
+
+ elif self.rec_type == FCGI_END_REQUEST:
+ content = struct.pack(FCGI_EndRequestBody, self.appStatus,
+ self.protocolStatus)
+
+ # Align to 8-byte boundary
+ clen = len(content)
+ padlen = ((clen + 7) & 0xfff8) - clen
+
+ hdr = struct.pack(FCGI_Record_header, self.version, self.rec_type,
+ self.req_id, clen, padlen)
+
+ try:
+ sock.send(hdr + content + padlen*"\x00")
+ except socket.error:
+ # Write error, probably broken pipe. Exit thread.
+ thread.exit()
+
+
+class Request:
+ """A request, corresponding to an accept():ed connection and
+ a FCGI request."""
+
+ def __init__(self, conn, req_handler, multi=1):
+ """Initialize Request container."""
+ self.conn = conn
+ self.req_handler = req_handler
+ self.multi = multi
+
+ self.keep_conn = 0
+ self.req_id = None
+
+ # Input
+ self.env = {}
+ self.env_complete = 0
+ self.stdin = StringIO()
+ self.stdin_complete = 0
+ self.data = StringIO()
+ self.data_complete = 0
+
+ # Output
+ self.out = StringIO()
+ self.err = StringIO()
+
+ self.have_finished = 0
+
+ def run(self):
+ """Read records for this request and handle them through the
+ request handler."""
+ while 1:
+ if self.conn.fileno() < 1:
+ # Connection lost
+ return
+
+ select.select([self.conn], [], [])
+ rec = Record()
+ if rec.readRecord(self.conn):
+ self._handle_record(rec)
+ else:
+ # EOF, connection closed. Break loop, end thread.
+ return
+
+ def getFieldStorage(self):
+ """Return a cgi FieldStorage constructed from the stdin and
+ environ read from the server for this request."""
+ self.stdin.reset()
+ return cgi.FieldStorage(fp=self.stdin, environ=self.env,
+ keep_blank_values=1)
+
+ def _flush(self, stream):
+ """Flush a stream of this request."""
+ stream.reset()
+
+ rec = Record()
+ rec.rec_type = FCGI_STDOUT
+ rec.req_id = self.req_id
+ data = stream.read()
+
+ if not data:
+ # Writing zero bytes would mean stream termination
+ return
+
+ while data:
+ chunk, data = self.getNextChunk(data)
+ rec.content = chunk
+ rec.writeRecord(self.conn)
+ # Truncate
+ stream.reset()
+ stream.truncate()
+
+ def flush_out(self):
+ """Flush Requests stdout stream."""
+ self._flush(self.out)
+
+ def flush_err(self):
+ """Flush Requests stderr stream."""
+ self._flush(self.err)
+
+ def finish(self, status=0):
+ """Finish this Request, flushing all output and
+ possible exiting this thread."""
+ if self.have_finished:
+ return
+
+ self.have_finished = 1
+
+ # stderr
+ self.err.reset()
+ rec = Record()
+ rec.rec_type = FCGI_STDERR
+ rec.req_id = self.req_id
+ data = self.err.read()
+ while data:
+ chunk, data = self.getNextChunk(data)
+ rec.content = chunk
+ rec.writeRecord(self.conn)
+ rec.content = ""
+ rec.writeRecord(self.conn) # Terminate stream
+
+ # stdout
+ self.out.reset()
+ rec = Record()
+ rec.rec_type = FCGI_STDOUT
+ rec.req_id = self.req_id
+ data = self.out.read()
+ while data:
+ chunk, data = self.getNextChunk(data)
+ rec.content = chunk
+ rec.writeRecord(self.conn)
+ rec.content = ""
+ rec.writeRecord(self.conn) # Terminate stream
+
+ # end request
+ rec = Record()
+ rec.rec_type = FCGI_END_REQUEST
+ rec.req_id = self.req_id
+ rec.appStatus = status
+ rec.protocolStatus = FCGI_REQUEST_COMPLETE
+ rec.writeRecord(self.conn)
+ if not self.keep_conn:
+ self.conn.close()
+ if self.multi:
+ thread.exit()
+
+ #
+ # Record handlers
+ #
+ def _handle_record(self, rec):
+ """Handle record."""
+ if rec.req_id == FCGI_NULL_REQUEST_ID:
+ # Management record
+ self._handle_man_record(rec)
+ else:
+ # Application record
+ self._handle_app_record(rec)
+
+ def _handle_man_record(self, rec):
+ """Handle management record."""
+ rec_type = rec.rec_type
+ if rec_type in KNOWN_MANAGEMENT_TYPES:
+ self._handle_known_man_types(rec)
+ else:
+ # It's a management record of an unknown
+ # type. Signal the error.
+ rec = Record()
+ rec.rec_type = FCGI_UNKNOWN_TYPE
+ rec.unknownType = rec_type
+ rec.writeRecord(self.conn)
+
+ def _handle_known_man_types(self, rec):
+ """Handle a known management record."""
+ if rec.rec_type == FCGI_GET_VALUES:
+ reply_rec = Record()
+ reply_rec.rec_type = FCGI_GET_VALUES_RESULT
+
+ params = {'FCGI_MAX_CONNS' : FCGI_MAX_CONNS,
+ 'FCGI_MAX_REQS' : FCGI_MAX_REQS,
+ 'FCGI_MPXS_CONNS' : FCGI_MPXS_CONNS}
+
+ for name in rec.values.keys():
+ if params.has_key(name):
+ # We known this value, include in reply
+ reply_rec.values[name] = params[name]
+
+ rec.writeRecord(self.conn)
+
+ def _handle_app_record(self, rec):
+ """Handle an application record. This calls the specified
+ request_handler, if environ and stdin is complete."""
+ if rec.rec_type == FCGI_BEGIN_REQUEST:
+ # Discrete
+ self._handle_begin_request(rec)
+ return
+ elif rec.req_id != self.req_id:
+ #print >> sys.stderr, "Recieved unknown request ID", rec.req_id
+ # Ignore requests that aren't active
+ return
+ if rec.rec_type == FCGI_ABORT_REQUEST:
+ # Discrete
+ rec.rec_type = FCGI_END_REQUEST
+ rec.protocolStatus = FCGI_REQUEST_COMPLETE
+ rec.appStatus = 0
+ rec.writeRecord(self.conn)
+ return
+ elif rec.rec_type == FCGI_PARAMS:
+ # Stream
+ self._handle_params(rec)
+ elif rec.rec_type == FCGI_STDIN:
+ # Stream
+ self._handle_stdin(rec)
+ elif rec.rec_type == FCGI_DATA:
+ # Stream
+ self._handle_data(rec)
+ else:
+ # Should never happen.
+ #print >> sys.stderr, "Recieved unknown FCGI record type", rec.rec_type
+ pass
+
+ if self.env_complete and self.stdin_complete:
+ # Call application request handler.
+ # The arguments sent to the request handler is:
+ # self: us.
+ # req: The request.
+ # env: The request environment
+ # form: FieldStorage.
+ self.req_handler(self, self.env, self.getFieldStorage())
+ #, self.getFieldStorage())
+
+ def _handle_begin_request(self, rec):
+ """Handle begin request."""
+ if rec.role != FCGI_RESPONDER:
+ # Unknown role, signal error.
+ rec.rec_type = FCGI_END_REQUEST
+ rec.appStatus = 0
+ rec.protocolStatus = FCGI_UNKNOWN_ROLE
+ rec.writeRecord(self.conn)
+ return
+
+ self.req_id = rec.req_id
+ self.keep_conn = rec.keep_conn
+
+ def _handle_params(self, rec):
+ """Handle environment."""
+ if self.env_complete:
+ # Should not happen
+ #print >> sys.stderr, "Recieved FCGI_PARAMS more than once"
+ return
+
+ if not rec.content:
+ self.env_complete = 1
+
+ # Add all vars to our environment
+ self.env.update(rec.values)
+
+ def _handle_stdin(self, rec):
+ """Handle stdin."""
+ if self.stdin_complete:
+ # Should not happen
+ #print >> sys.stderr, "Recieved FCGI_STDIN more than once"
+ return
+
+ if not rec.content:
+ self.stdin_complete = 1
+
+ self.stdin.write(rec.content)
+
+ def _handle_data(self, rec):
+ """Handle data."""
+ if self.data_complete:
+ # Should not happen
+ #print >> sys.stderr, "Recieved FCGI_DATA more than once"
+ return
+
+ if not rec.content:
+ self.data_complete = 1
+
+ self.data.write(rec.content)
+
+ def getNextChunk(self, data):
+ """Helper function which returns chunks of data."""
+ chunk = data[:8192]
+ data = data[8192:]
+ return chunk, data
+
+
+class THFCGI:
+ """Multi-threaded main loop to handle FastCGI Requests."""
+
+ def __init__(self, req_handler, fd=sys.stdin):
+ """Initialize main loop and set request_handler."""
+ self.req_handler = req_handler
+ self.fd = fd
+ self.multi = 1
+ self._make_socket()
+
+ def run(self):
+ """Wait & serve. Calls request_handler in new
+ thread on every request."""
+ self.sock.listen(5)
+
+ while 1:
+ (conn, addr) = self.sock.accept()
+ thread.start_new_thread(self.accept_handler, (conn, addr))
+
+ def accept_handler(self, conn, addr):
+ """Construct Request and run() it.
+ Executed from run in a new thread."""
+ self._check_good_addrs(addr)
+ req = Request(conn, self.req_handler, self.multi)
+ req.run()
+
+ def _make_socket(self):
+ """Create socket and verify FCGI environment."""
+ try:
+ s = socket.fromfd(self.fd.fileno(), socket.AF_INET,
+ socket.SOCK_STREAM)
+ s.getpeername()
+ except socket.error, (err, errmsg):
+ if err != errno.ENOTCONN:
+ raise "No FastCGI environment"
+
+ self.sock = s
+
+ def _check_good_addrs(self, addr):
+ """Check if request is done from the right server."""
+ # Apaches mod_fastcgi seems not to use FCGI_WEB_SERVER_ADDRS.
+ if os.environ.has_key('FCGI_WEB_SERVER_ADDRS'):
+ good_addrs = string.split(os.environ['FCGI_WEB_SERVER_ADDRS'], ',')
+ good_addrs = map(string.strip, good_addrs) # Remove whitespace
+ else:
+ good_addrs = None
+
+ # Check if the connection is from a legal address
+ if good_addrs != None and addr not in good_addrs:
+ raise "Connection from invalid server!"
+
+
+class unTHFCGI(THFCGI):
+ """Single-threaded main loop to handle FastCGI Requests."""
+
+ def __init__(self, req_handler, fd=sys.stdin):
+ """Initialize main loop and set request_handler."""
+ THFCGI.__init__(self, req_handler, fd)
+ self.multi = 0
+
+ def run(self):
+ """Wait & serve. Calls request handler for every request (blocking)."""
+ self.sock.listen(5)
+
+ while 1:
+ (conn, addr) = self.sock.accept()
+ self.accept_handler(conn, addr)
+
More information about the Mercurial
mailing list