]> git.ipfire.org Git - thirdparty/strongswan.git/blame - src/libcharon/plugins/vici/python/vici/test/test_protocol.py
vici: Fix several PEP8 issues
[thirdparty/strongswan.git] / src / libcharon / plugins / vici / python / vici / test / test_protocol.py
CommitLineData
9b97029a
BS
1import pytest
2
c193b594 3from ..protocol import Packet, Message, FiniteStream
9b97029a
BS
4from ..exception import DeserializationException
5
6
c193b594
BS
class TestPacket(object):
    """Packet construction and parsing tests."""

    # raw bytes expected for each outgoing packet type
    cmd_request = b"\x00\x0c" b"command_type"
    cmd_request_msg = b"\x00\x07" b"command" b"payload"
    event_register = b"\x03\x0a" b"event_type"
    event_unregister = b"\x04\x0a" b"event_type"

    # raw bytes fed to the parser for each incoming packet type
    cmd_response = b"\x01" b"reply"
    cmd_unknown = b"\x02"
    event_confirm = b"\x05"
    event_unknown = b"\x06"
    event = b"\x07\x03" b"log" b"message"

    def test_request(self):
        """A request is built correctly without and with a payload."""
        without_payload = Packet.request("command_type")
        assert without_payload == self.cmd_request

        with_payload = Packet.request("command", b"payload")
        assert with_payload == self.cmd_request_msg

    def test_register_event(self):
        """An event registration packet matches the expected bytes."""
        built = Packet.register_event("event_type")
        assert built == self.event_register

    def test_unregister_event(self):
        """An event deregistration packet matches the expected bytes."""
        built = Packet.unregister_event("event_type")
        assert built == self.event_unregister

    def test_parse(self):
        """Each incoming packet parses to the expected response type.

        The parsed payload's ``getvalue()`` is compared against the complete
        raw packet that was handed to ``Packet.parse``.
        """
        expectations = (
            (self.cmd_response, Packet.CMD_RESPONSE),
            (self.cmd_unknown, Packet.CMD_UNKNOWN),
            (self.event_confirm, Packet.EVENT_CONFIRM),
            (self.event_unknown, Packet.EVENT_UNKNOWN),
            (self.event, Packet.EVENT),
        )
        for raw, expected_type in expectations:
            parsed = Packet.parse(raw)
            assert parsed.response_type == expected_type
            assert parsed.payload.getvalue() == raw
51
52
9b97029a
BS
class TestMessage(object):
    """Message (de)serialization test."""

    # fixtures for the (de)serialization tests
    # serialized form: sections
    ser_sec_unclosed = b"\x01\x08unclosed"
    ser_sec_single = b"\x01\x07section\x02"
    ser_sec_nested = b"\x01\x05outer\x01\x0asubsection\x02\x02"

    # serialized form: lists
    ser_list_invalid = b"\x04\x07invalid\x05\x00\x02e1\x02\x03sec\x06"
    ser_list_0_item = b"\x04\x05empty\x06"
    ser_list_1_item = b"\x04\x01l\x05\x00\x02e1\x06"
    ser_list_2_item = b"\x04\x01l\x05\x00\x02e1\x05\x00\x02e2\x06"

    # serialized form: key/value pairs
    ser_kv_pair = b"\x03\x03key\x00\x05value"
    ser_kv_zero = b"\x03\x0azerolength\x00\x00"

    # deserialized form: sections
    des_sec_single = {"section": {}}
    des_sec_nested = {"outer": {"subsection": {}}}

    # deserialized form: lists
    des_list_0_item = {"empty": []}
    des_list_1_item = {"l": [b"e1"]}
    des_list_2_item = {"l": [b"e1", b"e2"]}

    # deserialized form: key/value pairs
    des_kv_pair = {"key": b"value"}
    des_kv_zero = {"zerolength": b""}

    def test_section_serialization(self):
        """Single and nested sections serialize to the expected bytes."""
        cases = (
            (self.des_sec_single, self.ser_sec_single),
            (self.des_sec_nested, self.ser_sec_nested),
        )
        for plain, encoded in cases:
            assert Message.serialize(plain) == encoded

    def test_list_serialization(self):
        """Lists with zero, one and two items serialize as expected."""
        cases = (
            (self.des_list_0_item, self.ser_list_0_item),
            (self.des_list_1_item, self.ser_list_1_item),
            (self.des_list_2_item, self.ser_list_2_item),
        )
        for plain, encoded in cases:
            assert Message.serialize(plain) == encoded

    def test_key_serialization(self):
        """Key/value pairs, including an empty value, serialize as expected."""
        cases = (
            (self.des_kv_pair, self.ser_kv_pair),
            (self.des_kv_zero, self.ser_kv_zero),
        )
        for plain, encoded in cases:
            assert Message.serialize(plain) == encoded

    def test_section_deserialization(self):
        """Valid sections deserialize; an unclosed section is rejected."""
        cases = (
            (self.ser_sec_single, self.des_sec_single),
            (self.ser_sec_nested, self.des_sec_nested),
        )
        for encoded, plain in cases:
            assert Message.deserialize(FiniteStream(encoded)) == plain

        # a section that is opened but never closed must raise
        broken = FiniteStream(self.ser_sec_unclosed)
        with pytest.raises(DeserializationException):
            Message.deserialize(broken)

    def test_list_deserialization(self):
        """Valid lists deserialize; the invalid list fixture is rejected."""
        cases = (
            (self.ser_list_0_item, self.des_list_0_item),
            (self.ser_list_1_item, self.des_list_1_item),
            (self.ser_list_2_item, self.des_list_2_item),
        )
        for encoded, plain in cases:
            assert Message.deserialize(FiniteStream(encoded)) == plain

        # the ser_list_invalid fixture must raise
        broken = FiniteStream(self.ser_list_invalid)
        with pytest.raises(DeserializationException):
            Message.deserialize(broken)

    def test_key_deserialization(self):
        """Key/value pairs, including an empty value, deserialize."""
        cases = (
            (self.ser_kv_pair, self.des_kv_pair),
            (self.ser_kv_zero, self.des_kv_zero),
        )
        for encoded, plain in cases:
            assert Message.deserialize(FiniteStream(encoded)) == plain

    def test_roundtrip(self):
        """Serialize then deserialize a nested message and check the values."""
        original = {
            "key1": "value1",
            "section1": {
                "sub-section": {
                    "key2": b"value2",
                },
                "list1": ["item1", "item2"],
            },
        }
        wire = Message.serialize(original)
        restored = Message.deserialize(FiniteStream(wire))

        # ensure that list items and key values remain as undecoded bytes
        section = restored["section1"]
        assert restored["key1"] == b"value1"
        assert section["sub-section"]["key2"] == b"value2"
        assert section["list1"] == [b"item1", b"item2"]