--- /dev/null
+*** Settings ***
+Library ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${GTUBE} ${RSPAMD_TESTDIR}/messages/gtube.eml
+${SETTINGS_NOSYMBOLS} {symbols_enabled = []}
+
+*** Test Cases ***
+Controller SSL - stat
+ [Documentation] Fetch /stat over HTTPS from the controller SSL port
+ @{result} = HTTPS GET ${RSPAMD_LOCAL_ADDR} ${RSPAMD_PORT_CONTROLLER_SSL} /stat
+ Should Be Equal As Integers ${result}[0] 200
+
+Controller SSL - errors
+ [Documentation] Fetch /errors over HTTPS from the controller SSL port
+ @{result} = HTTPS GET ${RSPAMD_LOCAL_ADDR} ${RSPAMD_PORT_CONTROLLER_SSL} /errors
+ Should Be Equal As Integers ${result}[0] 200
+
+Controller plain still works alongside SSL
+ [Documentation] Plain HTTP controller port must still work when SSL port is also configured
+ @{result} = HTTP GET ${RSPAMD_LOCAL_ADDR} ${RSPAMD_PORT_CONTROLLER} /stat
+ Should Be Equal As Integers ${result}[0] 200
+
+Normal worker SSL - checkv2
+ [Documentation] Scan a message via /checkv2 over HTTPS on the normal worker SSL port
+ Scan File SSL ${GTUBE} Settings=${SETTINGS_NOSYMBOLS}
+ Expect Symbol GTUBE
+
+Normal worker SSL - checkv3
+ [Documentation] Scan a message via /checkv3 over HTTPS on the normal worker SSL port
+ Scan File V3 SSL ${GTUBE} Settings=${SETTINGS_NOSYMBOLS}
+ Expect Symbol GTUBE
+
+Normal worker plain still works alongside SSL
+ [Documentation] Plain HTTP normal port must still work when SSL port is also configured
+ Scan File ${GTUBE} Settings=${SETTINGS_NOSYMBOLS}
+ Expect Symbol GTUBE
import shutil
import signal
import socket
+import ssl
import stat
+import subprocess
import random
import re
import sys
return [s, t, h]
+def HTTPS(method, host, port, path, data=None, headers={}):
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ c = http.client.HTTPSConnection("%s:%s" % (host, port), context=ctx)
+ c.request(method, path, data, headers)
+ r = c.getresponse()
+ t = r.read()
+ s = r.status
+ c.close()
+ return [s, t]
+
+
+def HTTPS_With_Headers(method, host, port, path, data=None, headers={}):
+ """HTTPS request that returns response headers.
+ Returns [status, body, headers_dict]
+ """
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ c = http.client.HTTPSConnection("%s:%s" % (host, port), context=ctx)
+ c.request(method, path, data, headers)
+ r = c.getresponse()
+ t = r.read()
+ s = r.status
+ h = dict(r.getheaders())
+ c.close()
+ return [s, t, h]
+
+
+def generate_ssl_cert(tmpdir):
+ """Generate a self-signed EC certificate and key in tmpdir.
+ Returns (cert_path, key_path).
+ """
+ cert_path = os.path.join(tmpdir, "test-cert.pem")
+ key_path = os.path.join(tmpdir, "test-key.pem")
+ subprocess.check_call([
+ "openssl", "req", "-x509", "-newkey", "ec",
+ "-pkeyopt", "ec_paramgen_curve:prime256v1",
+ "-keyout", key_path, "-out", cert_path,
+ "-days", "1", "-nodes",
+ "-subj", "/CN=rspamd-test",
+ ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ # Make readable by rspamd worker (runs as nobody)
+ os.chmod(cert_path, 0o644)
+ os.chmod(key_path, 0o644)
+ return cert_path, key_path
+
+
def hard_link(src, dst):
os.link(src, dst)
return status
+def Scan_File_SSL(filename, port=None, **headers):
+ """Like Scan_File but over HTTPS (TLS) to the normal worker SSL port."""
+ addr = BuiltIn().get_variable_value("${RSPAMD_LOCAL_ADDR}")
+ if port is None:
+ port = BuiltIn().get_variable_value("${RSPAMD_PORT_NORMAL_SSL}")
+ headers["Queue-Id"] = BuiltIn().get_variable_value("${TEST_NAME}")
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ c = http.client.HTTPSConnection("%s:%s" % (addr, port), context=ctx)
+ c.request("POST", "/checkv2", open(filename, "rb"), headers)
+ r = c.getresponse()
+ assert r.status == 200, "Expected HTTP 200 but got %d" % r.status
+ d = json.JSONDecoder(strict=True).decode(r.read().decode('utf-8'))
+ c.close()
+ BuiltIn().set_test_variable("${SCAN_RESULT}", d)
+ return
+
+
+def Scan_File_V3_SSL(filename, port=None, metadata=None, **headers):
+ """Like Scan_File_V3 but over HTTPS (TLS)."""
+ addr = BuiltIn().get_variable_value("${RSPAMD_LOCAL_ADDR}")
+ if port is None:
+ port = BuiltIn().get_variable_value("${RSPAMD_PORT_NORMAL_SSL}")
+
+ meta = metadata if metadata else {}
+ meta_json = json.dumps(meta)
+ message_data = open(filename, "rb").read()
+
+ boundary = "----rspamd-test-%016x" % random.getrandbits(64)
+ body = _build_multipart(boundary, meta_json, message_data)
+
+ headers["Content-Type"] = "multipart/form-data; boundary=" + boundary
+ if "Queue-Id" not in headers:
+ headers["Queue-Id"] = BuiltIn().get_variable_value("${TEST_NAME}")
+
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
+ c = http.client.HTTPSConnection("%s:%s" % (addr, port), context=ctx)
+ c.request("POST", "/checkv3", body, headers)
+ r = c.getresponse()
+ assert r.status == 200, "Expected HTTP 200 but got %d" % r.status
+
+ resp_body = r.read()
+ resp_ct = r.getheader("Content-Type", "")
+ result_data = _parse_multipart_response(resp_body, resp_ct)
+
+ d = json.JSONDecoder(strict=True).decode(result_data)
+ c.close()
+ BuiltIn().set_test_variable("${SCAN_RESULT}", d)
+ return
+
+
def Send_SIGUSR1(pid):
pid = int(pid)
os.kill(pid, signal.SIGUSR1)