# This software may be distributed under the terms of the BSD license.
# See README for more details.
+import hmac
import logging
logger = logging.getLogger()
import select
+import struct
import subprocess
import threading
import time
if req_e <= req_s:
raise Exception("Unexpected RADIUS server acct MIB value")
-def run_pyrad_server(srv, stop_event):
- srv.RunWithStop(stop_event)
+def run_pyrad_server(srv, t_events):
+ srv.RunWithStop(t_events)
def test_radius_protocol(dev, apdev):
"""RADIUS Authentication protocol tests with a fake server"""
logger.info("Received authentication request")
reply = self.CreateReplyPacket(pkt)
reply.code = pyrad.packet.AccessAccept
- # TODO: Add attributes. For now, this works as a test case for
- # missing Message-Authenticator.
+ if self.t_events['msg_auth'].is_set():
+ logger.info("Add Message-Authenticator")
+ if self.t_events['wrong_secret'].is_set():
+ logger.info("Use incorrect RADIUS shared secret")
+ pw = "incorrect"
+ else:
+ pw = reply.secret
+ hmac_obj = hmac.new(pw)
+ hmac_obj.update(struct.pack("B", reply.code))
+ hmac_obj.update(struct.pack("B", reply.id))
+
+ # reply attributes
+ reply.AddAttribute("Message-Authenticator",
+ "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
+ attrs = reply._PktEncodeAttributes()
+
+ # Length
+ flen = 4 + 16 + len(attrs)
+ hmac_obj.update(struct.pack(">H", flen))
+ hmac_obj.update(pkt.authenticator)
+ hmac_obj.update(attrs)
+ if self.t_events['double_msg_auth'].is_set():
+ logger.info("Include two Message-Authenticator attributes")
+ else:
+ del reply[80]
+ reply.AddAttribute("Message-Authenticator", hmac_obj.digest())
self.SendReplyPacket(pkt.fd, reply)
- def RunWithStop(self, stop_event):
+ def RunWithStop(self, t_events):
self._poll = select.poll()
self._fdmap = {}
self._PrepareSockets()
+ self.t_events = t_events
- while not stop_event.is_set():
+ while not t_events['stop'].is_set():
for (fd, event) in self._poll.poll(1000):
if event == select.POLLIN:
try:
"radius",
"localhost")
srv.BindToAddress("")
- t_stop = threading.Event()
- t = threading.Thread(target=run_pyrad_server, args=(srv, t_stop))
+ t_events = {}
+ t_events['stop'] = threading.Event()
+ t_events['msg_auth'] = threading.Event()
+ t_events['wrong_secret'] = threading.Event()
+ t_events['double_msg_auth'] = threading.Event()
+ t = threading.Thread(target=run_pyrad_server, args=(srv, t_events))
t.start()
try:
if ev is None:
raise Exception("Timeout on EAP start")
time.sleep(1)
+ dev[0].request("REMOVE_NETWORK all")
+ time.sleep(0.1)
+ dev[0].dump_monitor()
+ t_events['msg_auth'].set()
+ t_events['wrong_secret'].set()
+ connect(dev[0], "radius-test", wait_connect=False)
+ time.sleep(1)
+ dev[0].request("REMOVE_NETWORK all")
+ time.sleep(0.1)
+ dev[0].dump_monitor()
+ t_events['wrong_secret'].clear()
+ connect(dev[0], "radius-test", wait_connect=False)
+ time.sleep(1)
+ dev[0].request("REMOVE_NETWORK all")
+ time.sleep(0.1)
+ dev[0].dump_monitor()
+ t_events['double_msg_auth'].set()
+ connect(dev[0], "radius-test", wait_connect=False)
+ time.sleep(1)
finally:
- t_stop.set()
+ t_events['stop'].set()
t.join()