#!/usr/bin/env python
-import unittest
import dns
import clientsubnetoption
from dnsdisttests import DNSDistTest
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testQuestionMatchTagAndValue(self):
"""
'1.2.3.50')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testQuestionMatchTagOnly(self):
"""
'1.2.3.100')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testResponseNoMatch(self):
"""
'192.0.2.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testResponseMatchTagAndValue(self):
"""
# we will set TC if the tag matches
expectedResponse.flags |= dns.flags.TC
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- print(expectedResponse)
- print(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(expectedResponse, receivedResponse)
def testResponseMatchResponseTagMatches(self):
"""
# we will set QR=0 if the tag matches
expectedResponse.flags &= ~dns.flags.QR
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(expectedResponse, receivedResponse)