--- /dev/null
+*** Settings ***
+Test Setup Metadata Exporter Structured Setup
+Test Teardown Metadata Exporter Structured Teardown
+Library Process
+Library ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG} ${RSPAMD_TESTDIR}/configs/metadata_exporter_structured.conf
+${MESSAGE} ${RSPAMD_TESTDIR}/messages/spam_message.eml
+${ATTACHMENT_MESSAGE} ${RSPAMD_TESTDIR}/messages/zip.eml
+${RSPAMD_LUA_SCRIPT} ${RSPAMD_TESTDIR}/lua/metadata_exporter_structured.lua
+${RSPAMD_SCOPE} Suite
+${RSPAMD_URL_TLD} ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+${REDIS_SCOPE} Suite
+
+*** Test Cases ***
+Structured export to Redis stream - UUID v7 and metadata
+ [Documentation] Export structured metadata to Redis stream, decode msgpack and verify UUID v7 format
+ # Scan message - triggers default selector
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = []}
+
+ # Wait for async export to complete
+ Sleep 1s
+
+ # Read and decode msgpack from Redis stream
+ ${data} = Redis Stream Read Msgpack ${RSPAMD_REDIS_ADDR} ${RSPAMD_REDIS_PORT} test:structured
+ Log ${data}
+
+ # Validate required fields and UUID v7 format
+ Validate Structured Metadata ${data} uuid,ip,score,action
+
+Structured export with zstd compression
+ [Documentation] Export with zstd compression on content fields
+ Scan File ${MESSAGE}
+ ... Settings={symbols_enabled = []}
+
+ # Wait for async export
+ Sleep 1s
+
+ # Read from zstd stream
+ ${data} = Redis Stream Read Msgpack ${RSPAMD_REDIS_ADDR} ${RSPAMD_REDIS_PORT} test:structured_zstd
+ Log ${data}
+
+ # Validate required fields
+ Validate Structured Metadata ${data} uuid,ip,score
+
+ # Verify zstd compression markers are set
+ ${count} = Validate Zstd Compressed Fields ${data}
+ Log Compressed fields count: ${count}
+
+Attachment with detected MIME type
+ [Documentation] Scan message with attachment and verify content_type in export
+ Scan File ${ATTACHMENT_MESSAGE}
+ ... Settings={symbols_enabled = []}
+
+ # Wait for async export
+ Sleep 1s
+
+ # Read from stream
+ ${data} = Redis Stream Read Msgpack ${RSPAMD_REDIS_ADDR} ${RSPAMD_REDIS_PORT} test:structured
+ Log ${data}
+
+ # Validate required fields
+ Validate Structured Metadata ${data} uuid,ip,score
+
+ # Verify attachments have content_type
+ ${count} = Validate Attachments Have Content Type ${data}
+ Should Be True ${count} >= 1 msg=Expected at least 1 attachment with content_type
+
+*** Keywords ***
+Metadata Exporter Structured Setup
+ Run Redis
+ Rspamd Setup
+
+Metadata Exporter Structured Teardown
+ Rspamd Teardown
+ Redis Teardown
def file_exists(file):
return os.path.isfile(file)
+
+
+def redis_stream_read_msgpack(host, port, stream_key):
+ """Read latest entry from Redis stream and decode msgpack data.
+
+ Returns decoded dict with metadata fields.
+
+ Example:
+ | ${data} = | Redis Stream Read Msgpack | ${RSPAMD_REDIS_ADDR} | ${RSPAMD_REDIS_PORT} | test:structured |
+ """
+ try:
+ import redis
+ except ImportError:
+ raise Exception("redis module not installed - run: pip install redis")
+
+ try:
+ import msgpack
+ except ImportError:
+ raise Exception("msgpack module not installed - run: pip install msgpack")
+
+ r = redis.Redis(host=host, port=int(port), decode_responses=False)
+
+ # Read from stream
+ entries = r.xrange(stream_key, count=1)
+ if not entries:
+ raise Exception(f"No data in stream {stream_key}")
+
+ # Get the first entry's data field
+ entry_id, fields = entries[0]
+ if b'data' not in fields:
+ raise Exception(f"No data field in stream entry, keys: {list(fields.keys())}")
+
+ msgpack_data = fields[b'data']
+
+ # Decode msgpack with raw=True to preserve bytes, then convert what we can
+ decoded = msgpack.unpackb(msgpack_data, raw=True)
+
+ # Convert bytes keys to strings for easier access
+ def convert_keys(obj):
+ if isinstance(obj, dict):
+ return {k.decode('utf-8') if isinstance(k, bytes) else k: convert_keys(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [convert_keys(item) for item in obj]
+ elif isinstance(obj, bytes):
+ # Try to decode as UTF-8, otherwise keep as bytes
+ try:
+ return obj.decode('utf-8')
+ except UnicodeDecodeError:
+ return obj
+ return obj
+
+ return convert_keys(decoded)
+
+
+def validate_structured_metadata(data, expected_fields=None):
+ """Validate structured metadata export format.
+
+ Checks that required fields exist and UUID v7 has correct format.
+
+ Example:
+ | Validate Structured Metadata | ${data} | uuid,ip,score,action |
+ """
+ import re
+
+ if expected_fields is None:
+ expected_fields = 'uuid,ip,score,action'
+
+ errors = []
+
+ for field in expected_fields.split(','):
+ field = field.strip()
+ if field not in data:
+ errors.append(f"Missing field: {field}")
+
+ # Validate UUID v7 format if present
+ if 'uuid' in data:
+ uuid = data['uuid']
+ # UUID v7: xxxxxxxx-xxxx-7xxx-xxxx-xxxxxxxxxxxx
+ if not re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', str(uuid)):
+ errors.append(f"Invalid UUID v7 format: {uuid}")
+
+ if errors:
+ raise Exception("Validation errors: " + "; ".join(errors))
+
+ return True
+
+
+def validate_zstd_compressed_fields(data):
+ """Validate that zstd compression markers are set correctly.
+
+ Returns count of compressed fields found.
+
+ Example:
+ | ${count} = | Validate Zstd Compressed Fields | ${data} |
+ """
+ count = 0
+
+ # Check text_compressed flag
+ if data.get('text_compressed'):
+ count += 1
+
+ # Check attachments
+ for att in data.get('attachments', []):
+ if att.get('content_compressed'):
+ count += 1
+
+ # Check images
+ for img in data.get('images', []):
+ if img.get('content_compressed'):
+ count += 1
+
+ return count
+
+
+def validate_attachments_have_content_type(data):
+ """Validate that attachments have content_type field.
+
+ Returns count of attachments with content_type.
+
+ Example:
+ | ${count} = | Validate Attachments Have Content Type | ${data} |
+ """
+ count = 0
+ for att in data.get('attachments', []):
+ if 'content_type' in att and att['content_type']:
+ count += 1
+ return count