]>
Commit | Line | Data |
---|---|---|
1ae2b8cd HWN |
1 | /* |
2 | Copyright 2020 Google LLC | |
3 | ||
4 | Use of this source code is governed by a BSD-style | |
5 | license that can be found in the LICENSE file or at | |
6 | https://developers.google.com/open-source/licenses/bsd | |
7 | */ | |
8 | ||
9 | #include "merged.h" | |
10 | ||
11 | #include "constants.h" | |
12 | #include "iter.h" | |
13 | #include "pq.h" | |
14 | #include "reader.h" | |
15 | #include "record.h" | |
16 | #include "generic.h" | |
17 | #include "reftable-merged.h" | |
18 | #include "reftable-error.h" | |
19 | #include "system.h" | |
20 | ||
21 | static int merged_iter_init(struct merged_iter *mi) | |
22 | { | |
23 | int i = 0; | |
24 | for (i = 0; i < mi->stack_len; i++) { | |
25 | struct reftable_record rec = reftable_new_record(mi->typ); | |
26 | int err = iterator_next(&mi->stack[i], &rec); | |
27 | if (err < 0) { | |
28 | return err; | |
29 | } | |
30 | ||
31 | if (err > 0) { | |
32 | reftable_iterator_destroy(&mi->stack[i]); | |
33 | reftable_record_destroy(&rec); | |
34 | } else { | |
35 | struct pq_entry e = { | |
36 | .rec = rec, | |
37 | .index = i, | |
38 | }; | |
39 | merged_iter_pqueue_add(&mi->pq, e); | |
40 | } | |
41 | } | |
42 | ||
43 | return 0; | |
44 | } | |
45 | ||
46 | static void merged_iter_close(void *p) | |
47 | { | |
48 | struct merged_iter *mi = p; | |
49 | int i = 0; | |
50 | merged_iter_pqueue_release(&mi->pq); | |
51 | for (i = 0; i < mi->stack_len; i++) { | |
52 | reftable_iterator_destroy(&mi->stack[i]); | |
53 | } | |
54 | reftable_free(mi->stack); | |
55 | } | |
56 | ||
57 | static int merged_iter_advance_nonnull_subiter(struct merged_iter *mi, | |
58 | size_t idx) | |
59 | { | |
60 | struct reftable_record rec = reftable_new_record(mi->typ); | |
61 | struct pq_entry e = { | |
62 | .rec = rec, | |
63 | .index = idx, | |
64 | }; | |
65 | int err = iterator_next(&mi->stack[idx], &rec); | |
66 | if (err < 0) | |
67 | return err; | |
68 | ||
69 | if (err > 0) { | |
70 | reftable_iterator_destroy(&mi->stack[idx]); | |
71 | reftable_record_destroy(&rec); | |
72 | return 0; | |
73 | } | |
74 | ||
75 | merged_iter_pqueue_add(&mi->pq, e); | |
76 | return 0; | |
77 | } | |
78 | ||
79 | static int merged_iter_advance_subiter(struct merged_iter *mi, size_t idx) | |
80 | { | |
81 | if (iterator_is_null(&mi->stack[idx])) | |
82 | return 0; | |
83 | return merged_iter_advance_nonnull_subiter(mi, idx); | |
84 | } | |
85 | ||
86 | static int merged_iter_next_entry(struct merged_iter *mi, | |
87 | struct reftable_record *rec) | |
88 | { | |
89 | struct strbuf entry_key = STRBUF_INIT; | |
90 | struct pq_entry entry = { 0 }; | |
91 | int err = 0; | |
92 | ||
93 | if (merged_iter_pqueue_is_empty(mi->pq)) | |
94 | return 1; | |
95 | ||
96 | entry = merged_iter_pqueue_remove(&mi->pq); | |
97 | err = merged_iter_advance_subiter(mi, entry.index); | |
98 | if (err < 0) | |
99 | return err; | |
100 | ||
101 | /* | |
102 | One can also use reftable as datacenter-local storage, where the ref | |
103 | database is maintained in globally consistent database (eg. | |
104 | CockroachDB or Spanner). In this scenario, replication delays together | |
105 | with compaction may cause newer tables to contain older entries. In | |
106 | such a deployment, the loop below must be changed to collect all | |
107 | entries for the same key, and return new the newest one. | |
108 | */ | |
109 | reftable_record_key(&entry.rec, &entry_key); | |
110 | while (!merged_iter_pqueue_is_empty(mi->pq)) { | |
111 | struct pq_entry top = merged_iter_pqueue_top(mi->pq); | |
112 | struct strbuf k = STRBUF_INIT; | |
113 | int err = 0, cmp = 0; | |
114 | ||
115 | reftable_record_key(&top.rec, &k); | |
116 | ||
117 | cmp = strbuf_cmp(&k, &entry_key); | |
118 | strbuf_release(&k); | |
119 | ||
120 | if (cmp > 0) { | |
121 | break; | |
122 | } | |
123 | ||
124 | merged_iter_pqueue_remove(&mi->pq); | |
125 | err = merged_iter_advance_subiter(mi, top.index); | |
126 | if (err < 0) { | |
127 | return err; | |
128 | } | |
129 | reftable_record_destroy(&top.rec); | |
130 | } | |
131 | ||
132 | reftable_record_copy_from(rec, &entry.rec, hash_size(mi->hash_id)); | |
133 | reftable_record_destroy(&entry.rec); | |
134 | strbuf_release(&entry_key); | |
135 | return 0; | |
136 | } | |
137 | ||
138 | static int merged_iter_next(struct merged_iter *mi, struct reftable_record *rec) | |
139 | { | |
140 | while (1) { | |
141 | int err = merged_iter_next_entry(mi, rec); | |
142 | if (err == 0 && mi->suppress_deletions && | |
143 | reftable_record_is_deletion(rec)) { | |
144 | continue; | |
145 | } | |
146 | ||
147 | return err; | |
148 | } | |
149 | } | |
150 | ||
151 | static int merged_iter_next_void(void *p, struct reftable_record *rec) | |
152 | { | |
153 | struct merged_iter *mi = p; | |
154 | if (merged_iter_pqueue_is_empty(mi->pq)) | |
155 | return 1; | |
156 | ||
157 | return merged_iter_next(mi, rec); | |
158 | } | |
159 | ||
160 | static struct reftable_iterator_vtable merged_iter_vtable = { | |
161 | .next = &merged_iter_next_void, | |
162 | .close = &merged_iter_close, | |
163 | }; | |
164 | ||
165 | static void iterator_from_merged_iter(struct reftable_iterator *it, | |
166 | struct merged_iter *mi) | |
167 | { | |
168 | assert(!it->ops); | |
169 | it->iter_arg = mi; | |
170 | it->ops = &merged_iter_vtable; | |
171 | } | |
172 | ||
173 | int reftable_new_merged_table(struct reftable_merged_table **dest, | |
174 | struct reftable_table *stack, int n, | |
175 | uint32_t hash_id) | |
176 | { | |
177 | struct reftable_merged_table *m = NULL; | |
178 | uint64_t last_max = 0; | |
179 | uint64_t first_min = 0; | |
180 | int i = 0; | |
181 | for (i = 0; i < n; i++) { | |
182 | uint64_t min = reftable_table_min_update_index(&stack[i]); | |
183 | uint64_t max = reftable_table_max_update_index(&stack[i]); | |
184 | ||
185 | if (reftable_table_hash_id(&stack[i]) != hash_id) { | |
186 | return REFTABLE_FORMAT_ERROR; | |
187 | } | |
188 | if (i == 0 || min < first_min) { | |
189 | first_min = min; | |
190 | } | |
191 | if (i == 0 || max > last_max) { | |
192 | last_max = max; | |
193 | } | |
194 | } | |
195 | ||
196 | m = reftable_calloc(sizeof(struct reftable_merged_table)); | |
197 | m->stack = stack; | |
198 | m->stack_len = n; | |
199 | m->min = first_min; | |
200 | m->max = last_max; | |
201 | m->hash_id = hash_id; | |
202 | *dest = m; | |
203 | return 0; | |
204 | } | |
205 | ||
206 | /* clears the list of subtable, without affecting the readers themselves. */ | |
207 | void merged_table_release(struct reftable_merged_table *mt) | |
208 | { | |
209 | FREE_AND_NULL(mt->stack); | |
210 | mt->stack_len = 0; | |
211 | } | |
212 | ||
/*
 * Release a merged table and the memory of the structure itself.
 * Passing NULL is allowed and does nothing.
 */
