# General configuration
# ---------------------
-extensions = ['sphinx.addons.refcounting', 'sphinx.addons.coverage']
+extensions = ['sphinx.ext.refcounting', 'sphinx.ext.coverage']
# General substitutions.
project = 'Python'
yield element
+.. function:: combinations(iterable, r)
+
+ Return successive *r* length combinations of elements in the *iterable*.
+
+ Combinations are emitted in a lexicographic sort order. So, if the
+ input *iterable* is sorted, the combination tuples will be produced
+ in sorted order.
+
+ Elements are treated as unique based on their position, not on their
+ value. So if the input elements are unique, there will be no repeat
+ values within a single combination.
+
+ Each result tuple is ordered to match the input order. So, every
+ combination is a subsequence of the input *iterable*.
+
+ Example: ``combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3)``
+
+ Equivalent to::
+
+ def combinations(iterable, r):
+ pool = tuple(iterable)
+ if pool:
+ n = len(pool)
+ vec = range(r)
+ yield tuple(pool[i] for i in vec)
+ while 1:
+ for i in reversed(range(r)):
+ if vec[i] == i + n-r:
+ continue
+ vec[i] += 1
+ for j in range(i+1, r):
+ vec[j] = vec[j-1] + 1
+ yield tuple(pool[i] for i in vec)
+ break
+ else:
+ return
+
+ .. versionadded:: 2.6
+
.. function:: count([n])
Make an iterator that returns consecutive integers starting with *n*. If not
The leftmost iterators are in the outermost for-loop, so the output tuples
cycle in a manner similar to an odometer (with the rightmost element
- changing on every iteration).
+ changing on every iteration). This results in a lexicographic ordering
+ so that if the inputs iterables are sorted, the product tuples are emitted
+ in sorted order.
- Equivalent to (but without building the entire result in memory)::
+ Equivalent to the following except that the actual implementation does not
+ build-up intermediate results in memory::
def product(*args):
pools = map(tuple, args)
exception will be received by an arbitrary thread. (When the :mod:`signal`
module is available, interrupts always go to the main thread.)
-* The import machinery is not thread safe. In general, an import may not
- have the side effect of importing a module, and only the main thread
- should import modules. Imports within or caused by a thread other than
- the main thread isn't safe.
-
* Calling :func:`sys.exit` or raising the :exc:`SystemExit` exception is
equivalent to calling :func:`exit`.
* When the main thread exits, it does not do any of its usual cleanup (except
that :keyword:`try` ... :keyword:`finally` clauses are honored), and the
standard I/O files are not flushed.
+
There is a "main thread" object; this corresponds to the initial thread of
control in the Python program. It is not a daemon thread.
-.. warning::
-
- The import machinery is not thread safe. In general, an import may not
- have the side effect of importing a module, and only the main thread
- should import modules. Imports within or caused by a thread other than
- the main thread isn't safe.
-
There is the possibility that "dummy thread objects" are created. These are
thread objects corresponding to "alien threads", which are threads of control
started outside the threading module, such as directly from C code. Dummy
from _curses import *
from curses.wrapper import wrapper
import os as _os
+import sys as _sys
# Some constants, most notably the ACS_* ones, are only added to the C
# _curses module's dictionary after initscr() is called. (Some
import _curses, curses
# we call setupterm() here because it raises an error
# instead of calling exit() in error cases.
- setupterm(term=_os.environ.get("TERM", "unknown"))
+ setupterm(term=_os.environ.get("TERM", "unknown"),
+ fd=_sys.__stdout__.fileno())
stdscr = _curses.initscr()
for key, value in _curses.__dict__.items():
if key[0:4] == 'ACS_' or key in ('LINES', 'COLS'):
curses.wrapper(main)
unit_tests()
else:
+ # testing setupterm() inside initscr/endwin
+ # causes terminal breakage
+ curses.setupterm(fd=sys.__stdout__.fileno())
try:
- # testing setupterm() inside initscr/endwin
- # causes terminal breakage
- curses.setupterm(fd=sys.__stdout__.fileno())
stdscr = curses.initscr()
main(stdscr)
finally:
curses.endwin()
-
unit_tests()
from unittest import TestCase
from test import test_support
-def server(evt, ready):
+server_port = None
+
+# This function sets the evt 3 times:
+# 1) when the connection is ready to be accepted.
+# 2) when it is safe for the caller to close the connection
+# 3) when we have closed the socket
+def server(evt):
+ global server_port
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
+ server_port = test_support.bind_port(serv, "", 9091)
serv.listen(5)
- ready.set()
+
+ # (1) Signal the caller that we are ready to accept the connection.
+ evt.set()
try:
conn, addr = serv.accept()
except socket.timeout:
pass
else:
conn.send(b"1 Hola mundo\n")
+ # (2) Signal the caller that it is safe to close the socket.
+ evt.set()
conn.close()
finally:
serv.close()
+ # (3) Signal the caller that we are done.
evt.set()
class GeneralTests(TestCase):
def setUp(self):
- ftplib.FTP.port = 9091
self.evt = threading.Event()
- ready = threading.Event()
- threading.Thread(target=server, args=(self.evt, ready)).start()
- ready.wait()
+ threading.Thread(target=server, args=(self.evt,)).start()
+ # Wait for the server to be ready.
+ self.evt.wait()
+ self.evt.clear()
+ ftplib.FTP.port = server_port
def tearDown(self):
+ # Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait()
def testBasic(self):
# connects
ftp = ftplib.FTP("localhost")
+ self.evt.wait()
ftp.sock.close()
def testTimeoutDefault(self):
# default
ftp = ftplib.FTP("localhost")
self.assertTrue(ftp.sock.gettimeout() is None)
+ self.evt.wait()
ftp.sock.close()
def testTimeoutValue(self):
# a value
ftp = ftplib.FTP("localhost", timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
+ self.evt.wait()
ftp.sock.close()
def testTimeoutConnect(self):
ftp = ftplib.FTP()
ftp.connect("localhost", timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
+ self.evt.wait()
ftp.sock.close()
def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30)
ftp.connect("localhost")
self.assertEqual(ftp.sock.gettimeout(), 30)
+ self.evt.wait()
ftp.sock.close()
def testTimeoutDirectAccess(self):
ftp.timeout = 30
ftp.connect("localhost")
self.assertEqual(ftp.sock.gettimeout(), 30)
+ self.evt.wait()
ftp.sock.close()
def testTimeoutNone(self):
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30)
+ self.evt.wait()
ftp.close()
-
def test_main(verbose=None):
test_support.run_unittest(GeneralTests)
def test_ifilter(self):
self.assertEqual(list(ifilter(isEven, range(6))), [0,2,4])
self.assertEqual(list(ifilter(None, [0,1,0,2,0])), [1,2])
+ self.assertEqual(list(ifilter(bool, [0,1,0,2,0])), [1,2])
self.assertEqual(take(4, ifilter(isEven, count())), [0,2,4,6])
self.assertRaises(TypeError, ifilter)
self.assertRaises(TypeError, ifilter, lambda x:x)
def test_ifilterfalse(self):
self.assertEqual(list(ifilterfalse(isEven, range(6))), [1,3,5])
self.assertEqual(list(ifilterfalse(None, [0,1,0,2,0])), [0,0,0])
+ self.assertEqual(list(ifilterfalse(bool, [0,1,0,2,0])), [0,0,0])
self.assertEqual(take(4, ifilterfalse(isEven, count())), [1,3,5,7])
self.assertRaises(TypeError, ifilterfalse)
self.assertRaises(TypeError, ifilterfalse, lambda x:x)
for _name, _value in list(globals().items()):
if type(_value) is type(0):
tok_name[_value] = _name
+del _name, _value
def ISTERMINAL(x):
0, /*tp_as_number*/
&dbm_as_sequence, /*tp_as_sequence*/
&dbm_as_mapping, /*tp_as_mapping*/
+ 0, /*tp_hash*/
+ 0, /*tp_call*/
+ 0, /*tp_str*/
+ 0, /*tp_getattro*/
+ 0, /*tp_setattro*/
+ 0, /*tp_as_buffer*/
+ Py_TPFLAGS_DEFAULT, /*tp_xxx4*/
};
/* ----------------------------------------------------------------- */
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
- 0, /*tp_xxx4*/
+ Py_TPFLAGS_DEFAULT, /*tp_xxx4*/
gdbm_object__doc__, /*tp_doc*/
};
if (item == NULL)
return NULL;
- if (lz->func == Py_None) {
+ if (lz->func == Py_None || lz->func == (PyObject *)&PyBool_Type) {
ok = PyObject_IsTrue(item);
} else {
PyObject *good;
if (item == NULL)
return NULL;
- if (lz->func == Py_None) {
+ if (lz->func == Py_None || lz->func == (PyObject *)&PyBool_Type) {
ok = PyObject_IsTrue(item);
} else {
PyObject *good;
PyMem_FREE(ptr);
return -1;
}
- if(PyList_Append(*freelist, cobj)) {
+ if (PyList_Append(*freelist, cobj)) {
PyMem_FREE(ptr);
Py_DECREF(cobj);
return -1;
static int
cleanreturn(int retval, PyObject *freelist)
{
- if(freelist) {
- if((retval) == 0) {
+ if (freelist) {
+ if (retval == 0) {
Py_ssize_t len = PyList_GET_SIZE(freelist), i;
for (i = 0; i < len; i++)
PyMem_FREE(PyCObject_AsVoidPtr(
case 'L': {/* PY_LONG_LONG */
PY_LONG_LONG *p = va_arg( *p_va, PY_LONG_LONG * );
PY_LONG_LONG ival = PyLong_AsLongLong( arg );
- if( ival == (PY_LONG_LONG)-1 && PyErr_Occurred() ) {
+ if (ival == (PY_LONG_LONG)-1 && PyErr_Occurred() ) {
return converterr("long<L>", arg, msgbuf, bufsize);
} else {
*p = ival;
"(memory error)",
arg, msgbuf, bufsize);
}
- if(addcleanup(*buffer, freelist)) {
+ if (addcleanup(*buffer, freelist)) {
Py_DECREF(s);
return converterr(
"(cleanup problem)",
return converterr("(memory error)",
arg, msgbuf, bufsize);
}
- if(addcleanup(*buffer, freelist)) {
+ if (addcleanup(*buffer, freelist)) {
Py_DECREF(s);
return converterr("(cleanup problem)",
arg, msgbuf, bufsize);