type
status
date
slug
summary
tags
category
icon
password
前段时间看了《Linux内核设计与实现》,第 10 章「内核同步方法」中提到了几种内核中的读写锁。它们分别代表了几种比较典型的读写锁设计,非常值得学习,这里记录一下,讨论是基于 2.6 内核和 x86 体系结构的基础上进行的
读/写自旋锁
内核代码中,要定义读/写自旋锁,通过下面的宏进行初始化:
DEFINE_RWLOCK(mr_rwlock);
使用上读锁和写锁是分开加锁的
read_lock(&mr_rwlock); // 只读临界区 read_unlock(&mr_rwlock); write_lock(&mr_rwlock); // 读写临界区 write_unlock(&mr_rwlock);
不能像这样将一个读锁升级成写锁,这会导致死锁;并且写锁也是不可重入的:
read_lock(&mr_rwlock); write_lock(&mr_rwlock);
读/写自旋锁的汇编级别实现比较类似信号量,都是在寄存器上对值做增减,但为了区分读锁和写锁,读锁是固定减 1,写锁则是固定减去一个很大的 magic number,通过结果值比较就能判断锁持有情况
代码分别位于 include/asm-x86_64/rwlock.h 和 /include/asm-x86_64/spinlock.h 中,这两块涉及的核心代码融合一下大致如下:
#define RW_LOCK_BIAS 0x01000000 // 初始化时设置这个值为 magic number #define rwlock_init(x) do { *(x) = RW_LOCK_UNLOCKED; } while(0) // 获取读锁 // 将eax寄存器指向的地址,也就是变量rw的值减1(subl $1) // 根据结果是否小于0(js 2f)做跳转,小于0就去调用传入的helper做获取失败的处理 // 没有写锁的情况下是不会小于0的,不可能有这么多读者能让它减到0,所以不小于0就是获取成功 #define __build_read_lock_ptr(rw, helper) \ asm volatile(LOCK "subl $1,(%0)\n\t" \ "js 2f\n" \ "1:\n" \ LOCK_SECTION_START("") \ "2:\tcall " helper "\n\t" \ "jmp 1b\n" \ LOCK_SECTION_END \ ::"a" (rw) : "memory") // 传入的helper位于arch/i386/kernel/semaphore.c // 这里就是一个自旋的逻辑,持续操作rw直到它是大于等于0的,相当于: // // lock_failed: // rw++; // while(rw < 1) {} // rw--; // if(rw < 0) goto lock_failed; // // 这里结尾还要判断一次,是因为前面的操作整体上不是原子或互斥的,所以decl后要再check一下 asm( ".text\n" ".align 4\n" ".globl __read_lock_failed\n" "__read_lock_failed:\n\t" LOCK "incl (%eax)\n" "1: rep; nop\n\t" "cmpl $1,(%eax)\n\t" "js 1b\n\t" LOCK "decl (%eax)\n\t" "js __read_lock_failed\n\t" "ret" ); // 获取写锁 // eax寄存器指向的地址值,也就是rw减去magic number // 判断结果是否刚好为0(jnz 2f),不是则说明有读锁在占用,跳转到helper调用 #define __build_write_lock_ptr(rw, helper) \ asm volatile(LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \ "jnz 2f\n" \ "1:\n" \ LOCK_SECTION_START("") \ "2:\tcall " helper "\n\t" \ "jmp 1b\n" \ LOCK_SECTION_END \ ::"a" (rw) : "memory") // 写锁传入的helper同样位于arch/i386/kernel/semaphore.c // 逻辑和读锁的基本一样 asm( ".text\n" ".align 4\n" ".globl __write_lock_failed\n" "__write_lock_failed:\n\t" LOCK "addl $" RW_LOCK_BIAS_STR ",(%eax)\n" "1: rep; nop\n\t" "cmpl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" "jne 1b\n\t" LOCK "subl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" "jnz __write_lock_failed\n\t" "ret" ); // 解锁时就是加上1或magic number #define _raw_read_unlock(rw) asm volatile("lock ; incl %0" :"=m" ((rw)->lock) : : "memory") #define _raw_write_unlock(rw) asm volatile("lock ; addl $" RW_LOCK_BIAS_STR ",%0":"=m" ((rw)->lock) : : "memory")
可以看出,读/写自旋锁是读优先的,会导致写饥饿。当有一个或多个读者持有读锁时,写操作无法获取锁,如果此时读锁被长时间占有,写锁将一直自旋等待,此时自旋会导致一个核心上的高昂开销
这里有一个处理上的细节,在真正开始自旋获取到写锁之前,就已经互斥了(减去 magic number),这时候新的读锁是无法获取的,这避免了写锁和读锁的争抢,能稍微缓解下写饥饿问题,例如这样的情况:
TIME ─────────────────────────────────────────────────────────► ┌──────────────┐ │ reader │ └──────────────┘ ┌──────────┬────────────────────┐ │ spin │ writer │ └──────────┴────────────────────┘ ┌─────────────────────────┬───────────────┐ │ spin │ reader │ └─────────────────────────┴───────────────┘ ┌────────────┬───────────┐ │ spin │ reader │ └────────────┴───────────┘
读/写信号量
读/写信号量的使用上和读/写自旋锁是类似的,但功能上要强大一些,分别支持静态和动态的初始化方法
// 静态定义 static DECLARE_RWSEM(name); // 动态定义 init_rwsem(struct rw_semaphore *sem);
读/写信号量支持 trylock 操作和动态地将写锁降级为读锁
static DECLARE_RWSEM(mr_rwsem); down_read(&mr_rwsem); // 只读临界区 up_read(&mr_rwsem); down_write(&mr_rwsem); // 读写临界区 up_write(&mr_rwsem); down_write(&mr_rwsem); // 读写临界区 downgrade_write(&mr_rwsem); // 写锁降级读锁 // 只读临界区 up_read(&mr_rwsem);
include/asm-x86_64/rwsem.h 中定义了汇编级别上的实现,和前面读/写自旋锁是类似的,但没有自旋,这里就不细说,内核源码这里也直接附带注释了:
#define RWSEM_UNLOCKED_VALUE 0x00000000 #define RWSEM_ACTIVE_BIAS 0x00000001 #define RWSEM_ACTIVE_MASK 0x0000ffff #define RWSEM_WAITING_BIAS (-0x00010000) #define RWSEM_ACTIVE_READ_BIAS RWSEM_ACTIVE_BIAS #define RWSEM_ACTIVE_WRITE_BIAS (RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS) /* * lock for reading */ static inline void __down_read(struct rw_semaphore *sem) { __asm__ __volatile__( "# beginning down_read\n\t" LOCK_PREFIX " incl (%%rdi)\n\t" /* adds 0x00000001, returns the old value */ " js 2f\n\t" /* jump if we weren't granted the lock */ "1:\n\t" LOCK_SECTION_START("") \ "2:\n\t" " call rwsem_down_read_failed_thunk\n\t" " jmp 1b\n" LOCK_SECTION_END \ "# ending down_read\n\t" : "+m"(sem->count) : "D"(sem) : "memory", "cc"); } /* * lock for writing */ static inline void __down_write(struct rw_semaphore *sem) { int tmp; tmp = RWSEM_ACTIVE_WRITE_BIAS; __asm__ __volatile__( "# beginning down_write\n\t" LOCK_PREFIX " xaddl %0,(%%rdi)\n\t" /* subtract 0x0000ffff, returns the old value */ " testl %0,%0\n\t" /* was the count 0 before? */ " jnz 2f\n\t" /* jump if we weren't granted the lock */ "1:\n\t" LOCK_SECTION_START("") "2:\n\t" " call rwsem_down_write_failed_thunk\n\t" " jmp 1b\n" LOCK_SECTION_END "# ending down_write" : "=&r" (tmp) : "0"(tmp), "D"(sem) : "memory", "cc"); } /* * unlock after reading */ static inline void __up_read(struct rw_semaphore *sem) { __s32 tmp = -RWSEM_ACTIVE_READ_BIAS; __asm__ __volatile__( "# beginning __up_read\n\t" LOCK_PREFIX " xaddl %[tmp],(%%rdi)\n\t" /* subtracts 1, returns the old value */ " js 2f\n\t" /* jump if the lock is being waited upon */ "1:\n\t" LOCK_SECTION_START("") "2:\n\t" " decw %w[tmp]\n\t" /* do nothing if still outstanding active readers */ " jnz 1b\n\t" " call rwsem_wake_thunk\n\t" " jmp 1b\n" LOCK_SECTION_END "# ending __up_read\n" : "+m"(sem->count), [tmp] "+r" (tmp) : "D"(sem) : "memory", "cc"); } /* * unlock after writing */ static inline void __up_write(struct rw_semaphore *sem) { unsigned tmp; __asm__ __volatile__( "# beginning __up_write\n\t" " movl %[bias],%[tmp]\n\t" LOCK_PREFIX " xaddl %[tmp],(%%rdi)\n\t" /* tries to transition 0xffff0001 -> 0x00000000 */ " jnz 2f\n\t" /* jump if the lock is being waited upon */ "1:\n\t" LOCK_SECTION_START("") "2:\n\t" " decw %w[tmp]\n\t" /* did the active count reduce to 0? */ " jnz 1b\n\t" /* jump back if not */ " call rwsem_wake_thunk\n\t" " jmp 1b\n" LOCK_SECTION_END "# ending __up_write\n" : "+m"(sem->count), [tmp] "=r" (tmp) : "D"(sem), [bias] "i"(-RWSEM_ACTIVE_WRITE_BIAS) : "memory", "cc"); }
信号量是睡眠锁,读锁和写锁在获取锁失败时最后都会进入到
rwsem_down_failed_common
(位于 lib/rwsem.c) 中,这里会将进程加入等待队列中,然后重新调度进程static inline struct rw_semaphore *rwsem_down_failed_common(struct rw_semaphore *sem, struct rwsem_waiter *waiter, signed long adjustment) { struct task_struct *tsk = current; signed long count; set_task_state(tsk,TASK_UNINTERRUPTIBLE); /* set up my own style of waitqueue */ spin_lock(&sem->wait_lock); waiter->task = tsk; list_add_tail(&waiter->list,&sem->wait_list); /* note that we're now waiting on the lock, but no longer actively read-locking */ count = rwsem_atomic_update(adjustment,sem); /* if there are no longer active locks, wake the front queued process(es) up * - it might even be this process, since the waker takes a more active part */ if (!(count & RWSEM_ACTIVE_MASK)) sem = __rwsem_do_wake(sem,1); spin_unlock(&sem->wait_lock); /* wait to be given the lock */ for (;;) { if (!waiter->flags) break; schedule(); set_task_state(tsk, TASK_UNINTERRUPTIBLE); } tsk->state = TASK_RUNNING; return sem; }
值得一提的是
trylock
和 downgrade_write
操作,这两个操作是读/写自旋锁中没有的两个
trylock
是类似的,都是用 cmpxchg
指令来做 CAS(compare-and-swap) 操作。写锁的 trylock
比较简单,因为是互斥的,所以只需要对初始值做 CAS 即可。而读锁可能被持有多个,所以它的 trylock
需要先将 sem->count
赋值给 tmp
,再自增 tmp
,利用 tmp
的值进行 CAS/* * trylock for reading -- returns 1 if successful, 0 if contention */ static inline int __down_read_trylock(struct rw_semaphore *sem) { __s32 result, tmp; __asm__ __volatile__( "# beginning __down_read_trylock\n\t" " movl %0,%1\n\t" "1:\n\t" " movl %1,%2\n\t" " addl %3,%2\n\t" " jle 2f\n\t" LOCK_PREFIX " cmpxchgl %2,%0\n\t" " jnz 1b\n\t" "2:\n\t" "# ending __down_read_trylock\n\t" : "+m"(sem->count), "=&a"(result), "=&r"(tmp) : "i"(RWSEM_ACTIVE_READ_BIAS) : "memory", "cc"); return result>=0 ? 1 : 0; } /* * trylock for writing -- returns 1 if successful, 0 if contention */ static inline int __down_write_trylock(struct rw_semaphore *sem) { signed long ret = cmpxchg(&sem->count, RWSEM_UNLOCKED_VALUE, RWSEM_ACTIVE_WRITE_BIAS); if (ret == RWSEM_UNLOCKED_VALUE) return 1; return 0; }
downgrade_write 的实现则类似写锁解锁,然后判断是否有必要唤醒等待队列中的项,这里其实和写锁解锁(
up_write
)的主要差别就是给 sem->count
加上的偏移量少了 1(可以回去看前面几个 RWSEM_
开头的宏定义),而这个 1 就是读锁占的值/* * downgrade write lock to read lock */ static inline void __downgrade_write(struct rw_semaphore *sem) { __asm__ __volatile__( "# beginning __downgrade_write\n\t" LOCK_PREFIX " addl %[bias],(%%rdi)\n\t" /* transitions 0xZZZZ0001 -> 0xYYYY0001 */ " js 2f\n\t" /* jump if the lock is being waited upon */ "1:\n\t" LOCK_SECTION_START("") "2:\n\t" " call rwsem_downgrade_thunk\n" " jmp 1b\n" LOCK_SECTION_END "# ending __downgrade_write\n" : "=m"(sem->count) : "D"(sem), [bias] "i"(-RWSEM_WAITING_BIAS), "m"(sem->count) : "memory", "cc"); } /* * downgrade a write lock into a read lock * - caller incremented waiting part of count, and discovered it to be still negative * - just wake up any readers at the front of the queue */ struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem) { rwsemtrace(sem,"Entering rwsem_downgrade_wake"); spin_lock(&sem->wait_lock); /* do nothing if list empty */ if (!list_empty(&sem->wait_list)) sem = __rwsem_do_wake(sem,0); spin_unlock(&sem->wait_lock); rwsemtrace(sem,"Leaving rwsem_downgrade_wake"); return sem; }
总结一下,读/写信号量和读/写自旋锁类似,两者语义上是相同的,也都是读优先的,只不过信号量是睡眠锁,当有长时间获取不到锁的情况时,不会导致过多的 CPU 开销
顺序锁
顺序锁和前面两者有个重要的区别,顺序锁是写优先的,让我们来分析下它是如何实现的
首先还是使用方式上:
// 定义一个顺序锁 seqlock_t mr_seq_lock = DEFINE_SEQLOCK(mr_seq_lock); write_seqlock(&mr_seq_lock); // 读写临界区 write_sequnlock(&mr_seq_lock); // 读锁的使用有较大区别 unsigned long seq; do { seq = read_seqbegin(&mr_seq_lock); // 读取数据... } while (read_seqretry(&mr_seq_lock, seq));
一个使用例子是内核的 jiffies,它存储了机器启动到当前的时钟节拍,每次时钟中断时都会更新这个值,所以是一个高频写入的场景,
get_jiffies_64()
函数用来获取这个值,它的实现是这样的u64 get_jiffies_64(void) { unsigned long seq; u64 ret; do { seq = read_seqbegin(&xtime_lock); ret = jiffies_64; } while (read_seqretry(&xtime_lock, seq)); return ret; }
顺序锁在读取时需要一个循环,这是为了判断在这个过程中是否有发生写入,如果没有,那么读取就是安全的,否则需要重试
实现上,代码位于 /include/linux/seqlock.h,有比较清晰的注释:
#define SEQLOCK_UNLOCKED { 0, SPIN_LOCK_UNLOCKED } #define seqlock_init(x) do { *(x) = (seqlock_t) SEQLOCK_UNLOCKED; } while (0) /* Lock out other writers and update the count. * Acts like a normal spin_lock/unlock. * Don't need preempt_disable() because that is in the spin_lock already. */ static inline void write_seqlock(seqlock_t *sl) { spin_lock(&sl->lock); ++sl->sequence; smp_wmb(); } static inline void write_sequnlock(seqlock_t *sl) { smp_wmb(); sl->sequence++; spin_unlock(&sl->lock); } static inline int write_tryseqlock(seqlock_t *sl) { int ret = spin_trylock(&sl->lock); if (ret) { ++sl->sequence; smp_wmb(); } return ret; } /* Start of read calculation -- fetch last complete writer token */ static inline unsigned read_seqbegin(const seqlock_t *sl) { unsigned ret = sl->sequence; smp_rmb(); return ret; } /* Test if reader processed invalid data. * If initial values is odd, * then writer had already started when section was entered * If sequence value changed * then writer changed data while in section * * Using xor saves one conditional branch. */ static inline int read_seqretry(const seqlock_t *sl, unsigned iv) { smp_rmb(); return (iv & 1) | (sl->sequence ^ iv); }
可以看到,顺序锁是基于一个自旋锁实现的。但额外依赖一个序列计数器,当获取写锁时,这个序列值会增加。读取数据时要先调用
read_seqbegin
,它会返回这个序列值,读取完成后通过 read_seqretry
检查传入的值 iv
,满足以下两个条件则说明读是安全的:- 如果
iv
是偶数(初始值为 0,写锁会加 1)则说明不是处在一个写操作进行的过程中
iv
和序列值相同(相同值异或结果为 0)说明没有写操作发生过
这两者都满足,读取的值就是有效的
所以,顺序锁是一种乐观锁,是不存在「读锁」的,而是通过类似版本号的机制来读,因此只要没有其他写者,随时都可以获取到写锁,以此实现写优先