void reftable_merged_table_free(struct reftable_merged_table *mt)
{
	if (mt) {
		merged_table_release(mt);
		reftable_free(mt);
	}
}
221 | ||
222 | uint64_t | |
223 | reftable_merged_table_max_update_index(struct reftable_merged_table *mt) | |
224 | { | |
225 | return mt->max; | |
226 | } | |
227 | ||
228 | uint64_t | |
229 | reftable_merged_table_min_update_index(struct reftable_merged_table *mt) | |
230 | { | |
231 | return mt->min; | |
232 | } | |
233 | ||
234 | static int reftable_table_seek_record(struct reftable_table *tab, | |
235 | struct reftable_iterator *it, | |
236 | struct reftable_record *rec) | |
237 | { | |
238 | return tab->ops->seek_record(tab->table_arg, it, rec); | |
239 | } | |
240 | ||
241 | static int merged_table_seek_record(struct reftable_merged_table *mt, | |
242 | struct reftable_iterator *it, | |
243 | struct reftable_record *rec) | |
244 | { | |
245 | struct reftable_iterator *iters = reftable_calloc( | |
246 | sizeof(struct reftable_iterator) * mt->stack_len); | |
247 | struct merged_iter merged = { | |
248 | .stack = iters, | |
249 | .typ = reftable_record_type(rec), | |
250 | .hash_id = mt->hash_id, | |
251 | .suppress_deletions = mt->suppress_deletions, | |
252 | }; | |
253 | int n = 0; | |
254 | int err = 0; | |
255 | int i = 0; | |
256 | for (i = 0; i < mt->stack_len && err == 0; i++) { | |
257 | int e = reftable_table_seek_record(&mt->stack[i], &iters[n], | |
258 | rec); | |
259 | if (e < 0) { | |
260 | err = e; | |
261 | } | |
262 | if (e == 0) { | |
263 | n++; | |
264 | } | |
265 | } | |
266 | if (err < 0) { | |
267 | int i = 0; | |
268 | for (i = 0; i < n; i++) { | |
269 | reftable_iterator_destroy(&iters[i]); | |
270 | } | |
271 | reftable_free(iters); | |
272 | return err; | |
273 | } | |
274 | ||
275 | merged.stack_len = n; | |
276 | err = merged_iter_init(&merged); | |
277 | if (err < 0) { | |
278 | merged_iter_close(&merged); | |
279 | return err; | |
280 | } else { | |
281 | struct merged_iter *p = | |
282 | reftable_malloc(sizeof(struct merged_iter)); | |
283 | *p = merged; | |
284 | iterator_from_merged_iter(it, p); | |
285 | } | |
286 | return 0; | |
287 | } | |
288 | ||
289 | int reftable_merged_table_seek_ref(struct reftable_merged_table *mt, | |
290 | struct reftable_iterator *it, | |
291 | const char *name) | |
292 | { | |
293 | struct reftable_ref_record ref = { | |
294 | .refname = (char *)name, | |
295 | }; | |
296 | struct reftable_record rec = { NULL }; | |
297 | reftable_record_from_ref(&rec, &ref); | |
298 | return merged_table_seek_record(mt, it, &rec); | |
299 | } | |
300 | ||
301 | int reftable_merged_table_seek_log_at(struct reftable_merged_table *mt, | |
302 | struct reftable_iterator *it, | |
303 | const char *name, uint64_t update_index) | |
304 | { | |
305 | struct reftable_log_record log = { | |
306 | .refname = (char *)name, | |
307 | .update_index = update_index, | |
308 | }; | |
309 | struct reftable_record rec = { NULL }; | |
310 | reftable_record_from_log(&rec, &log); | |
311 | return merged_table_seek_record(mt, it, &rec); | |
312 | } | |
313 | ||
/*
 * Seek the merged table to the log of name, using the largest
 * representable update index (all bits set) as the starting point.
 */
