From 46185daeb212267ebb0e4abe8bdad8c7cd7bfce2 Mon Sep 17 00:00:00 2001 From: Andrew Bartlett Date: Thu, 16 May 2019 10:44:42 +1200 Subject: [PATCH] dsdb: lock metadata.tdb during lock_read in partitions module metadata.tdb was being locked during transactions, but not during read, and we should ensure we take all our locks in order for consistency BUG: https://bugzilla.samba.org/show_bug.cgi?id=13950 Signed-off-by: Andrew Bartlett Reviewed-by: Gary Lockyer --- source4/dsdb/samdb/ldb_modules/partition.c | 60 +++++++++++++----- source4/dsdb/samdb/ldb_modules/partition.h | 1 + .../samdb/ldb_modules/partition_metadata.c | 63 +++++++++++++++++++ 3 files changed, 107 insertions(+), 17 deletions(-) diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c index dca4b7f7993..c9d815b4fb0 100644 --- a/source4/dsdb/samdb/ldb_modules/partition.c +++ b/source4/dsdb/samdb/ldb_modules/partition.c @@ -1435,7 +1435,7 @@ int partition_read_lock(struct ldb_module *module) /* * This order must match that in prepare_commit(), start with - * the metadata partition (sam.ldb) lock + * the top level DB (sam.ldb) lock */ ret = ldb_next_read_lock(module); if (ret != LDB_SUCCESS) { @@ -1449,7 +1449,7 @@ int partition_read_lock(struct ldb_module *module) } /* - * The metadata partition (sam.ldb) lock is not + * The top level DB (sam.ldb) lock is not * enough to block another process in prepare_commit(), * because prepare_commit() is a no-op, if nothing * was changed in the specific backend. @@ -1476,18 +1476,24 @@ int partition_read_lock(struct ldb_module *module) ldb_dn_get_linearized( data->partitions[i]->ctrl->dn)); - /* Back it out, if it fails on one */ - for (i--; i >= 0; i--) { - ret2 = ldb_next_read_unlock(data->partitions[i]->module); - if (ret2 != LDB_SUCCESS) { - ldb_debug(ldb, - LDB_DEBUG_FATAL, - "Failed to unlock db: %s / %s", - ldb_errstring(ldb), - ldb_strerror(ret2)); - } - } - ret2 = ldb_next_read_unlock(module); + goto failed; + } + + /* + * Because in prepare_commit this must come last, to ensure + * lock ordering we have to do this last here also + */ + ret = partition_metadata_read_lock(module); + if (ret != LDB_SUCCESS) { + goto failed; + } + + return LDB_SUCCESS; + +failed: + /* Back it out, if it fails on one */ + for (i--; i >= 0; i--) { + ret2 = ldb_next_read_unlock(data->partitions[i]->module); if (ret2 != LDB_SUCCESS) { ldb_debug(ldb, LDB_DEBUG_FATAL, @@ -1495,10 +1501,16 @@ int partition_read_lock(struct ldb_module *module) ldb_errstring(ldb), ldb_strerror(ret2)); } - return ret; } - - return LDB_SUCCESS; + ret2 = ldb_next_read_unlock(module); + if (ret2 != LDB_SUCCESS) { + ldb_debug(ldb, + LDB_DEBUG_FATAL, + "Failed to unlock db: %s / %s", + ldb_errstring(ldb), + ldb_strerror(ret2)); + } + return ret; } /* unlock all the backends */ @@ -1566,6 +1578,20 @@ int partition_read_unlock(struct ldb_module *module) } } + /* + * Because in prepare_commit this must come last, to ensure + * lock ordering we have to do this last here also + */ + ret = partition_metadata_read_unlock(module); + + /* + * Don't overwrite the original failure code + * if there was one + */ + if (ret == LDB_SUCCESS) { + ret = ret2; + } + return ret; } diff --git a/source4/dsdb/samdb/ldb_modules/partition.h b/source4/dsdb/samdb/ldb_modules/partition.h index 1bf213f9767..5790838e318 100644 --- a/source4/dsdb/samdb/ldb_modules/partition.h +++ b/source4/dsdb/samdb/ldb_modules/partition.h @@ -43,6 +43,7 @@ struct partition_module { struct partition_metadata { struct tdb_wrap *db; int in_transaction; + int read_lock_count; }; struct partition_private_data { diff --git a/source4/dsdb/samdb/ldb_modules/partition_metadata.c b/source4/dsdb/samdb/ldb_modules/partition_metadata.c index 02e56e175a3..349f79c2645 100644 --- a/source4/dsdb/samdb/ldb_modules/partition_metadata.c +++ b/source4/dsdb/samdb/ldb_modules/partition_metadata.c @@ -421,6 +421,69 @@ int partition_metadata_sequence_number_increment(struct ldb_module *module, uint ret = partition_metadata_set_uint64(module, LDB_METADATA_SEQ_NUM, *value, false); return ret; } +/* + lock the database for read - use by partition_lock_read +*/ +int partition_metadata_read_lock(struct ldb_module *module) +{ + struct partition_private_data *data + = talloc_get_type_abort(ldb_module_get_private(module), + struct partition_private_data); + struct tdb_context *tdb = NULL; + int tdb_ret = 0; + int ret; + + if (!data || !data->metadata || !data->metadata->db) { + return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR, + "partition_metadata: metadata not initialized"); + } + tdb = data->metadata->db->tdb; + + if (tdb_transaction_active(tdb) == false && + data->metadata->read_lock_count == 0) { + tdb_ret = tdb_lockall_read(tdb); + } + if (tdb_ret == 0) { + data->metadata->read_lock_count++; + return LDB_SUCCESS; + } else { + /* Sadly we can't call ltdb_err_map(tdb_error(tdb)); */ + ret = LDB_ERR_BUSY; + } + ldb_debug_set(ldb_module_get_ctx(module), + LDB_DEBUG_FATAL, + "Failure during partition_metadata_read_lock(): %s", + tdb_errorstr(tdb)); + return ret; +} + +/* + unlock the database after a partition_metadata_lock_read() +*/ +int partition_metadata_read_unlock(struct ldb_module *module) +{ + struct partition_private_data *data + = talloc_get_type_abort(ldb_module_get_private(module), + struct partition_private_data); + struct tdb_context *tdb = NULL; + + data = talloc_get_type_abort(ldb_module_get_private(module), + struct partition_private_data); + if (!data || !data->metadata || !data->metadata->db) { + return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR, + "partition_metadata: metadata not initialized"); + } + tdb = data->metadata->db->tdb; + + if (!tdb_transaction_active(tdb) && + data->metadata->read_lock_count == 1) { + tdb_unlockall_read(tdb); + data->metadata->read_lock_count--; + return 0; + } + data->metadata->read_lock_count--; + return 0; +} /* -- 2.47.3