D415,FURB129,N818,PERF401,PERF403,PIE790,PIE808,PLW0127,Q004,RUF010,SIM101,\
SIM117,SIM118,TRY400,TRY401,RET503,RET504,UP004,B018,B904,RSE102,RET505,I001 \
"$@"
+# Checks that are in preview only, since --preview otherwise turns them all on
+ruff check --preview --select=E301,E302,E303,E304,E305,E306,E502 "$@"
except ImportError: # Python 3
import socketserver
+
log = logging.getLogger(__name__)
HOST = "localhost"
CombinationRepetitionUtil(chosen, arr, badarr, index,
r, start + 1, end)
+
# The main function that prints all
# combinations of size r in arr[] of
# size n. This function mainly uses
# temporary array 'chosen[]'
CombinationRepetitionUtil(chosen, arr, badarr, 0, r, 0, n)
+
# Driver code
badarr = [ '--ech grease', '--ech false', '--ech ecl:$badecl', '--ech pn:$badpn' ]
goodarr = [ '--ech hard', '--ech true', '--ech ecl:$goodecl', '--ech pn:$goodpn' ]
# include this fixture as test parameter if the test configures httpd itself
yield True
+
@pytest.fixture(scope='session')
def configures_nghttpx(env, httpd) -> Generator[bool, None, None]:
# include this fixture as test parameter if the test configures nghttpx itself
yield True
+
@pytest.fixture(autouse=True, scope='function')
def server_reset(request, env, httpd, nghttpx):
# make sure httpd is in default configuration when a test starts
for x in args.request_parallels:
request_parallels.extend([int(s) for s in x.split(',')])
-
if args.downloads or args.uploads or args.requests or args.handshakes:
handshakes = args.handshakes
if not args.downloads:
else:
import SocketServer as socketserver
+
log = logging.getLogger(__name__)
HOST = "localhost"
IDENT = "NTEL"
VERIFIED_REQ = "verifiedserver"
VERIFIED_RSP = "WE ROOLZ: {pid}"
+
def telnetserver(options):
"""Start up a TCP server with a telnet handler and serve DICT requests forever."""
if options.pidfile:
# leaving `with` calls server.close() automatically
return ScriptRC.SUCCESS
+
class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
"""Handler class for Telnet connections."""
except IOError:
log.exception("IOError hit during request")
+
class Negotiator:
NO_NEG = 0
START_NEG = 1
log.debug("Sending WONT %s", option_str)
self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)])
+
class NegBase:
@classmethod
def to_val(cls, name):
return "<unknown>"
+
class NegTokens(NegBase):
# The start of a negotiation sequence
IAC = 255
# The end of sub-negotiation options.
SE = 240
+
class NegOptions(NegBase):
# Binary Transmission
BINARY = 0
# Charset option
CHARSET = 42
+
def get_options():
parser = argparse.ArgumentParser()
return parser.parse_args()
+
def setup_logging(options):
"""Set up logging from the command line options."""
root_logger = logging.getLogger()
stdout_handler.setLevel(logging.DEBUG)
root_logger.addHandler(stdout_handler)
+
class ScriptRC:
"""Enum for script return codes."""
FAILURE = 1
EXCEPTION = 2
+
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
'Warning: Python package impacket is required for smb testing; '
'use pip or your package manager to install it\n')
sys.exit(1)
+
from impacket import smb as imp_smb
from impacket import smbserver as imp_smbserver
from impacket.nt_errors import STATUS_ACCESS_DENIED, STATUS_NO_SUCH_FILE, STATUS_SUCCESS
VERIFIED_REQ = "verifiedserver"
VERIFIED_RSP = "WE ROOLZ: {pid}\n"
+
class ShutdownHandler(threading.Thread):
"""
Cleanly shut down the SMB server.
REPLY_DATA = re.compile("<reply>[ \t\n\r]*<data[^<]*>(.*?)</data>", re.MULTILINE | re.DOTALL)
+
class ClosingFileHandler(logging.StreamHandler):
def __init__(self, filename):
super(ClosingFileHandler, self).__init__()
self.release()
return result
+
class TestData:
def __init__(self, data_folder):
self.data_folder = data_folder
# Left-strip the data so we do not get a newline before our data.
return m.group(1).lstrip()
+
if __name__ == '__main__':
td = TestData("./data")
data = td.get_test_data(1451)