class variable.
+.. attribute:: BaseHTTPRequestHandler.error_content_type
+
+ Specifies the Content-Type HTTP header of error responses sent to the client.
+ The default value is ``'text/html'``.
+
+ .. versionadded:: 2.6
+ Previously, the content type was always ``'text/html'``.
+
+
.. attribute:: BaseHTTPRequestHandler.protocol_version
This specifies the HTTP protocol version used in responses. If set to
expressed in the format returned by :func:`time.ctime`. If not specified, the
strings default to blanks.
- :file:`Tools/scripts/diff.py` is a command-line front-end for this function.
+ ::
+
+ >>> s1 = ['bacon\n', 'eggs\n', 'ham\n', 'guido\n']
+ >>> s2 = ['python\n', 'eggy\n', 'hamster\n', 'guido\n']
+ >>> for line in context_diff(s1, s2, fromfile='before.py', tofile='after.py'):
+ ... sys.stdout.write(line)
+ *** before.py
+ --- after.py
+ ***************
+ *** 1,4 ****
+ ! bacon
+ ! eggs
+ ! ham
+ guido
+ --- 1,4 ----
+ ! python
+ ! eggy
+ ! hamster
+ guido
+
+ See :ref:`difflib-interface` for a more detailed example.
.. function:: get_close_matches(word, possibilities[, n][, cutoff])
expressed in the format returned by :func:`time.ctime`. If not specified, the
strings default to blanks.
- :file:`Tools/scripts/diff.py` is a command-line front-end for this function.
+ ::
+
+ >>> s1 = ['bacon\n', 'eggs\n', 'ham\n', 'guido\n']
+ >>> s2 = ['python\n', 'eggy\n', 'hamster\n', 'guido\n']
+ >>> for line in unified_diff(s1, s2, fromfile='before.py', tofile='after.py'):
+ ... sys.stdout.write(line)
+ --- before.py
+ +++ after.py
+ @@ -1,4 +1,4 @@
+ -bacon
+ -eggs
+ -ham
+ +python
+ +eggy
+ +hamster
+ guido
+
+ See :ref:`difflib-interface` for a more detailed example.
.. function:: IS_LINE_JUNK(line)
? ++++ ^ ^
+ 5. Flat is better than nested.
+
+.. _difflib-interface:
+
+A command-line interface to difflib
+-----------------------------------
+
+This example shows how to use difflib to create a ``diff``-like utility.
+It is also contained in the Python source distribution, as
+:file:`Tools/scripts/diff.py`.
+
+::
+
+ """ Command line interface to difflib.py providing diffs in four formats:
+
+ * ndiff: lists every line and highlights interline changes.
+ * context: highlights clusters of changes in a before/after format.
+ * unified: highlights clusters of changes in an inline format.
+ * html: generates side by side comparison with change highlights.
+
+ """
+
+ import sys, os, time, difflib, optparse
+
+ def main():
+ # Configure the option parser
+ usage = "usage: %prog [options] fromfile tofile"
+ parser = optparse.OptionParser(usage)
+ parser.add_option("-c", action="store_true", default=False,
+ help='Produce a context format diff (default)')
+ parser.add_option("-u", action="store_true", default=False,
+ help='Produce a unified format diff')
+ hlp = 'Produce HTML side by side diff (can use -c and -l in conjunction)'
+ parser.add_option("-m", action="store_true", default=False, help=hlp)
+ parser.add_option("-n", action="store_true", default=False,
+ help='Produce a ndiff format diff')
+ parser.add_option("-l", "--lines", type="int", default=3,
+ help='Set number of context lines (default 3)')
+ (options, args) = parser.parse_args()
+
+ if len(args) == 0:
+ parser.print_help()
+ sys.exit(1)
+ if len(args) != 2:
+ parser.error("need to specify both a fromfile and tofile")
+
+ n = options.lines
+ fromfile, tofile = args # as specified in the usage string
+
+ # we're passing these as arguments to the diff function
+ fromdate = time.ctime(os.stat(fromfile).st_mtime)
+ todate = time.ctime(os.stat(tofile).st_mtime)
+ fromlines = open(fromfile, 'U').readlines()
+ tolines = open(tofile, 'U').readlines()
+
+ if options.u:
+ diff = difflib.unified_diff(fromlines, tolines, fromfile, tofile,
+ fromdate, todate, n=n)
+ elif options.n:
+ diff = difflib.ndiff(fromlines, tolines)
+ elif options.m:
+ diff = difflib.HtmlDiff().make_file(fromlines, tolines, fromfile,
+ tofile, context=options.c,
+ numlines=n)
+ else:
+ diff = difflib.context_diff(fromlines, tolines, fromfile, tofile,
+ fromdate, todate, n=n)
+
+ # we're using writelines because diff is a generator
+ sys.stdout.writelines(diff)
+
+ if __name__ == '__main__':
+ main()
.. opcode:: STORE_NAME (namei)
Implements ``name = TOS``. *namei* is the index of *name* in the attribute
- :attr:`co_names` of the code object. The compiler tries to use ``STORE_LOCAL``
+ :attr:`co_names` of the code object. The compiler tries to use ``STORE_FAST``
or ``STORE_GLOBAL`` if possible.
exception to be raised.
+
+.. function:: siginterrupt(signalnum, flag)
+
+ Change system call restart behaviour: if *flag* is :const:`False`, system calls
+ will be restarted when interrupted by signal *signalnum*, else system calls will
+ be interrupted. Returns nothing. Availability: Unix, Mac (see the man page
+ :manpage:`siginterrupt(3)` for further information).
+
+ Note that installing a signal handler with :func:`signal` will reset the restart
+ behaviour to interruptible by implicitly calling siginterrupt with a true *flag*
+ value for the given signal.
+
+ .. versionadded:: 2.6
+
+
.. function:: signal(signalnum, handler)
Set the handler for signal *signalnum* to the function *handler*. *handler* can
Registers the XML-RPC multicall function system.multicall.
-.. attribute:: SimpleXMLRPCServer.rpc_paths
+.. attribute:: SimpleXMLRPCRequestHandler.rpc_paths
An attribute value that must be a tuple listing valid path portions of the URL
for receiving XML-RPC requests. Requests posted to other paths will result in a
Server code::
from SimpleXMLRPCServer import SimpleXMLRPCServer
+ from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
+
+ # Restrict to a particular path.
+ class RequestHandler(SimpleXMLRPCRequestHandler):
+ rpc_paths = ('/RPC2',)
# Create server
- server = SimpleXMLRPCServer(("localhost", 8000))
+ server = SimpleXMLRPCServer(("localhost", 8000),
+ requestHandler=RequestHandler)
server.register_introspection_functions()
# Register pow() function; this will use the value of
import mimetools
import SocketServer
-# Default error message
+# Default error message template
DEFAULT_ERROR_MESSAGE = """\
<head>
<title>Error response</title>
</body>
"""
+DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
+
def _quote_html(html):
return html.replace("&", "&").replace("<", "<").replace(">", ">")
# where each string is of the form name[/version].
server_version = "BaseHTTP/" + __version__
+ error_message_format = DEFAULT_ERROR_MESSAGE
+ error_content_type = DEFAULT_ERROR_CONTENT_TYPE
+
def parse_request(self):
"""Parse a request (internal).
content = (self.error_message_format %
{'code': code, 'message': _quote_html(message), 'explain': explain})
self.send_response(code, message)
- self.send_header("Content-Type", "text/html;charset=utf-8")
+ self.send_header("Content-Type", self.error_content_type)
self.send_header('Connection', 'close')
self.end_headers()
if self.command != 'HEAD' and code >= 200 and code not in (204, 304):
self.wfile.write(content.encode('UTF-8', 'replace'))
- error_message_format = DEFAULT_ERROR_MESSAGE
-
def send_response(self, code, message=None):
"""Send the response header and log the response code.
from _curses import *
from curses.wrapper import wrapper
+import os as _os
# Some constants, most notably the ACS_* ones, are only added to the C
# _curses module's dictionary after initscr() is called. (Some
def initscr():
import _curses, curses
+ # we call setupterm() here because it raises an error
+ # instead of calling exit() in error cases.
+ setupterm(term=_os.environ.get("TERM", "unknown"))
stdscr = _curses.initscr()
for key, value in _curses.__dict__.items():
if key[0:4] == 'ACS_' or key in ('LINES', 'COLS'):
>>> import logging, logging.handlers, logging.config
>>> from test import test_logging
-XXX: The test is unstable!
-#>>> test_logging.test_main_inner()
+>>> test_logging.test_main_inner()
ERR -> CRITICAL: Message 0 (via logrecv.tcp.ERR)
ERR -> ERROR: Message 1 (via logrecv.tcp.ERR)
INF -> CRITICAL: Message 2 (via logrecv.tcp.INF)
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
ThreadingTCPServer.__init__(self, (host, port), handler)
- self.abort = 0
+ self.abort = False
self.timeout = 1
def serve_until_stopped(self):
self.timeout)
if rd:
self.handle_request()
+ socketDataProcessed.set()
# close the listen socket
self.server_close()
def process_request(self, request, client_address):
- #import threading
t = threading.Thread(target = self.finish_request,
args = (request, client_address))
t.start()
rootLogger = logging.getLogger("")
rootLogger.setLevel(logging.DEBUG)
- # Find an unused port number
- port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
- while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100:
- try:
- tcpserver = LogRecordSocketReceiver(port=port)
- except socket.error:
- port += 1
- else:
- break
- else:
- raise ImportError("Could not find unused port")
-
-
- #Set up a handler such that all events are sent via a socket to the log
- #receiver (logrecv).
- #The handler will only be added to the rootLogger for some of the tests
+ tcpserver = LogRecordSocketReceiver(port=0)
+ port = tcpserver.socket.getsockname()[1]
+
+ # Set up a handler such that all events are sent via a socket to the log
+ # receiver (logrecv).
+ # The handler will only be added to the rootLogger for some of the tests
shdlr = logging.handlers.SocketHandler('localhost', port)
rootLogger.addHandler(shdlr)
- #Configure the logger for logrecv so events do not propagate beyond it.
- #The sockLogger output is buffered in memory until the end of the test,
- #and printed at the end.
+ # Configure the logger for logrecv so events do not propagate beyond it.
+ # The sockLogger output is buffered in memory until the end of the test,
+ # and printed at the end.
sockOut = io.StringIO()
sockLogger = logging.getLogger("logrecv")
sockLogger.setLevel(logging.DEBUG)
finally:
#wait for TCP receiver to terminate
-# socketDataProcessed.wait()
+ socketDataProcessed.wait()
# ensure the server dies
- tcpserver.abort = 1
+ tcpserver.abort = True
for thread in threads:
thread.join(2.0)
print(sockOut.getvalue())
import unittest
from test import test_support
import signal
-import os, sys, time
+import os, sys, time, errno
class HandlerBCalled(Exception):
pass
os.close(self.write)
signal.signal(signal.SIGALRM, self.alrm)
+class SiginterruptTest(unittest.TestCase):
+ signum = signal.SIGUSR1
+ def readpipe_interrupted(self, cb):
+ r, w = os.pipe()
+ ppid = os.getpid()
+ pid = os.fork()
+
+ oldhandler = signal.signal(self.signum, lambda x,y: None)
+ cb()
+ if pid==0:
+ # child code: sleep, kill, sleep. and then exit,
+ # which closes the pipe from which the parent process reads
+ try:
+ time.sleep(0.2)
+ os.kill(ppid, self.signum)
+ time.sleep(0.2)
+ finally:
+ os._exit(0)
+
+ try:
+ os.close(w)
+
+ try:
+ d=os.read(r, 1)
+ return False
+ except OSError as err:
+ if err.errno != errno.EINTR:
+ raise
+ return True
+ finally:
+ signal.signal(self.signum, oldhandler)
+ os.waitpid(pid, 0)
+
+ def test_without_siginterrupt(self):
+ i=self.readpipe_interrupted(lambda: None)
+ self.assertEquals(i, True)
+
+ def test_siginterrupt_on(self):
+ i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
+ self.assertEquals(i, True)
+
+ def test_siginterrupt_off(self):
+ i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
+ self.assertEquals(i, False)
def test_main():
if sys.platform[:3] in ('win', 'os2'):
sys.platform)
test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
- WakeupSignalTests)
+ WakeupSignalTests, SiginterruptTest)
if __name__ == "__main__":
None -- if an unknown handler is in effect\n\
anything else -- the callable Python object used as a handler");
+#ifdef HAVE_SIGINTERRUPT
+PyDoc_STRVAR(siginterrupt_doc,
+"siginterrupt(sig, flag) -> None\n\
+change system call restart behaviour: if flag is False, system calls\n\
+will be restarted when interrupted by signal sig, else system calls\n\
+will be interrupted.");
+
+static PyObject *
+signal_siginterrupt(PyObject *self, PyObject *args)
+{
+ int sig_num;
+ int flag;
+
+ if (!PyArg_ParseTuple(args, "ii:siginterrupt", &sig_num, &flag))
+ return NULL;
+ if (sig_num < 1 || sig_num >= NSIG) {
+ PyErr_SetString(PyExc_ValueError,
+ "signal number out of range");
+ return NULL;
+ }
+ if (siginterrupt(sig_num, flag)<0) {
+ PyErr_SetFromErrno(PyExc_RuntimeError);
+ return NULL;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+#endif
static PyObject *
signal_set_wakeup_fd(PyObject *self, PyObject *args)
{"signal", signal_signal, METH_VARARGS, signal_doc},
{"getsignal", signal_getsignal, METH_VARARGS, getsignal_doc},
{"set_wakeup_fd", signal_set_wakeup_fd, METH_VARARGS, set_wakeup_fd_doc},
+#ifdef HAVE_SIGINTERRUPT
+ {"siginterrupt", signal_siginterrupt, METH_VARARGS, siginterrupt_doc},
+#endif
#ifdef HAVE_PAUSE
{"pause", (PyCFunction)signal_pause,
METH_NOARGS,pause_doc},