Huang Dongxu on TiDB's Core Advantages
2024-03-18

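This article walks through my own reading of the code behind the interfaces commonly used to resolve lock conflicts. I hope it helps readers understand TiKV distributed transactions.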
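The purpose of CheckTxnStatus is fairly clear. When a transaction runs into a lock conflict (usually a lock left while a pessimistic transaction acquires pessimistic locks, or a lock written during prewrite), it takes the primary key from the lock information and calls this interface to find the current status of the transaction that owns that primary key. The transaction may already be committed, already rolled back, or still in progress.
In particular, if the lock on the primary key has expired, CheckTxnStatus also proactively rolls it back in protected mode.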
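Suppose there are two concurrent transactions, t1 and t2.
While running, t1 encounters a lock of t2, so t1 calls CheckTxnStatus to find the current status of t2. The request parameters are:
primary_key: the primary key of t2 to check
lock_ts: the start_ts of t2
caller_start_ts: the start_ts of t1
current_ts: the current timestamp
rollback_if_not_exist: when neither a commit record nor a rollback record is found, whether to roll back directly or return an error
force_sync_commit: for async-commit locks, whether to force the async-commit process forward; if not, uncommitted is returned
resolving_pessimistic_lock: false means the caller intends to resolve a prewrite lock; true means it intends to resolve a pessimistic lock
verify_is_primary: verify that the lock on the primary key really is the primary lock (issue 42937); this check is currently enabled by default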
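Several of the steps that follow decide whether a lock has expired with the condition `lock.ts.physical() + lock.ttl < current_ts.physical()`. The sketch below reproduces that check on plain `u64` timestamps, assuming the usual TiDB TSO layout in which the low 18 bits hold a logical counter and the high bits hold physical milliseconds. `compose_ts`, `physical`, and `lock_expired` are hypothetical helpers written for illustration, not TiKV APIs.

```rust
/// Number of low bits a TSO timestamp reserves for its logical counter
/// (assumed TiDB TSO layout).
const TSO_PHYSICAL_SHIFT_BITS: u32 = 18;

/// Hypothetical helper: build a TSO timestamp from physical ms and a logical counter.
pub fn compose_ts(physical_ms: u64, logical: u64) -> u64 {
    (physical_ms << TSO_PHYSICAL_SHIFT_BITS) | logical
}

/// Hypothetical helper: the physical (millisecond) part of a TSO timestamp.
pub fn physical(ts: u64) -> u64 {
    ts >> TSO_PHYSICAL_SHIFT_BITS
}

/// Mirrors `lock.ts.physical() + lock.ttl < current_ts.physical()`,
/// where the lock TTL is expressed in milliseconds.
pub fn lock_expired(lock_ts: u64, ttl_ms: u64, current_ts: u64) -> bool {
    physical(lock_ts) + ttl_ms < physical(current_ts)
}
```

Because the comparison is strict and only physical parts are compared, a lock whose deadline equals the current physical time still counts as alive, and the logical part of current_ts never affects the result.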
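As with the other interfaces, the first step is to load the lock on the primary key.
If the lock found is the expected lock of t2 (its ts equals lock_ts), check_txn_status_lock_exists is called:
First, the lock is validated. If verify_is_primary is true and the primary key recorded in the lock does not match the primary key in the request, PrimaryMismatch is returned; this can happen when the primary key has been replaced.
Corner case: there is one exception (the lock is a pessimistic lock && resolving_pessimistic_lock is false). That is, the caller meant to resolve a prewrite lock, found a pessimistic lock instead, and the lock's primary key does not match either. In this case no error is returned: the pessimistic lock is cleaned up and check_txn_status_missing_lock is used to determine the transaction status. This situation can arise when acquire_pessimistic_lock of a pessimistic transaction is executed by a stale request.
If the lock is a use_async_commit lock and force_sync_commit is false, Ok(TxnStatus::uncommitted) is returned directly, and the caller may retry later. Otherwise, processing continues.
If the lock is a pessimistic lock, check_txn_status_from_pessimistic_primary_lock handles it further:
If the lock information shows that the pessimistic lock was written by the fair locking feature, it needs extra verification to avoid issue 43540: the code checks whether the transaction owning the lock has already been committed or rolled back, so that a stale lock is not mistaken for a live one. Only when there is neither a commit record nor a rollback record can the pessimistic lock be considered valid. Otherwise the pessimistic lock is simply cleared and the transaction status is returned.
If the pessimistic lock has expired and the lock the caller meant to check is a pessimistic lock, it is enough to clean up the pessimistic lock and return Ok(TxnStatus::PessimisticRollBack); no rollback is needed.
If the pessimistic lock has expired and the lock the caller meant to check is a prewrite lock, the pessimistic lock must be cleaned up and a rollback record must also be left behind (in non-protected mode; I do not yet understand why it is still non-protected).
If the pessimistic lock has not expired, the lock's min_commit_ts is updated and TxnStatus::uncommitted is returned.
If the lock is a prewrite lock, which is the expected case, its expiration time is checked:
If the prewrite lock has expired, it is rolled back directly and TxnStatus::TtlExpire is returned. (This rollback also seems to be non-protected? In principle the primary lock should be rolled back in protected mode, so this puzzles me.)
If the prewrite lock has not expired, the lock's min_commit_ts is updated and TxnStatus::uncommitted is returned.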
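The branching that check_txn_status_lock_exists performs on an existing lock can be condensed into a small decision function. This is a simplified sketch, not TiKV code: the lock is reduced to a hypothetical `SimpleLock` whose TTL check has already been evaluated, the fair-locking re-check (issue 43540) and the min_commit_ts push are left out, and the hypothetical `Outcome` enum names the result instead of performing any writes.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    Prewrite,
    Pessimistic,
}

/// Hypothetical, simplified view of the lock found on the primary key.
#[derive(Debug, Clone, Copy)]
pub struct SimpleLock {
    pub kind: LockKind,
    /// Whether lock.primary equals the primary key in the request.
    pub primary_matches: bool,
    pub use_async_commit: bool,
    /// Result of `lock.ts.physical() + lock.ttl < current_ts.physical()`.
    pub expired: bool,
}

/// Hypothetical outcome names for the cases described in the text.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    PrimaryMismatch,
    /// Corner case: clean up the pessimistic lock, then use check_txn_status_missing_lock.
    FallBackToMissingLock,
    /// TxnStatus::uncommitted (min_commit_ts may be pushed forward).
    Uncommitted,
    PessimisticRollBack,
    TtlExpire,
}

pub fn decide(
    lock: SimpleLock,
    verify_is_primary: bool,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
) -> Outcome {
    // 1. Validate that the lock really belongs to this primary key.
    if verify_is_primary && !lock.primary_matches {
        return match (resolving_pessimistic_lock, lock.kind) {
            (false, LockKind::Pessimistic) => Outcome::FallBackToMissingLock,
            _ => Outcome::PrimaryMismatch,
        };
    }
    // 2. Async-commit locks are left alone unless the caller forces sync commit.
    if lock.use_async_commit && !force_sync_commit {
        return Outcome::Uncommitted;
    }
    // 3. Expired locks are cleaned up or rolled back; live ones stay uncommitted.
    match (lock.kind, lock.expired) {
        (LockKind::Pessimistic, true) if resolving_pessimistic_lock => Outcome::PessimisticRollBack,
        (_, true) => Outcome::TtlExpire,
        (_, false) => Outcome::Uncommitted,
    }
}
```

Writing it this way makes the order of checks explicit: the primary-key validation runs before the async-commit shortcut, so even an async-commit lock can yield PrimaryMismatch.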
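If no lock is found, or the lock found is not the expected lock of t2, the situation is unexpected and check_txn_status_missing_lock is called.
check_txn_status_missing_lock should look familiar by now: it is also called from rollback and cleanup, but because the MissingLockAction differs, the logic varies slightly:
If an OverlappedRollback record or a rollback record (SingleRecord::Rollback) of this transaction is found, the rollback has already completed and the rolled-back status is returned directly.
If a commit record of this transaction is found, the committed status is returned (cleanup, by contrast, turns this case into ErrorInner::Committed).
If no write record of this transaction is found at all, the situation is unexpected:
If rollback_if_not_exist is false, ErrorInner::TxnNotFound is returned directly.
If resolving_pessimistic_lock is true, meaning the target was a pessimistic lock but that lock was not found, Ok(TxnStatus::LockNotExistDoNothing) is returned.
Otherwise (rollback_if_not_exist is true), a protected-mode rollback is performed:
mark_rollback_on_mismatching_lock is called to add a rollback marker for lock_ts on the mismatching lock, so that when the transaction owning that lock commits, it sets the overlap flag if its commit_ts coincides with this lock_ts.
make_rollback is called to write a protected rollback record, which guarantees that this rollback record will not be deleted.
Earlier non-protected rollback records are collapsed (deleted).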
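The order in which check_txn_status_missing_lock inspects the write records can be sketched in the same style. `CommitRecord` and `MissingLockOutcome` are hypothetical simplifications of TiKV's TxnCommitRecord and TxnStatus; the variant `WroteProtectedRollback` stands for the three side effects of the protected rollback (marking the mismatching lock, writing a protected rollback, collapsing older rollbacks), which this sketch does not actually perform.

```rust
/// Hypothetical, simplified view of what the write-CF lookup can return
/// for the transaction being checked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitRecord {
    /// A SingleRecord whose write type is not Rollback.
    Committed { commit_ts: u64 },
    /// A SingleRecord whose write type is Rollback.
    Rollback,
    /// A rollback marker overlapped onto another transaction's commit record.
    OverlappedRollback,
    /// No write record of this transaction at all.
    None,
}

/// Hypothetical outcome names; the real function returns a TxnStatus or an error.
#[derive(Debug, PartialEq, Eq)]
pub enum MissingLockOutcome {
    Committed(u64),
    RolledBack,
    TxnNotFound,
    LockNotExistDoNothing,
    WroteProtectedRollback,
}

pub fn missing_lock_outcome(
    record: CommitRecord,
    rollback_if_not_exist: bool,
    resolving_pessimistic_lock: bool,
) -> MissingLockOutcome {
    match record {
        CommitRecord::Committed { commit_ts } => MissingLockOutcome::Committed(commit_ts),
        CommitRecord::Rollback | CommitRecord::OverlappedRollback => MissingLockOutcome::RolledBack,
        CommitRecord::None => {
            if !rollback_if_not_exist {
                // Corresponds to returning ErrorInner::TxnNotFound.
                MissingLockOutcome::TxnNotFound
            } else if resolving_pessimistic_lock {
                MissingLockOutcome::LockNotExistDoNothing
            } else {
                // Protected-rollback path.
                MissingLockOutcome::WroteProtectedRollback
            }
        }
    }
}
```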
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
    ...
    let (txn_status, released) = match reader.load_lock(&self.primary_key)? {
        Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists(
            &mut txn,
            &mut reader,
            self.primary_key,
            lock,
            self.current_ts,
            self.caller_start_ts,
            self.force_sync_commit,
            self.resolving_pessimistic_lock,
            self.verify_is_primary,
            self.rollback_if_not_exist,
        )?,
        l => (
            check_txn_status_missing_lock(
                &mut txn,
                &mut reader,
                self.primary_key,
                l,
                MissingLockAction::rollback(self.rollback_if_not_exist),
                self.resolving_pessimistic_lock,
            )?,
            None,
        ),
    };
    ...
    Ok(WriteResult {
        ...
    })
}

pub fn check_txn_status_lock_exists(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    mut lock: Lock,
    current_ts: TimeStamp,
    caller_start_ts: TimeStamp,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
    verify_is_primary: bool,
    rollback_if_not_exist: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
    if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
        return match (resolving_pessimistic_lock, lock.is_pessimistic_lock()) {
            (false, true) => {
                ...
                let txn_status = check_txn_status_missing_lock(
                    ...
                    MissingLockAction::rollback(rollback_if_not_exist),
                    resolving_pessimistic_lock,
                )?;
                Ok((txn_status, released))
            }
            _ => {
                Err(
                    ErrorInner::PrimaryMismatch...
                )
            }
        };
    }

    // Never rollback or push forward min_commit_ts in check_txn_status if it's
    // using async commit. Rollback of async-commit locks are done during
    // ResolveLock.
    if lock.use_async_commit {
        if force_sync_commit {
            ...
        } else {
            return Ok((TxnStatus::uncommitted(lock, false), None));
        }
    }

    let is_pessimistic_txn = !lock.for_update_ts.is_zero();
    if lock.is_pessimistic_lock() {
        let check_result = check_txn_status_from_pessimistic_primary_lock(
            ...
            resolving_pessimistic_lock,
        )?;
        ...
    } else if lock.ts.physical() + lock.ttl < current_ts.physical() {
        let released = rollback_lock(txn, reader, primary_key, &lock, is_pessimistic_txn, true)?;
        return Ok((TxnStatus::TtlExpire, released));
    }

    if !lock.min_commit_ts.is_zero()
        && !caller_start_ts.is_max()
        // Push forward the min_commit_ts so that reading won't be blocked by locks.
        && caller_start_ts >= lock.min_commit_ts
    {
        lock.min_commit_ts = ...
    }

    Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))
}
fn check_txn_status_from_pessimistic_primary_lock(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    lock: &Lock,
    current_ts: TimeStamp,
    resolving_pessimistic_lock: bool,
) -> Result<(Option<TxnStatus>, Option<ReleasedLock>)> {
    if lock.is_pessimistic_lock_with_conflict() {
        if let Some(txn_status) = check_determined_txn_status(reader, &primary_key)? {
            ...
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            return Ok((Some(txn_status), released));
        }
    }

    if lock.ts.physical() + lock.ttl < current_ts.physical() {
        return if resolving_pessimistic_lock {
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            Ok((Some(TxnStatus::PessimisticRollBack), released))
        } else {
            let released = rollback_lock(txn, reader, primary_key, lock, true, true)?;
            Ok((Some(TxnStatus::TtlExpire), released))
        };
    }

    Ok((None, None))
}

The CheckSecondaryLocks interface is used mainly by Async Commit. While resolving an async-commit transaction, the secondary keys recorded on the primary lock are used to inspect all of the transaction's prewrite locks, and from them it is determined whether the transaction has actually been committed.
/// Check secondary locks of an async commit transaction.
///
/// If all prewritten locks exist, the lock information is returned.
/// Otherwise, it returns the commit timestamp of the transaction.
///
/// If the lock does not exist or is a pessimistic lock, to prevent the
/// status being changed, a rollback may be written.
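The request parameters are:
keys: the keys locked by the transaction
start_ts: the start ts of the transaction
The lock on each key is queried in turn:
If a commit record or a rollback record is found through any key, the loop breaks and the result is returned.
If a key has neither a record nor a lock, it must be rolled back, in protected mode, and then the loop breaks and the result is returned.
Otherwise all keys are traversed and their lock information is collected.
In detail, for each key:
If the transaction's lock is found as expected, check_status_from_lock performs further checks.
If that lock is a pessimistic lock:
As in CheckTxnStatus, if the lock was acquired through a fair-locking conflict, the code checks whether the transaction is committed, rolled back, or has no record. If it is committed or rolled back, CheckSecondaryLocks can stop immediately and return the result. If there is no record, the lock is treated as an ordinary pessimistic lock.
A pessimistic lock is an unexpected state here, so the pessimistic lock is cleaned up and the key is handled like the case with neither a record nor a lock: a rollback is performed and CheckSecondaryLocks stops.
If that lock is a prewrite lock, which is expected, the lock information is returned and the remaining keys continue to be checked.
If there is no lock, or the lock found does not belong to the expected transaction, check_determined_txn_status looks up the commit or rollback record.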
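The per-key loop of CheckSecondaryLocks can be modelled as a function that walks the keys in order and stops at the first committed or rolled-back key. In this sketch `Found`, `CommitRecord`, `KeyStatus`, and `check_key` are hypothetical simplifications: a prewrite lock carries only its min_commit_ts, the fair-locking re-check is omitted, and instead of writing rollbacks the function only counts how many protected rollbacks the real code would write.

```rust
/// Hypothetical, simplified write-CF lookup result (cf. check_determined_txn_status).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitRecord {
    Committed(u64),
    Rollback,
    OverlappedRollback,
    None,
}

/// Hypothetical summary of what was found on one key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Found {
    /// The transaction's prewrite lock, with its min_commit_ts.
    PrewriteLock { min_commit_ts: u64 },
    /// The transaction's pessimistic lock (unexpected for async commit).
    PessimisticLock,
    /// No lock of this transaction; the write CF is consulted instead.
    NoLock(CommitRecord),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyStatus {
    Locked(u64),
    Committed(u64),
    RolledBack,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SecondaryLocksStatus {
    /// min_commit_ts of every prewrite lock, in key order.
    Locked(Vec<u64>),
    Committed(u64),
    RolledBack,
}

/// Returns the key's status and whether a protected rollback must be written.
fn check_key(found: Found) -> (KeyStatus, bool) {
    match found {
        Found::PrewriteLock { min_commit_ts } => (KeyStatus::Locked(min_commit_ts), false),
        // Cleaned up and treated like "no lock and no record": roll back.
        Found::PessimisticLock => (KeyStatus::RolledBack, true),
        Found::NoLock(CommitRecord::Committed(ts)) => (KeyStatus::Committed(ts), false),
        Found::NoLock(CommitRecord::Rollback) | Found::NoLock(CommitRecord::OverlappedRollback) => {
            (KeyStatus::RolledBack, false)
        }
        Found::NoLock(CommitRecord::None) => (KeyStatus::RolledBack, true),
    }
}

/// Aggregates per-key statuses, stopping at the first committed or rolled-back key.
/// Also returns how many protected rollbacks would have been written.
pub fn check_secondary_locks(keys: &[Found]) -> (SecondaryLocksStatus, usize) {
    let mut locked = Vec::new();
    let mut rollbacks = 0;
    for &found in keys {
        let (status, need_rollback) = check_key(found);
        if need_rollback {
            rollbacks += 1;
        }
        match status {
            KeyStatus::Locked(ts) => locked.push(ts),
            KeyStatus::Committed(ts) => return (SecondaryLocksStatus::Committed(ts), rollbacks),
            KeyStatus::RolledBack => return (SecondaryLocksStatus::RolledBack, rollbacks),
        }
    }
    (SecondaryLocksStatus::Locked(locked), rollbacks)
}
```

Stopping early matters because, in the real code, each rolled-back key also gets a protected rollback record; keys after the first decisive one are never touched.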
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
    ...
    let mut released_locks = ReleasedLocks::new();
    let mut result = SecondaryLocksStatus::Locked(Vec::new());

    for key in self.keys {
        let mut released_lock = None;
        let mut mismatch_lock = None;
        // Checks whether the given secondary lock exists.
        let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? {
            // The lock exists, the lock information is returned.
            Some(lock) if lock.ts == self.start_ts => {
                let (status, need_rollback, rollback_overlapped_write, lock_released) =
                    check_status_from_lock(&mut txn, &mut reader, lock, &key, region_id)?;
                released_lock = lock_released;
                (status, need_rollback, rollback_overlapped_write)
            }
            // Searches the write CF for the commit record of the lock and returns the commit
            // timestamp (0 if the lock is not committed).
            l => {
                mismatch_lock = l;
                check_determined_txn_status(&mut reader, &key)?
            }
        };
        if need_rollback {
            if let Some(l) = mismatch_lock {
                txn.mark_rollback_on_mismatching_lock(&key, l, true);
            }
            // We must protect this rollback in case this rollback is collapsed and a stale
            // acquire_pessimistic_lock and prewrite succeed again.
            if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) {
                txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
                collapse_prev_rollback(&mut txn, &mut reader, &key)?;
            }
        }
        released_locks.push(released_lock);
        match status {
            SecondaryLockStatus::Locked(lock) => {
                result.push(lock.into_lock_info(key.to_raw()?));
            }
            SecondaryLockStatus::Committed(commit_ts) => {
                result = SecondaryLocksStatus::Committed(commit_ts);
                break;
            }
            SecondaryLockStatus::RolledBack => {
                result = SecondaryLocksStatus::RolledBack;
                break;
            }
        }
    }
    ...
}
}
fn check_status_from_lock<S: Snapshot>(
    txn: &mut MvccTxn,
    reader: &mut ReaderWithStats<'_, S>,
    lock: Lock,
    key: &Key,
    region_id: u64,
) -> Result<(
    SecondaryLockStatus,
    bool,
    Option<OverlappedWrite>,
    Option<ReleasedLock>,
)> {
    let mut overlapped_write = None;
    if lock.is_pessimistic_lock_with_conflict() {
        let (status, need_rollback, rollback_overlapped_write) =
            check_determined_txn_status(reader, key)?;
        if !need_rollback {
            let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
            return Ok((
                ...
            ));
        }
        overlapped_write = rollback_overlapped_write;
    }

    if lock.is_pessimistic_lock() {
        let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
        let overlapped_write_res = if lock.is_pessimistic_lock_with_conflict() {
            overlapped_write
        } else {
            reader.get_txn_commit_record(key)?.unwrap_none(region_id)
        };
        Ok((
            ...
        ))
    } else {
        Ok((SecondaryLockStatus::Locked(lock), false, None, None))
    }
}
fn check_determined_txn_status<S: Snapshot>(
    reader: &mut ReaderWithStats<'_, S>,
    key: &Key,
) -> Result<(SecondaryLockStatus, bool, Option<OverlappedWrite>)> {
    match reader.get_txn_commit_record(key)? {
        TxnCommitRecord::SingleRecord { commit_ts, write } => {
            let status = if write.write_type != WriteType::Rollback {
                SecondaryLockStatus::Committed(commit_ts)
            } else {
                SecondaryLockStatus::RolledBack
            };
            // We needn't write a rollback once there is a write record for it:
            // If it's a committed record, it cannot be changed.
            // If it's a rollback record, it either comes from another
            // check_secondary_lock (thus protected) or the client stops commit
            // actively. So we don't need to make it protected again.
            Ok((status, false, None))
        }
        TxnCommitRecord::OverlappedRollback { .. } => {
            Ok((SecondaryLockStatus::RolledBack, false, None))
        }
        TxnCommitRecord::None { overlapped_write } => {
            Ok((SecondaryLockStatus::RolledBack, true, overlapped_write))
        }
    }
}

Once CheckTxnStatus has determined the transaction status of the primary key, ResolveLock is used to commit or roll back the secondary keys: if the primary key has been committed, ResolveLock commits the secondary keys; if the primary key has been rolled back, ResolveLock rolls them back.
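start_ts: the start ts of the transaction
commit_ts: the commit ts of the transaction. It is 0 when the keys must be rolled back, and non-zero otherwise
resolve_keys: the secondary keys to commit or roll back
The code is very simple: it just calls the commit or rollback function for each key. Note that for secondary keys, the rollback is non-protected.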
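To make the commit_ts convention concrete, here is a toy model of the ResolveLockLite loop over an in-memory lock map and write map. `Store`, `WriteRecord`, and `resolve_lock_lite` are hypothetical; unlike TiKV's commit and cleanup, this sketch silently skips keys whose lock is missing or belongs to another transaction instead of consulting the write CF. It assumes, as in TiKV's MVCC layout, that a commit record is stored at commit_ts and a rollback record at start_ts.

```rust
use std::collections::BTreeMap;

/// Hypothetical write-CF record: a commit points back at its start_ts;
/// a rollback records whether it is protected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteRecord {
    Commit { start_ts: u64 },
    Rollback { protected: bool },
}

/// Hypothetical in-memory store: the lock map holds key -> owning start_ts,
/// the write map holds (key, ts) -> record.
#[derive(Debug, Default)]
pub struct Store {
    pub locks: BTreeMap<Vec<u8>, u64>,
    pub writes: BTreeMap<(Vec<u8>, u64), WriteRecord>,
}

impl Store {
    /// Commits every key when commit_ts != 0, otherwise rolls every key back
    /// without protection. Returns the number of locks released.
    pub fn resolve_lock_lite(&mut self, start_ts: u64, commit_ts: u64, keys: &[Vec<u8>]) -> usize {
        let mut released = 0;
        for key in keys {
            // Only release locks owned by this transaction.
            if self.locks.get(key) != Some(&start_ts) {
                continue;
            }
            self.locks.remove(key);
            released += 1;
            if commit_ts != 0 {
                self.writes.insert((key.clone(), commit_ts), WriteRecord::Commit { start_ts });
            } else {
                // Secondary keys get a non-protected rollback.
                self.writes.insert((key.clone(), start_ts), WriteRecord::Rollback { protected: false });
            }
        }
        released
    }
}
```

A single commit_ts field is enough to encode both directions because a real commit timestamp is never 0.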
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLockLite {
    fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        ...
        for key in self.resolve_keys {
            released_locks.push(if !self.commit_ts.is_zero() {
                commit(&mut txn, &mut reader, key, self.commit_ts)?
            } else {
                cleanup(&mut txn, &mut reader, key, TimeStamp::zero(), false)?
            });
        }
        Ok(WriteResult {
            ...
        })
    }
}