--- /dev/null
+from ctypes import *
+import array
+import gc
+import unittest
+
+class X(Structure):
+ _fields_ = [("c_int", c_int)]
+ init_called = False
+ def __init__(self):
+ self._init_called = True
+
+class Test(unittest.TestCase):
+ def test_fom_buffer(self):
+ a = array.array("i", range(16))
+ x = (c_int * 16).from_buffer(a)
+
+ y = X.from_buffer(a)
+ self.assertEqual(y.c_int, a[0])
+ self.failIf(y.init_called)
+
+ self.assertEqual(x[:], a.tolist())
+
+ a[0], a[-1] = 200, -200
+ self.assertEqual(x[:], a.tolist())
+
+ self.assert_(a in x._objects.values())
+
+ self.assertRaises(ValueError,
+ c_int.from_buffer, a, -1)
+
+ expected = x[:]
+ del a; gc.collect(); gc.collect(); gc.collect()
+ self.assertEqual(x[:], expected)
+
+ self.assertRaises(TypeError,
+ (c_char * 16).from_buffer, "a" * 16)
+
+ def test_fom_buffer_with_offset(self):
+ a = array.array("i", range(16))
+ x = (c_int * 15).from_buffer(a, sizeof(c_int))
+
+ self.assertEqual(x[:], a.tolist()[1:])
+ self.assertRaises(ValueError, lambda: (c_int * 16).from_buffer(a, sizeof(c_int)))
+ self.assertRaises(ValueError, lambda: (c_int * 1).from_buffer(a, 16 * sizeof(c_int)))
+
+ def test_from_buffer_copy(self):
+ a = array.array("i", range(16))
+ x = (c_int * 16).from_buffer_copy(a)
+
+ y = X.from_buffer_copy(a)
+ self.assertEqual(y.c_int, a[0])
+ self.failIf(y.init_called)
+
+ self.assertEqual(x[:], range(16))
+
+ a[0], a[-1] = 200, -200
+ self.assertEqual(x[:], range(16))
+
+ self.assertEqual(x._objects, None)
+
+ self.assertRaises(ValueError,
+ c_int.from_buffer, a, -1)
+
+ del a; gc.collect(); gc.collect(); gc.collect()
+ self.assertEqual(x[:], range(16))
+
+ x = (c_char * 16).from_buffer_copy("a" * 16)
+ self.assertEqual(x[:], "a" * 16)
+
+ def test_fom_buffer_copy_with_offset(self):
+ a = array.array("i", range(16))
+ x = (c_int * 15).from_buffer_copy(a, sizeof(c_int))
+
+ self.assertEqual(x[:], a.tolist()[1:])
+ self.assertRaises(ValueError,
+ (c_int * 16).from_buffer_copy, a, sizeof(c_int))
+ self.assertRaises(ValueError,
+ (c_int * 1).from_buffer_copy, a, 16 * sizeof(c_int))
+
+if __name__ == '__main__':
+ unittest.main()
return CData_AtAddress(type, buf);
}
+static char from_buffer_doc[] =
+"C.from_buffer(object, offset=0) -> C instance\ncreate a C instance from a writeable buffer";
+
+static int
+KeepRef(CDataObject *target, Py_ssize_t index, PyObject *keep);
+
+static PyObject *
+CDataType_from_buffer(PyObject *type, PyObject *args)
+{
+ void *buffer;
+ Py_ssize_t buffer_len;
+ Py_ssize_t offset = 0;
+ PyObject *obj, *result;
+ StgDictObject *dict = PyType_stgdict(type);
+ assert (dict);
+
+ if (!PyArg_ParseTuple(args,
+#if (PY_VERSION_HEX < 0x02050000)
+ "O|i:from_buffer",
+#else
+ "O|n:from_buffer",
+#endif
+ &obj, &offset))
+ return NULL;
+
+ if (-1 == PyObject_AsWriteBuffer(obj, &buffer, &buffer_len))
+ return NULL;
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "offset cannit be negative");
+ return NULL;
+ }
+ if (dict->size > buffer_len - offset) {
+ PyErr_Format(PyExc_ValueError,
+#if (PY_VERSION_HEX < 0x02050000)
+ "Buffer size too small (%d instead of at least %d bytes)",
+#else
+ "Buffer size too small (%zd instead of at least %zd bytes)",
+#endif
+ buffer_len, dict->size + offset);
+ return NULL;
+ }
+
+ result = CData_AtAddress(type, (char *)buffer + offset);
+ if (result == NULL)
+ return NULL;
+
+ Py_INCREF(obj);
+ if (-1 == KeepRef((CDataObject *)result, -1, obj)) {
+ Py_DECREF(result);
+ return NULL;
+ }
+ return result;
+}
+
+static char from_buffer_copy_doc[] =
+"C.from_buffer_copy(object, offset=0) -> C instance\ncreate a C instance from a readable buffer";
+
+static PyObject *
+GenericCData_new(PyTypeObject *type, PyObject *args, PyObject *kwds);
+
+static PyObject *
+CDataType_from_buffer_copy(PyObject *type, PyObject *args)
+{
+ void *buffer;
+ Py_ssize_t buffer_len;
+ Py_ssize_t offset = 0;
+ PyObject *obj, *result;
+ StgDictObject *dict = PyType_stgdict(type);
+ assert (dict);
+
+ if (!PyArg_ParseTuple(args,
+#if (PY_VERSION_HEX < 0x02050000)
+ "O|i:from_buffer",
+#else
+ "O|n:from_buffer",
+#endif
+ &obj, &offset))
+ return NULL;
+
+ if (-1 == PyObject_AsReadBuffer(obj, &buffer, &buffer_len))
+ return NULL;
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "offset cannit be negative");
+ return NULL;
+ }
+
+ if (dict->size > buffer_len - offset) {
+ PyErr_Format(PyExc_ValueError,
+#if (PY_VERSION_HEX < 0x02050000)
+ "Buffer size too small (%d instead of at least %d bytes)",
+#else
+ "Buffer size too small (%zd instead of at least %zd bytes)",
+#endif
+ buffer_len, dict->size + offset);
+ return NULL;
+ }
+
+ result = GenericCData_new((PyTypeObject *)type, NULL, NULL);
+ if (result == NULL)
+ return NULL;
+ memcpy(((CDataObject *)result)->b_ptr,
+ (char *)buffer+offset, dict->size);
+ return result;
+}
+
static char in_dll_doc[] =
"C.in_dll(dll, name) -> C instance\naccess a C instance in a dll";
static PyMethodDef CDataType_methods[] = {
{ "from_param", CDataType_from_param, METH_O, from_param_doc },
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
+ { "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
+ { "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc },
{ NULL, NULL },
};
static PyMethodDef PointerType_methods[] = {
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
+ { "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
+ { "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc},
{ "from_param", (PyCFunction)PointerType_from_param, METH_O, from_param_doc},
{ "set_type", (PyCFunction)PointerType_set_type, METH_O },
static PyMethodDef SimpleType_methods[] = {
{ "from_param", SimpleType_from_param, METH_O, from_param_doc },
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
+ { "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
+ { "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc},
{ NULL, NULL },
};