int reftable_merged_table_seek_log(struct reftable_merged_table *mt,
				   struct reftable_iterator *it,
				   const char *name)
{
	return reftable_merged_table_seek_log_at(mt, it, name,
						 ~((uint64_t)0));
}
321 | ||
322 | uint32_t reftable_merged_table_hash_id(struct reftable_merged_table *mt) | |
323 | { | |
324 | return mt->hash_id; | |
325 | } | |
326 | ||
/* vtable adapter: tab is a struct reftable_merged_table passed as void *. */
static int reftable_merged_table_seek_void(void *tab,
					   struct reftable_iterator *it,
					   struct reftable_record *rec)
{
	struct reftable_merged_table *mt = tab;

	return merged_table_seek_record(mt, it, rec);
}
333 | ||
/* vtable adapter: report the hash ID of the merged table behind tab. */
static uint32_t reftable_merged_table_hash_id_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_hash_id(mt);
}
338 | ||
/* vtable adapter: report the min update index of the merged table behind tab. */
static uint64_t reftable_merged_table_min_update_index_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_min_update_index(mt);
}
343 | ||
/* vtable adapter: report the max update index of the merged table behind tab. */
static uint64_t reftable_merged_table_max_update_index_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_max_update_index(mt);
}
348 | ||
349 | static struct reftable_table_vtable merged_table_vtable = { | |
350 | .seek_record = reftable_merged_table_seek_void, | |
351 | .hash_id = reftable_merged_table_hash_id_void, | |
352 | .min_update_index = reftable_merged_table_min_update_index_void, | |
353 | .max_update_index = reftable_merged_table_max_update_index_void, | |
354 | }; | |
355 | ||
356 | void reftable_table_from_merged_table(struct reftable_table *tab, | |
357 | struct reftable_merged_table *merged) | |
358 | { | |
359 | assert(!tab->ops); | |
360 | tab->ops = &merged_table_vtable; | |
361 | tab->table_arg = merged; | |
362 | } |