道生一,一生二,二生三,三生万物
这张尽量结合上一章进行使用:上一章
这章主要是讲如何通过
redis实现
这里我用
技术:
这里是有一个大体的实现思路:主要是使用
redis 命令说明:
setnx 命令:set if not exists ,当且仅当key 不存在时,将key 的值设为value 。若给定的key 已经存在,则 SETNX不做任何动作。
- 返回1,说明该进程获得锁,将密钥的值设为值
- 返回0,说明其他进程已经获得了锁,进程不能进入临界区命令格式:设置锁。
get 命令:获取键的值
- 如果存在,则返回
- 如果不存在,则返回
nil 命令格式:获取锁getset 命令:该方法是原子的,对键设置newvalue 这个值,并且返回键原来的旧值。
- 命令格式:设置锁并设置键新值
del 命令:删除redis 中指定的key
- 命令格式:
del lock.key
看了很多博客,这里总结一些比较常用的一些方法:
方案1:
原理:基于set命令的分布式锁
使用:set命令
存在问题:可能产生死锁
- 原因:假设线程获取了锁之后,在执行任务的过程中挂掉,来不及显示地执行del命令释放锁,那么竞争该锁的线程都会执行不了,产生死锁的情况。
- 解决办法:设置锁超时时间
- 原理:可以使用expire命令设置锁超时时间
- 使用:setnx 的 key 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。
- 存在问题:可能产生死锁
- 问题原因:setnx 和 expire 不是原子性的操作:
假设某个线程执行 setnx 命令,成功获得了锁,但是还没来得及执行expire 命令,服务器就挂掉了,这样一来,这把锁就没有设置过期时间了,变成了死锁,别的线程再也没有办法获得锁了 - 使用:setnx 的 key 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放
- 解决办法:redis 的 set 命令支持在获取锁的同时设置 key 的过期时间
- 存在问题:锁过期提前自动释放,线程A删除了线程B的锁
- 问题原因:锁过期提前自动释放
- 假如线程A成功得到了锁,并且设置的超时时间是 30 秒。如果某些原因导致线程 A 执行的很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。
- 随后,线程A执行完任务,接着执行del指令来释放锁。但这时候线程 B 还没执行完,线程A实际上删除的是线程B加的锁。
- 使用:在加锁的时候把当前的线程 ID 当做value,并在删除之前验证 key 对应的 value 是不是自己线程的 ID
- 解决办法:可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁
- 存在问题:get操作、判断和释放锁是两个独立操作,非原子操作
- 问题原因:判断和释放锁是两个独立操作
- 解决办法:对于非原子性的问题,我们可以使用Lua脚本来确保操作的原子性
- 问题原因:锁过期提前自动释放
- 问题原因:setnx 和 expire 不是原子性的操作:
诺是想要更好的体验可以通过我的飞书观看:飞升思维导图
方式2:
这里的一些出现的方法是
具体的实现操作:
const ( //解锁,使用lua变成原子性 unLockScript = "if redis.call('get',KEYS[1])==ARGV[1]" + "then redis.call('del',KEYS[1]) " + "return 1 " + "else " + "return 0 " + "end" //续期(看门狗) watchLogScript = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end" ) type DispersedLock struct { key string //锁 value string //锁的值,随机值(可以用userId+requestId) expire int //锁过期时间,单位毫秒 lockClient redis.Cmdable //启用锁的客户端,redis目前 unLockScript string //lua 脚本 watchLogScript string //看门狗 lua unlockChan chan struct{} //通知通道 } func (d DispersedLock) getScript(ctx context.Context, script string) string { result, _ := d.lockClient.ScriptLoad(ctx, script).Result() return result } var scriptMap sync.Map func NewLockRedis(ctx context.Context, cmdable redis.Cmdable, key string, expire int, value string) *DispersedLock { lock := &DispersedLock{ key: key, value: value, expire: expire, } lock.lockClient = cmdable lockScrip, _ := scriptMap.LoadOrStore("dispersed_lock", lock.getScript(ctx, unLockScript)) lockWatch, _ := scriptMap.LoadOrStore("watch_log", lock.getScript(ctx, watchLogScript)) lock.unLockScript = lockScrip.(string) lock.watchLogScript = lockWatch.(string) lock.unlockChan = make(chan struct{}, 0) return lock } func (d DispersedLock) Lock(ctx context.Context) bool { ok, _ := d.lockClient.SetNX(ctx, d.key, d.value, time.Duration(d.expire)*time.Millisecond).Result() if ok { go d.watchDog(ctx) } return ok } func (d DispersedLock) watchDog(ctx context.Context) { //创建一个定时器,每到工作时间的2/3就出发一次 duration := time.Duration(d.expire*1e3*2/3) * time.Millisecond ticker := time.NewTicker(duration) //打包成原子 for { select { case <-ticker.C: //脚本参数 args := []interface{}{ d.value, d.expire, } result, err := d.lockClient.Eval(ctx, d.watchLogScript, []string{d.key}, args...).Result() if err != nil { logS.LogM.ErrorF(ctx, "watchDog error %s", err) return } res, ok := result.(int64) if !ok { return } if res == 0 { return } case <-d.unlockChan: return } } } func (d DispersedLock) unlock(ctx context.Context) bool { //脚本参数 args := []interface{}{ d.value, } result, _ := d.lockClient.Eval(ctx, d.unLockScript, []string{d.key}, args...).Result() close(d.unlockChan) if result.(int64) > 0 { return true } else { return false } } const lockMaxLoopNum = 1000 // LoopLock 轮询等待 func (d DispersedLock) LoopLock(ctx context.Context, sleepTime int) bool { cancel, cannel := context.WithCancel(context.Background()) ticker := time.NewTicker(time.Duration(sleepTime) * time.Millisecond) count := 0 status := 0 loop: for { select { case <-cancel.Done(): break loop default: } if d.Lock(ctx) { ticker.Stop() cannel() break } else { <-ticker.C } count++ //判断是否大于最大获取次数,达到最大直接退出循环 if count >= lockMaxLoopNum { status = 1 break } } cannel() if status != 0 { return false } return true }
这些就是通过redis去实现一个分布式锁的具体步骤,很多实现,估计很多其他语言的朋友们可能会有些蒙圈。但是没有关系。
推荐使用包
import "github.com/go-redsync/redsync/v4"
这个包基本上满足了市面上分布式锁的所有需求,包括续租:(但是这里的续租需要一定的条件才能触发,这个条件要达到redis实例的最大值时才能触发)。所以为了,方便使用,建议可以自己续写一个续租的方法。
这里献上我的:
// NewLock 实例化一个分布式锁,用来实现幂等,降低重试成本 func NewLock(mutexName string) *redsync.Mutex { pool := goredis.NewPool(configuration.RedisClient) rs := redsync.New(pool) newString := uuid.NewString() lockName := "Lock:" + newString + ":" + mutexName mutex := rs.NewMutex(lockName) return mutex } // LockRelet 周期性续租,过去无可挽回,未来可以改变 // num定义时间:单位毫秒 // size定义续租的次数 func LockRelet(num int, size int, mutex *redsync.Mutex) chan bool { done := make(chan bool) if size <= 0 { return nil } go func() { ticker := time.NewTicker(time.Duration(num) * time.Millisecond) defer ticker.Stop() for size > 0 { size-- select { case <-ticker.C: extend, err := mutex.Extend() if err != nil { logS.LogM.Panicf("Failed to extend lock:", err) } else if !extend { logS.LogM.Panicf("Failed to extend lock: not successes") } case <-done: return } } }() return done }