if not buf and self.eof and not self.rawq:
raise EOFError, 'telnet connection closed'
return buf
-
+
def read_sb_data(self):
"""Return any data available in the SB ... SE queue.
- Return '' if no SB ... SE available. Should only be called
- after seeing a SB or SE command. When a new SB command is
+ Return '' if no SB ... SE available. Should only be called
+ after seeing a SB or SE command. When a new SB command is
found, old unread SB data will be discarded. Don't block.
"""
if c in (DO, DONT, WILL, WONT):
self.iacseq += c
continue
-
+
self.iacseq = ''
if c == IAC:
buf[self.sb] = buf[self.sb] + c
self.iacseq = ''
opt = c
if cmd in (DO, DONT):
- self.msg('IAC %s %d',
+ self.msg('IAC %s %d',
cmd == DO and 'DO' or 'DONT', ord(opt))
if self.option_callback:
self.option_callback(self.sock, cmd, opt)
def setUp(self):
self.filename = tempfile.mktemp("bz2")
-
+
def tearDown(self):
if os.path.isfile(self.filename):
os.unlink(self.filename)
-
+
def createTempFile(self, crlf=0):
f = open(self.filename, "wb")
if crlf:
data = self.DATA
f.write(data)
f.close()
-
+
def testRead(self):
"Test BZ2File.read()"
self.createTempFile()
bz2f = BZ2File(self.filename)
self.assertEqual(bz2f.read(), self.TEXT)
bz2f.close()
-
+
def testReadChunk10(self):
"Test BZ2File.read() in chunks of 10 bytes"
self.createTempFile()
text += str
self.assertEqual(text, text)
bz2f.close()
-
+
def testRead100(self):
"Test BZ2File.read(100)"
self.createTempFile()
bz2f = BZ2File(self.filename)
self.assertEqual(bz2f.read(100), self.TEXT[:100])
bz2f.close()
-
+
def testReadLine(self):
"Test BZ2File.readline()"
self.createTempFile()
for line in sio.readlines():
self.assertEqual(bz2f.readline(), line)
bz2f.close()
-
+
def testReadLines(self):
"Test BZ2File.readlines()"
self.createTempFile()
sio = StringIO(self.TEXT)
self.assertEqual(bz2f.readlines(), sio.readlines())
bz2f.close()
-
+
def testIterator(self):
"Test iter(BZ2File)"
self.createTempFile()
sio = StringIO(self.TEXT)
self.assertEqual(list(iter(bz2f)), sio.readlines())
bz2f.close()
-
+
def testXReadLines(self):
"Test BZ2File.xreadlines()"
self.createTempFile()
sio = StringIO(self.TEXT)
self.assertEqual(list(bz2f.xreadlines()), sio.readlines())
bz2f.close()
-
+
def testUniversalNewlinesLF(self):
"Test BZ2File.read() with universal newlines (\\n)"
self.createTempFile()
self.assertEqual(bz2f.read(), self.TEXT)
self.assertEqual(bz2f.newlines, "\n")
bz2f.close()
-
+
def testUniversalNewlinesCRLF(self):
"Test BZ2File.read() with universal newlines (\\r\\n)"
self.createTempFile(crlf=1)
self.assertEqual(bz2f.read(), self.TEXT)
self.assertEqual(bz2f.newlines, "\r\n")
bz2f.close()
-
+
def testWrite(self):
"Test BZ2File.write()"
bz2f = BZ2File(self.filename, "w")
f = open(self.filename)
self.assertEqual(self.decompress(f.read()), self.TEXT)
f.close()
-
+
def testSeekForward(self):
"Test BZ2File.seek(150, 0)"
self.createTempFile()
bz2f = BZ2File(self.filename)
bz2f.seek(-150, 2)
self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
-
+
def testSeekPostEnd(self):
"Test BZ2File.seek(150000)"
self.createTempFile()
class FuncTest(BaseTest):
"Test module functions"
-
+
def testCompress(self):
"Test compress() function"
data = compress(self.TEXT)
"Test decompress() function"
text = decompress(self.DATA)
self.assertEqual(text, self.TEXT)
-
+
def testDecompressEmpty(self):
"Test decompress() function with empty string"
text = decompress("")