]>
git.ipfire.org Git - people/jschlag/pbs.git/blob - src/buildservice/keys.py
12 from .decorators
import *
16 data
= io
.BytesIO(data
)
18 tmpdir
= tempfile
.mkdtemp()
19 os
.environ
["GNUPGHOME"] = tmpdir
23 res
= ctx
.import_(data
)
25 assert len(res
.imports
) == 1
26 (fpr
, trash_a
, trash_b
) = res
.imports
[0]
28 key
= ctx
.get_key(fpr
)
35 del os
.environ
["GNUPGHOME"]
38 class Keys(base
.Object
):
39 def create(self
, data
):
40 fingerprint
, key
= read_key(data
)
42 # Search for duplicates and just update them.
43 k
= pakfire
.keys
.get_by_fpr(fingerprint
)
48 # Insert new into the database.
49 res
= self
.db
.get("INSERT INTO keys(fingerprint, uids, data) \
50 VALUES(%s, %s, %s) RETURNING *", fingerprint
, ", ".join([u
.uid
for u
in key
.uids
]), data
)
52 key
= Key(self
.backend
, res
.id, data
=res
)
58 query
= self
.db
.query("SELECT id FROM keys ORDER BY uids")
62 key
= Key(self
.pakfire
, key
.id)
67 def get_by_id(self
, id):
68 key
= self
.db
.get("SELECT id FROM keys WHERE id = %s", id)
72 return Key(self
.pakfire
, key
.id)
74 def get_by_fpr(self
, fpr
):
77 key
= self
.db
.get("SELECT id FROM keys WHERE fingerprint LIKE %s", fpr
)
81 return Key(self
.pakfire
, key
.id)
84 class Key(base
.DataObject
):
87 def update(self
, data
):
88 fingerprint
, key
= read_key(data
)
90 # First, delete all subkeys.
91 self
.db
.execute("DELETE FROM keys_subkeys WHERE key_id = %s", self
.id)
93 for subkey
in key
.subkeys
:
94 time_created
= datetime
.datetime
.fromtimestamp(subkey
.timestamp
)
96 time_expires
= datetime
.datetime
.fromtimestamp(subkey
.expires
)
98 time_expires
= None # Key does never expire.
101 if subkey
.pubkey_algo
== gpgme
.PK_RSA
:
102 algo
= "RSA/%s" % subkey
.length
104 self
.db
.execute("INSERT INTO keys_subkeys(key_id, fingerprint, \
105 time_created, time_expires, algo) VALUES(%s, %s, %s, %s, %s)",
106 self
.id, subkey
.keyid
, time_created
, time_expires
, algo
)
108 self
.db
.execute("UPDATE keys SET fingerprint = %s, uids = %s, data = %s WHERE id = %s",
109 fingerprint
, ", ".join([u
.uid
for u
in key
.uids
]), data
, self
.id)
111 def can_be_deleted(self
):
112 ret
= self
.db
.query("SELECT id FROM repositories WHERE key_id = %s", self
.id)
120 assert self
.can_be_deleted()
122 self
.db
.execute("DELETE FROM keys_subkeys WHERE key_id = %s", self
.id)
123 self
.db
.execute("DELETE FROM keys WHERE id = %s", self
.id)
def fingerprint(self):
    """Return the trailing 16 characters of this key's stored fingerprint.

    The value is read from ``self.data.fingerprint`` and sliced with
    ``[-16:]``; a stored value shorter than 16 characters is returned
    unchanged.
    """
    # NOTE(review): the line directly above this def is not visible in this
    # view; presumably it carries a @property decorator - confirm before
    # relying on call syntax.
    return self.data.fingerprint[-16:]
131 return self
.data
.uids
.split(", ")
135 return self
.data
.data
139 res
= self
.db
.query("SELECT * FROM keys_subkeys WHERE key_id = %s ORDER BY time_created", self
.id)
143 subkey
= Subkey(self
.backend
, row
.id, data
=row
)
144 subkeys
.append(subkey
)
146 return sorted(subkeys
)
149 class Subkey(base
.DataObject
):
150 table
= "keys_subkeys"
def __lt__(self, other):
    """Order two subkeys by their creation time.

    Returns the result of ``self.time_created < other.time_created`` when
    *other* is an instance of this object's class. For any other operand
    ``NotImplemented`` is returned so that Python can try the reflected
    comparison or raise ``TypeError``, instead of silently yielding ``None``.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    return self.time_created < other.time_created
def fingerprint(self):
    """Return the fingerprint stored on this subkey's data record, unmodified."""
    # NOTE(review): the line directly above this def is not visible in this
    # view; presumably it carries a @property decorator - confirm.
    return self.data.fingerprint
def time_created(self):
    """Return the creation time stored on this subkey's data record."""
    # NOTE(review): __lt__ in this class reads ``self.time_created`` as a plain
    # attribute, so a @property decorator presumably sits on the line above
    # this def (not visible in this view) - confirm.
    return self.data.time_created
def time_expires(self):
    """Return the expiry time stored on this subkey's data record.

    The stored value is passed through as-is, including ``None``.
    """
    # NOTE(review): other code in this class compares ``self.time_expires``
    # directly against a datetime, so a @property decorator presumably sits on
    # the line above this def (not visible in this view) - confirm.
    return self.data.time_expires
170 return self
.time_expires
<= datetime
.datetime
.utcnow()
174 return self
.data
.algo