# Copyright (C) 2020-2021 The Psycopg Team
import codecs
-from typing import Dict, Union, TYPE_CHECKING
+from typing import Dict, TYPE_CHECKING
from .errors import NotSupportedError
"WIN874": "cp874",
}
-py_codecs: Dict[Union[bytes, str], str] = {}
+py_codecs: Dict[bytes, str] = {}
py_codecs.update((k.encode(), v) for k, v in _py_codecs.items())
# Add an alias without underscore, for lenient lookups
from .abc import PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
-from ._encodings import py_codecs, conninfo_encoding
+from ._encodings import pgconn_encoding, conninfo_encoding
logger = logging.getLogger(__name__)
raise e.ProgrammingError("you cannot mix COPY with other operations")
result = results[0]
if result.status != ExecStatus.COMMAND_OK:
- encoding = py_codecs.get(
- pgconn.parameter_status(b"client_encoding") or "", "utf-8"
- )
+ encoding = pgconn_encoding(pgconn)
raise e.error_from_result(result, encoding=encoding)
return result
# Retrieve the final result of copy
(result,) = yield from fetch_many(pgconn)
if result.status != ExecStatus.COMMAND_OK:
- encoding = py_codecs.get(
- pgconn.parameter_status(b"client_encoding") or "", "utf-8"
- )
+ encoding = pgconn_encoding(pgconn)
raise e.error_from_result(result, encoding=encoding)
return result
from .abc import PGconn, PGresult
from ._enums import ConnStatus, TransactionStatus
-from .._encodings import py_codecs
+from .._encodings import pgconn_encoding
class PGnotify(NamedTuple):
# obj is a PGconn
obj = cast(PGconn, obj)
if obj.status == ConnStatus.OK:
- encoding = py_codecs.get(
- obj.parameter_status(b"client_encoding") or "", "utf-8"
- )
+ encoding = pgconn_encoding(obj)
bmsg = obj.error_message
# strip severity and whitespaces