m.move(0, 0, 1)
m.move(0, 0, 0)
-
def test_anonymous(self):
# anonymous mmap.mmap(-1, PAGE)
m = mmap.mmap(-1, PAGESIZE)
self.assertEqual(m1[:data_length], data)
self.assertEqual(m2[:data_length], data)
+ def test_mmap_closed_by_int_scenarios(self):
+ """
+ gh-103987: Test that mmap objects raise ValueError
+ for closed mmap files
+ """
+
+ class MmapClosedByIntContext:
+ def __init__(self, access) -> None:
+ self.access = access
+
+ def __enter__(self):
+ self.f = open(TESTFN, "w+b")
+ self.f.write(random.randbytes(100))
+ self.f.flush()
+
+ m = mmap.mmap(self.f.fileno(), 100, access=self.access)
+
+ class X:
+ def __index__(self):
+ m.close()
+ return 10
+
+ return (m, X)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.f.close()
+
+ read_access_modes = [
+ mmap.ACCESS_READ,
+ mmap.ACCESS_WRITE,
+ mmap.ACCESS_COPY,
+ mmap.ACCESS_DEFAULT,
+ ]
+
+ write_access_modes = [
+ mmap.ACCESS_WRITE,
+ mmap.ACCESS_COPY,
+ mmap.ACCESS_DEFAULT,
+ ]
+
+ for access in read_access_modes:
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[X()]
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[X() : 20]
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[X() : 20 : 2]
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[20 : X() : -2]
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m.read(X())
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m.find(b"1", 1, X())
+
+ for access in write_access_modes:
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[X() : 20] = b"1" * 10
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[X() : 20 : 2] = b"1" * 5
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m[20 : X() : -2] = b"1" * 5
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m.move(1, 2, X())
+
+ with MmapClosedByIntContext(access) as (m, X):
+ with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
+ m.write_byte(X())
+
class LargeMmapTests(unittest.TestCase):
def setUp(self):
CHECK_VALID(NULL);
if (!PyArg_ParseTuple(args, "|O&:read", _Py_convert_optional_to_ssize_t, &num_bytes))
- return(NULL);
+ return NULL;
+ CHECK_VALID(NULL);
/* silently 'adjust' out-of-range requests */
remaining = (self->pos < self->size) ? self->size - self->pos : 0;
end = self->size;
Py_ssize_t res;
+ CHECK_VALID(NULL);
if (reverse) {
res = _PyBytes_ReverseFind(
self->data + start, end - start,
CHECK_VALID(NULL);
if (!PyArg_ParseTuple(args, "y*:write", &data))
- return(NULL);
+ return NULL;
if (!is_writable(self)) {
PyBuffer_Release(&data);
return NULL;
}
+ CHECK_VALID(NULL);
memcpy(&self->data[self->pos], data.buf, data.len);
self->pos += data.len;
PyBuffer_Release(&data);
if (!is_writable(self))
return NULL;
+ CHECK_VALID(NULL);
if (self->pos < self->size) {
self->data[self->pos++] = value;
Py_RETURN_NONE;
if (self->size - dest < cnt || self->size - src < cnt)
goto bounds;
+ CHECK_VALID(NULL);
memmove(&self->data[dest], &self->data[src], cnt);
Py_RETURN_NONE;
length = self->size - start;
}
+ CHECK_VALID(NULL);
if (madvise(self->data + start, length, option) != 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
"mmap index out of range");
return NULL;
}
+ CHECK_VALID(NULL);
return PyLong_FromLong(Py_CHARMASK(self->data[i]));
}
else if (PySlice_Check(item)) {
}
slicelen = PySlice_AdjustIndices(self->size, &start, &stop, step);
+ CHECK_VALID(NULL);
if (slicelen <= 0)
return PyBytes_FromStringAndSize("", 0);
else if (step == 1)
if (result_buf == NULL)
return PyErr_NoMemory();
+
for (cur = start, i = 0; i < slicelen;
cur += step, i++) {
result_buf[i] = self->data[cur];
"in range(0, 256)");
return -1;
}
+ CHECK_VALID(-1);
self->data[i] = (char) v;
return 0;
}
return -1;
}
+ CHECK_VALID(-1);
if (slicelen == 0) {
}
else if (step == 1) {