# 3. This notice may not be removed or altered from any source distribution.
import contextlib
+import os
import sqlite3 as sqlite
import subprocess
import sys
import threading
import unittest
+import urllib.parse
-from test.support import (
- SHORT_TIMEOUT,
- bigmemtest,
- check_disallow_instantiation,
- threading_helper,
-)
+from test.support import SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation
+from test.support import threading_helper
from _testcapi import INT_MAX, ULLONG_MAX
from os import SEEK_SET, SEEK_CUR, SEEK_END
-from test.support.os_helper import TESTFN, unlink, temp_dir
+from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
# Helper for tests using TESTFN
def test_open_with_path_like_object(self):
""" Checks that we can successfully connect to a database using an object that
is PathLike, i.e. has __fspath__(). """
- class Path:
- def __fspath__(self):
- return TESTFN
- path = Path()
+ path = FakePath(TESTFN)
with managed_connect(path) as cx:
+ self.assertTrue(os.path.exists(path))
+ cx.execute(self._sql)
+
+ @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
+ @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
+ @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
+ def test_open_with_undecodable_path(self):
+ self.addCleanup(unlink, TESTFN_UNDECODABLE)
+ path = TESTFN_UNDECODABLE
+ with managed_connect(path) as cx:
+ self.assertTrue(os.path.exists(path))
cx.execute(self._sql)
def test_open_uri(self):
--- /dev/null
+Fix function :func:`sqlite.connect` and the :class:`sqlite.Connection`
+constructor on non-UTF-8 locales. Also, they now support bytes paths
+non-decodable with the current FS encoding.
[clinic start generated code]*/
static int
-pysqlite_connection_init_impl(pysqlite_Connection *self,
- const char *database, double timeout,
- int detect_types, const char *isolation_level,
+pysqlite_connection_init_impl(pysqlite_Connection *self, PyObject *database,
+ double timeout, int detect_types,
+ const char *isolation_level,
int check_same_thread, PyObject *factory,
int cache_size, int uri);
PyObject * const *fastargs;
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
Py_ssize_t noptargs = nargs + (kwargs ? PyDict_GET_SIZE(kwargs) : 0) - 1;
- const char *database = NULL;
+ PyObject *database;
double timeout = 5.0;
int detect_types = 0;
const char *isolation_level = "";
if (!fastargs) {
goto exit;
}
- if (!clinic_fsconverter(fastargs[0], &database)) {
- goto exit;
- }
+ database = fastargs[0];
if (!noptargs) {
goto skip_optional_pos;
}
return_value = pysqlite_connection_init_impl((pysqlite_Connection *)self, database, timeout, detect_types, isolation_level, check_same_thread, factory, cache_size, uri);
exit:
- /* Cleanup for database */
- PyMem_Free((void *)database);
-
return return_value;
}
#ifndef DESERIALIZE_METHODDEF
#define DESERIALIZE_METHODDEF
#endif /* !defined(DESERIALIZE_METHODDEF) */
-/*[clinic end generated code: output=d21767843c480a10 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=fb8908674e9f25ff input=a9049054013a1b77]*/
return 1;
}
-static int
-clinic_fsconverter(PyObject *pathlike, const char **result)
-{
- PyObject *bytes = NULL;
- Py_ssize_t len;
- char *str;
-
- if (!PyUnicode_FSConverter(pathlike, &bytes)) {
- goto error;
- }
- if (PyBytes_AsStringAndSize(bytes, &str, &len) < 0) {
- goto error;
- }
- if ((*result = (const char *)PyMem_Malloc(len+1)) == NULL) {
- goto error;
- }
-
- memcpy((void *)(*result), str, len+1);
- Py_DECREF(bytes);
- return 1;
-
-error:
- Py_XDECREF(bytes);
- return 0;
-}
-
#define clinic_state() (pysqlite_get_state_by_type(Py_TYPE(self)))
#include "clinic/connection.c.h"
#undef clinic_state
}
/*[python input]
-class FSConverter_converter(CConverter):
- type = "const char *"
- converter = "clinic_fsconverter"
- def converter_init(self):
- self.c_default = "NULL"
- def cleanup(self):
- return f"PyMem_Free((void *){self.name});\n"
-
class IsolationLevel_converter(CConverter):
type = "const char *"
converter = "isolation_level_converter"
[python start generated code]*/
-/*[python end generated code: output=da39a3ee5e6b4b0d input=be142323885672ab]*/
+/*[python end generated code: output=da39a3ee5e6b4b0d input=cbcfe85b253061c2]*/
/*[clinic input]
_sqlite3.Connection.__init__ as pysqlite_connection_init
- database: FSConverter
+ database: object
timeout: double = 5.0
detect_types: int = 0
isolation_level: IsolationLevel = ""
[clinic start generated code]*/
static int
-pysqlite_connection_init_impl(pysqlite_Connection *self,
- const char *database, double timeout,
- int detect_types, const char *isolation_level,
+pysqlite_connection_init_impl(pysqlite_Connection *self, PyObject *database,
+ double timeout, int detect_types,
+ const char *isolation_level,
int check_same_thread, PyObject *factory,
int cache_size, int uri)
-/*[clinic end generated code: output=7d640ae1d83abfd4 input=342173993434ba1e]*/
+/*[clinic end generated code: output=839eb2fee4293bda input=b8ce63dc6f70a383]*/
{
- if (PySys_Audit("sqlite3.connect", "s", database) < 0) {
+ if (PySys_Audit("sqlite3.connect", "O", database) < 0) {
+ return -1;
+ }
+
+ PyObject *bytes;
+ if (!PyUnicode_FSConverter(database, &bytes)) {
return -1;
}
sqlite3 *db;
int rc;
Py_BEGIN_ALLOW_THREADS
- rc = sqlite3_open_v2(database, &db,
+ rc = sqlite3_open_v2(PyBytes_AS_STRING(bytes), &db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
(uri ? SQLITE_OPEN_URI : 0), NULL);
if (rc == SQLITE_OK) {
}
Py_END_ALLOW_THREADS
+ Py_DECREF(bytes);
if (db == NULL && rc == SQLITE_NOMEM) {
PyErr_NoMemory();
return -1;