data = None
for filename in self.__filenames:
try:
- with open(filename, "r") as fp:
+ with open(filename, encoding='utf-8') as fp:
data = fp.read()
break
except OSError:
self.baseFilename = os.path.abspath(filename)
self.mode = mode
self.encoding = encoding
+ if "b" not in mode:
+ self.encoding = io.text_encoding(encoding)
self.errors = errors
self.delay = delay
# bpo-26789: FileHandler keeps a reference to the builtin open()
filename = kwargs.pop("filename", None)
mode = kwargs.pop("filemode", 'a')
if filename:
- if 'b'in mode:
+ if 'b' in mode:
errors = None
+ else:
+ encoding = io.text_encoding(encoding)
h = FileHandler(filename, mode,
encoding=encoding, errors=errors)
else:
To use, simply 'import logging.handlers' and log away!
"""
-import logging, socket, os, pickle, struct, time, re
+import io, logging, socket, os, pickle, struct, time, re
from stat import ST_DEV, ST_INO, ST_MTIME
import queue
import threading
# on each run.
if maxBytes > 0:
mode = 'a'
+ if "b" not in mode:
+ encoding = io.text_encoding(encoding)
BaseRotatingHandler.__init__(self, filename, mode, encoding=encoding,
delay=delay, errors=errors)
self.maxBytes = maxBytes
def __init__(self, filename, when='h', interval=1, backupCount=0,
encoding=None, delay=False, utc=False, atTime=None,
errors=None):
+ encoding = io.text_encoding(encoding)
BaseRotatingHandler.__init__(self, filename, 'a', encoding=encoding,
delay=delay, errors=errors)
self.when = when.upper()
"""
def __init__(self, filename, mode='a', encoding=None, delay=False,
errors=None):
+ if "b" not in mode:
+ encoding = io.text_encoding(encoding)
logging.FileHandler.__init__(self, filename, mode=mode,
encoding=encoding, delay=delay,
errors=errors)
fodderModule = None
def setUp(self):
- with open(inspect.getsourcefile(self.fodderModule)) as fp:
+ with open(inspect.getsourcefile(self.fodderModule), encoding="utf-8") as fp:
self.source = fp.read()
def sourcerange(self, top, bottom):
def setUp(self):
self.tempdir = TESTFN + '_dir'
os.mkdir(self.tempdir)
- with open(os.path.join(self.tempdir,
- 'inspect_fodder3%spy' % os.extsep), 'w') as f:
+ with open(os.path.join(self.tempdir, 'inspect_fodder3%spy' % os.extsep),
+ 'w', encoding='utf-8') as f:
f.write("class X:\n pass # No EOL")
with DirsOnSysPath(self.tempdir):
import inspect_fodder3 as mod3
def test_no_dict_no_slots_instance_member(self):
# returns descriptor
- with open(__file__) as handle:
+ with open(__file__, encoding='utf-8') as handle:
self.assertEqual(inspect.getattr_static(handle, 'name'), type(handle).name)
def test_inherited_slots(self):
def assertInspectEqual(self, path, source):
inspected_src = inspect.getsource(source)
- with open(path) as src:
+ with open(path, encoding='utf-8') as src:
self.assertEqual(
src.read().splitlines(True),
inspected_src.splitlines(True)
with _ready_to_import('reload_bug', self.src_before) as (name, path):
module = importlib.import_module(name)
self.assertInspectEqual(path, module)
- with open(path, 'w') as src:
+ with open(path, 'w', encoding='utf-8') as src:
src.write(self.src_after)
self.assertInspectEqual(path, module)
indented = script.replace('\n', '\n ')
wrapped = dedent(f"""
import contextlib
- with open({w}, 'w') as spipe:
+ with open({w}, 'w', encoding='utf-8') as spipe:
with contextlib.redirect_stdout(spipe):
{indented}
""")
- return wrapped, open(r)
+ return wrapped, open(r, encoding='utf-8')
def clean_up_interpreters():
def test_fork(self):
interp = interpreters.create()
import tempfile
- with tempfile.NamedTemporaryFile('w+') as file:
+ with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
file.write('')
file.flush()
try:
os.fork()
except RuntimeError:
- with open('{file.name}', 'w') as out:
+ with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
interp.run(script)
self.assertEqual(t.encoding, "utf-8")
self.assertEqual(t.line_buffering, True)
self.assertEqual("\xe9\n", t.readline())
- self.assertRaises(TypeError, t.__init__, b, newline=42)
- self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+ self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
+ self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
def test_uninitialized(self):
t = self.TextIOWrapper.__new__(self.TextIOWrapper)
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b, encoding="utf-8")
- self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+ self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
self.assertRaises(ValueError, t.read)
t = self.TextIOWrapper.__new__(self.TextIOWrapper)
# Test a file
def test_iter_file(self):
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
for i in range(5):
f.write("%d\n" % i)
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.check_for_loop(f, ["0\n", "1\n", "2\n", "3\n", "4\n"], pickle=False)
self.check_for_loop(f, [], pickle=False)
self.assertRaises(TypeError, list, list)
self.assertRaises(TypeError, list, 42)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
for i in range(5):
f.write("%d\n" % i)
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.assertEqual(list(f), ["0\n", "1\n", "2\n", "3\n", "4\n"])
f.seek(0, 0)
self.assertRaises(TypeError, tuple, list)
self.assertRaises(TypeError, tuple, 42)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
for i in range(5):
f.write("%d\n" % i)
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.assertEqual(tuple(f), ("0\n", "1\n", "2\n", "3\n", "4\n"))
f.seek(0, 0)
self.assertEqual(max(d.values()), 3)
self.assertEqual(min(iter(d.values())), 1)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("medium line\n")
f.write("xtra large line\n")
f.write("itty-bitty line\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.assertEqual(min(f), "itty-bitty line\n")
f.seek(0, 0)
i < len(d) and dkeys[i] or None)
for i in range(3)]
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
for i in range(10):
f.write("xy" * i + "\n") # line i has len 2*i+1
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.assertEqual(list(map(len, f)), list(range(1, 21, 2)))
finally:
self.i = i+1
return i
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("a\n" "bbb\n" "cc\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
self.assertEqual(list(zip(IntsFrom(0), f, IntsFrom(-100))),
[(0, "a\n", -100),
return "fooled you!"
return next(self.it)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("a\n" + "b\n" + "c\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
# Nasty: string.join(s) can't know whether unicode.join() is needed
# until it's seen all of s's elements. But in this case, f's
# iterator cannot be restarted. So what we're testing here is
self.assertIn((k, v), d.items())
self.assertNotIn((v, k), d.items())
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("a\n" "b\n" "c\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
for chunk in "abc":
f.seek(0, 0)
self.assertEqual(countOf(d.values(), 2j), 1)
self.assertEqual(countOf(d.values(), 1j), 0)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("a\n" "b\n" "c\n" "b\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
for letter, count in ("a", 1), ("b", 2), ("c", 1), ("d", 0):
f.seek(0, 0)
self.assertRaises(TypeError, indexOf, indexOf, indexOf)
self.assertRaises(ZeroDivisionError, indexOf, BadIterableClass(), 1)
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
f.write("a\n" "b\n" "c\n" "d\n" "e\n")
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
fiter = iter(f)
self.assertEqual(indexOf(fiter, "b\n"), 1)
# Test iterators with file.writelines().
def test_writelines(self):
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
try:
self.assertRaises(TypeError, f.writelines, None)
f.writelines(Whatever(6, 6+2000))
f.close()
- f = open(TESTFN)
+ f = open(TESTFN, encoding="utf-8")
expected = [str(i) + "\n" for i in range(1, 2006)]
self.assertEqual(list(f), expected)
a, b, c = {1: 42, 2: 42, 3: 42}.values()
self.assertEqual((a, b, c), (42, 42, 42))
- f = open(TESTFN, "w")
+ f = open(TESTFN, "w", encoding="utf-8")
lines = ("a\n", "bb\n", "ccc\n")
try:
for line in lines:
f.write(line)
finally:
f.close()
- f = open(TESTFN, "r")
+ f = open(TESTFN, "r", encoding="utf-8")
try:
a, b, c = f
self.assertEqual((a, b, c), lines)
outfile = os_helper.TESTFN + '.out'
rc, out, err = assert_python_ok('-m', 'json.tool', infile, outfile)
self.addCleanup(os.remove, outfile)
- with open(outfile, "r") as fp:
+ with open(outfile, "r", encoding="utf-8") as fp:
self.assertEqual(fp.read(), self.expect)
self.assertEqual(rc, 0)
self.assertEqual(out, b'')
# Check module loading
for entry in MODULES:
filename = os.path.join(MODULE_PATH, entry) + '.py'
- with open(filename) as file:
+ with open(filename, encoding='utf-8') as file:
for index, line in enumerate(file):
self.assertEqual(line, getline(filename, index + 1))
def test_no_ending_newline(self):
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- with open(os_helper.TESTFN, "w") as fp:
+ with open(os_helper.TESTFN, "w", encoding='utf-8') as fp:
fp.write(SOURCE_3)
lines = linecache.getlines(os_helper.TESTFN)
self.assertEqual(lines, ["\n", "def f():\n", " return 3\n"])
# Create a source file and cache its contents
source_name = os_helper.TESTFN + '.py'
self.addCleanup(os_helper.unlink, source_name)
- with open(source_name, 'w') as source:
+ with open(source_name, 'w', encoding='utf-8') as source:
source.write(SOURCE_1)
getline(source_name, 1)
# Keep a copy of the old contents
source_list = []
- with open(source_name) as source:
+ with open(source_name, encoding='utf-8') as source:
for index, line in enumerate(source):
self.assertEqual(line, getline(source_name, index + 1))
source_list.append(line)
- with open(source_name, 'w') as source:
+ with open(source_name, 'w', encoding='utf-8') as source:
source.write(SOURCE_2)
# Try to update a bogus cache entry
# Update the cache and check whether it matches the new source file
linecache.checkcache(source_name)
- with open(source_name) as source:
+ with open(source_name, encoding='utf-8') as source:
for index, line in enumerate(source):
self.assertEqual(line, getline(source_name, index + 1))
source_list.append(line)
# bpo-34113. The crash happened at the command line console of
# debug Python builds with __ltrace__ enabled (only possible in console),
# when the interal Python stack was negatively adjusted
- with open(os_helper.TESTFN, 'w') as fd:
+ with open(os_helper.TESTFN, 'w', encoding='utf-8') as fd:
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
fd.write(textwrap.dedent("""\
import code
os.close(fd)
if not existing:
os.unlink(fn)
- h = logging.handlers.WatchedFileHandler(fn, delay=True)
+ h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True)
if existing:
dev, ino = h.dev, h.ino
self.assertEqual(dev, -1)
if sys.platform in ('linux', 'darwin'):
cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
for cls, args in cases:
- h = cls(*args)
+ h = cls(*args, encoding="utf-8")
self.assertTrue(os.path.exists(fn))
h.close()
os.unlink(fn)
remover = threading.Thread(target=remove_loop, args=(fn, del_count))
remover.daemon = True
remover.start()
- h = logging.handlers.WatchedFileHandler(fn, delay=delay)
+ h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay)
f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
h.setFormatter(f)
try:
def __init__(self):
super().__init__()
self.sub_handler = logging.StreamHandler(
- stream=open('/dev/null', 'wt'))
+ stream=open('/dev/null', 'wt', encoding='utf-8'))
def emit(self, record):
self.sub_handler.acquire()
# basicConfig() opens the file, but logging.shutdown() closes
# it at Python exit. When A.__del__() is called,
# FileHandler._open() must be called again to re-open the file.
- logging.basicConfig(filename={filename!r})
+ logging.basicConfig(filename={filename!r}, encoding="utf-8")
a = A()
""")
assert_python_ok("-c", code)
- with open(filename) as fp:
+ with open(filename, encoding="utf-8") as fp:
self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")
def test_recursion_error(self):
h2.close()
os.remove(fn)
- logging.basicConfig(filename='test.log')
+ logging.basicConfig(filename='test.log', encoding='utf-8')
self.assertEqual(len(logging.root.handlers), 1)
handler = logging.root.handlers[0]
self.assertIsInstance(handler, logging.FileHandler)
- expected = logging.FileHandler('test.log', 'a')
+ expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
self.assertEqual(handler.stream.mode, expected.stream.mode)
self.assertEqual(handler.stream.name, expected.stream.name)
self.addCleanup(cleanup, handler, expected, 'test.log')
class FileHandlerTest(BaseFileTest):
def test_delay(self):
os.unlink(self.fn)
- fh = logging.FileHandler(self.fn, delay=True)
+ fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True)
self.assertIsNone(fh.stream)
self.assertFalse(os.path.exists(self.fn))
fh.handle(logging.makeLogRecord({}))
def test_should_not_rollover(self):
# If maxbytes is zero rollover never occurs
- rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
+ rh = logging.handlers.RotatingFileHandler(
+ self.fn, encoding="utf-8", maxBytes=0)
self.assertFalse(rh.shouldRollover(None))
rh.close()
def test_should_rollover(self):
- rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
+ rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1)
self.assertTrue(rh.shouldRollover(self.next_rec()))
rh.close()
def test_file_created(self):
# checks that the file is created and assumes it was created
# by us
- rh = logging.handlers.RotatingFileHandler(self.fn)
+ rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8")
rh.emit(self.next_rec())
self.assertLogFile(self.fn)
rh.close()
def namer(name):
return name + ".test"
rh = logging.handlers.RotatingFileHandler(
- self.fn, backupCount=2, maxBytes=1)
+ self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
rh.namer = namer
rh.emit(self.next_rec())
self.assertLogFile(self.fn)
os.rename(source, dest + ".rotated")
rh = HandlerWithNamerAndRotator(
- self.fn, backupCount=2, maxBytes=1)
+ self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
self.assertEqual(rh.namer(self.fn), self.fn + ".test")
rh.emit(self.next_rec())
self.assertLogFile(self.fn)
os.remove(source)
rh = logging.handlers.RotatingFileHandler(
- self.fn, backupCount=2, maxBytes=1)
+ self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
rh.rotator = rotator
rh.namer = namer
m1 = self.next_rec()