while(++d_pos < d_end && d_string[d_pos]!='"') {
if(d_string[d_pos]=='\\' && d_pos+1!=d_end) {
val.append(1, d_string[d_pos++]);
+ char chr = d_string[d_pos];
+ if (chr >= '0' && chr <= '9') {
+ bool valid{false};
+ // Must be a three-digit character escape sequence
+ if (d_end - d_pos >= 3) {
+ char chr2 = d_string[d_pos + 1];
+ char chr3 = d_string[d_pos + 2];
+ if (chr2 >= '0' && chr2 <= '9' && chr3 >= '0' && chr3 <= '9') {
+ valid = true;
+ }
+ }
+ if (!valid) {
+ throw RecordTextException("Data field in DNS contains an invalid escape at position "+std::to_string(d_pos)+" of '"+d_string+"'");
+ }
+ }
+ // Not advancing d_pos, we'll append the next 1 or 3 characters as
+ // part of the regular case.
}
val.append(1, d_string[d_pos]);
}
data = self.get_zone(name)
self.assertIsNone(get_rrset(data, name, 'MX'))
+ def test_zone_rr_update_invalid_txt(self):
+ name, payload, zone = self.create_zone()
+ # do a replace (= update)
+ rrset = {
+ 'changetype': 'replace',
+ 'name': 'ill-formed-txt.' + name,
+ 'type': 'txt',
+ 'ttl': 3600,
+ 'records': [
+ {
+ "content": "\"TEST\\1\" \"TEST2\"",
+ "disabled": False
+ }
+ ]
+ }
+ payload = {'rrsets': [rrset]}
+ r = self.session.patch(
+ self.url("/api/v1/servers/localhost/zones/" + name),
+ data=json.dumps(payload),
+ headers={'content-type': 'application/json'})
+ self.assertEqual(r.status_code, 422)
+ self.assertIn('contains an invalid escape', r.json()['error'])
+
+ def test_zone_rr_update_with_escapes(self):
+ name, payload, zone = self.create_zone()
+ # do a replace (= update)
+ recname = 'well-formed-txt.' + name
+ content = "\"valid\\000record\""
+ rrset = {
+ 'changetype': 'replace',
+ 'name': recname,
+ 'type': 'txt',
+ 'ttl': 3600,
+ 'records': [
+ {
+ "content": content,
+ "disabled": False
+ }
+ ]
+ }
+ payload = {'rrsets': [rrset]}
+ r = self.session.patch(
+ self.url("/api/v1/servers/localhost/zones/" + name),
+ data=json.dumps(payload),
+ headers={'content-type': 'application/json'})
+ self.assert_success(r)
+ # verify that the new record has been correctly processed and the \000
+ # escape is unchanged
+ data = self.get_zone(name)
+ self.assertEqual(get_rrset(data, recname, 'TXT')['records'][0]['content'], content)
+
def test_zone_rr_update_opt(self):
name, payload, zone = self.create_zone()
# do a replace (= update)