To map anonymous memory, -1 should be passed as the fileno along with the length.
-.. class:: mmap(fileno, length, tagname=None, access=ACCESS_DEFAULT, offset=0)
+.. class:: mmap(fileno, length, tagname=None, \
+ access=ACCESS_DEFAULT, offset=0, *, trackfd=True)
**(Windows version)** Maps *length* bytes from the file specified by the
- file handle *fileno*, and creates a mmap object. If *length* is larger
+ file descriptor *fileno*, and creates a mmap object. If *length* is larger
than the current size of the file, the file is extended to contain *length*
bytes. If *length* is ``0``, the maximum length of the map is the current
size of the file, except that if the file is empty Windows raises an
will be relative to the offset from the beginning of the file. *offset*
defaults to 0. *offset* must be a multiple of the :const:`ALLOCATIONGRANULARITY`.
+ If *trackfd* is ``False``, the file handle corresponding to *fileno* will
+ not be duplicated, and the resulting :class:`!mmap` object will not
+ be associated with the map's underlying file.
+ This means that the :meth:`~mmap.mmap.size` and :meth:`~mmap.mmap.resize`
+ methods will fail.
+ This mode is useful to limit the number of open file handles.
+ The original file can be renamed (but not deleted) after closing *fileno*.
+
+ .. versionchanged:: next
+ The *trackfd* parameter was added.
+
.. audit-event:: mmap.__new__ fileno,length,access,offset mmap.mmap
.. class:: mmap(fileno, length, flags=MAP_SHARED, prot=PROT_WRITE|PROT_READ, \
(Contributed by Bénédikt Tran in :gh:`135853`.)
+mmap
+----
+
+* :class:`mmap.mmap` now has a *trackfd* parameter on Windows;
+ if it is ``False``, the file handle corresponding to *fileno* will
+ not be duplicated.
+ (Contributed by Serhiy Storchaka in :gh:`78502`.)
+
+
os.path
-------
+from test import support
from test.support import (
requires, _2G, _4G, gc_collect, cpython_only, is_emscripten, is_apple,
in_systemd_nspawn_sync_suppressed,
self.assertRaises(TypeError, m.write_byte, 0)
m.close()
- @unittest.skipIf(os.name == 'nt', 'trackfd not present on Windows')
- def test_trackfd_parameter(self):
+ @support.subTests('close_original_fd', (True, False))
+ def test_trackfd_parameter(self, close_original_fd):
size = 64
with open(TESTFN, "wb") as f:
f.write(b"a"*size)
- for close_original_fd in True, False:
- with self.subTest(close_original_fd=close_original_fd):
- with open(TESTFN, "r+b") as f:
- with mmap.mmap(f.fileno(), size, trackfd=False) as m:
- if close_original_fd:
- f.close()
- self.assertEqual(len(m), size)
- with self.assertRaises(ValueError):
- m.size()
- with self.assertRaises(ValueError):
- m.resize(size * 2)
- with self.assertRaises(ValueError):
- m.resize(size // 2)
- self.assertEqual(m.closed, False)
-
- # Smoke-test other API
- m.write_byte(ord('X'))
- m[2] = ord('Y')
- m.flush()
- with open(TESTFN, "rb") as f:
- self.assertEqual(f.read(4), b'XaYa')
- self.assertEqual(m.tell(), 1)
- m.seek(0)
- self.assertEqual(m.tell(), 0)
- self.assertEqual(m.read_byte(), ord('X'))
-
- self.assertEqual(m.closed, True)
- self.assertEqual(os.stat(TESTFN).st_size, size)
-
- @unittest.skipIf(os.name == 'nt', 'trackfd not present on Windows')
+ with open(TESTFN, "r+b") as f:
+ with mmap.mmap(f.fileno(), size, trackfd=False) as m:
+ if close_original_fd:
+ f.close()
+ self.assertEqual(len(m), size)
+ with self.assertRaises(ValueError):
+ m.size()
+ with self.assertRaises(ValueError):
+ m.resize(size * 2)
+ with self.assertRaises(ValueError):
+ m.resize(size // 2)
+ self.assertIs(m.closed, False)
+
+ # Smoke-test other API
+ m.write_byte(ord('X'))
+ m[2] = ord('Y')
+ m.flush()
+ with open(TESTFN, "rb") as f:
+ self.assertEqual(f.read(4), b'XaYa')
+ self.assertEqual(m.tell(), 1)
+ m.seek(0)
+ self.assertEqual(m.tell(), 0)
+ self.assertEqual(m.read_byte(), ord('X'))
+
+ if os.name == 'nt' and not close_original_fd:
+ self.assertRaises(PermissionError, os.rename, TESTFN, TESTFN+'1')
+ else:
+ os.rename(TESTFN, TESTFN+'1')
+ os.rename(TESTFN+'1', TESTFN)
+
+ self.assertIs(m.closed, True)
+ self.assertEqual(os.stat(TESTFN).st_size, size)
+
def test_trackfd_neg1(self):
size = 64
with mmap.mmap(-1, size, trackfd=False) as m:
m[0] = ord('a')
assert m[0] == ord('a')
- @unittest.skipIf(os.name != 'nt', 'trackfd only fails on Windows')
- def test_no_trackfd_parameter_on_windows(self):
- # 'trackffd' is an invalid keyword argument for this function
- size = 64
- with self.assertRaises(TypeError):
- mmap.mmap(-1, size, trackfd=True)
- with self.assertRaises(TypeError):
- mmap.mmap(-1, size, trackfd=False)
-
def test_bad_file_desc(self):
# Try opening a bad file descriptor...
self.assertRaises(OSError, mmap.mmap, -2, 4096)
--- /dev/null
+:class:`mmap.mmap` now has a *trackfd* parameter on Windows; if it is
+``False``, the file handle corresponding to *fileno* will not be duplicated.
#ifdef UNIX
int fd;
- _Bool trackfd;
int flags;
#endif
PyObject *weakreflist;
access_mode access;
+ _Bool trackfd;
} mmap_object;
#define mmap_object_CAST(op) ((mmap_object *)(op))
"mmap can't resize with extant buffers exported.");
return 0;
}
-#ifdef UNIX
if (!self->trackfd) {
PyErr_SetString(PyExc_ValueError,
"mmap can't resize with trackfd=False.");
return 0;
}
-#endif
if ((self->access == ACCESS_WRITE) || (self->access == ACCESS_DEFAULT))
return 1;
PyErr_Format(PyExc_TypeError,
return PyLong_FromLong((long)low);
size = (((long long)high)<<32) + low;
return PyLong_FromLongLong(size);
- } else {
- return PyLong_FromSsize_t(self->size);
}
#endif /* MS_WINDOWS */
return PyLong_FromLong(status.st_size);
#endif
}
+#endif /* UNIX */
else if (self->trackfd) {
return PyLong_FromSsize_t(self->size);
}
"can't get size with trackfd=False");
return NULL;
}
-#endif /* UNIX */
}
/* This assumes that you want the entire file mapped,
new_mmap_object(PyTypeObject *type, PyObject *args, PyObject *kwdict);
PyDoc_STRVAR(mmap_doc,
-"Windows: mmap(fileno, length[, tagname[, access[, offset]]])\n\
+"Windows: mmap(fileno, length[, tagname[, access[, offset[, trackfd]]]])\n\
\n\
Maps length bytes from the file specified by the file handle fileno,\n\
and returns a mmap object. If length is larger than the current size\n\
PyObject *tagname = Py_None;
DWORD dwErr = 0;
int fileno;
- HANDLE fh = 0;
+ HANDLE fh = INVALID_HANDLE_VALUE;
int access = (access_mode)ACCESS_DEFAULT;
+ int trackfd = 1;
DWORD flProtect, dwDesiredAccess;
static char *keywords[] = { "fileno", "length",
"tagname",
- "access", "offset", NULL };
+ "access", "offset", "trackfd", NULL };
- if (!PyArg_ParseTupleAndKeywords(args, kwdict, "in|OiL", keywords,
+ if (!PyArg_ParseTupleAndKeywords(args, kwdict, "in|OiL$p", keywords,
&fileno, &map_size,
- &tagname, &access, &offset)) {
+ &tagname, &access, &offset, &trackfd)) {
return NULL;
}
m_obj->map_handle = NULL;
m_obj->tagname = NULL;
m_obj->offset = offset;
+ m_obj->trackfd = trackfd;
- if (fh) {
- /* It is necessary to duplicate the handle, so the
- Python code can close it on us */
- if (!DuplicateHandle(
- GetCurrentProcess(), /* source process handle */
- fh, /* handle to be duplicated */
- GetCurrentProcess(), /* target proc handle */
- (LPHANDLE)&m_obj->file_handle, /* result */
- 0, /* access - ignored due to options value */
- FALSE, /* inherited by child processes? */
- DUPLICATE_SAME_ACCESS)) { /* options */
- dwErr = GetLastError();
- Py_DECREF(m_obj);
- PyErr_SetFromWindowsErr(dwErr);
- return NULL;
+ if (fh != INVALID_HANDLE_VALUE) {
+ if (trackfd) {
+ /* It is necessary to duplicate the handle, so the
+ Python code can close it on us */
+ if (!DuplicateHandle(
+ GetCurrentProcess(), /* source process handle */
+ fh, /* handle to be duplicated */
+ GetCurrentProcess(), /* target proc handle */
+ &fh, /* result */
+ 0, /* access - ignored due to options value */
+ FALSE, /* inherited by child processes? */
+ DUPLICATE_SAME_ACCESS)) /* options */
+ {
+ dwErr = GetLastError();
+ Py_DECREF(m_obj);
+ PyErr_SetFromWindowsErr(dwErr);
+ return NULL;
+ }
+ m_obj->file_handle = fh;
}
if (!map_size) {
DWORD low,high;
/* low might just happen to have the value INVALID_FILE_SIZE;
so we need to check the last error also. */
if (low == INVALID_FILE_SIZE &&
- (dwErr = GetLastError()) != NO_ERROR) {
+ (dwErr = GetLastError()) != NO_ERROR)
+ {
Py_DECREF(m_obj);
return PyErr_SetFromWindowsErr(dwErr);
}
off_lo = (DWORD)(offset & 0xFFFFFFFF);
/* For files, it would be sufficient to pass 0 as size.
For anonymous maps, we have to pass the size explicitly. */
- m_obj->map_handle = CreateFileMappingW(m_obj->file_handle,
+ m_obj->map_handle = CreateFileMappingW(fh,
NULL,
flProtect,
size_hi,