原创分享 使用 go 理解 Lock-Free

chrisho · 2020年02月22日 · 最后由 Liyh01 回复于 2020年02月23日 · 957 次阅读
本帖已被设为精华帖!

Lock-Free(无锁编程)

锁是编程中常用的技术, 通常应用于共享内存, 多个线程向同一资源操作往往会发生很多问题, 为了防止这些问题只能用到锁解决. 虽然锁可以解决, 但是在高并发的场景下, 可能会造成性能瓶颈. 无锁编程目前大多数都是基于atomic实现, atomic能够保证数据的正确性, sync.Mutex也有 Lock-Free 的影子.

无锁编程是什么?

<<The Art of Multiprocessor Programming>>书中的定义:
"如果一个方法是无锁的,它保证线程无限次调用这个方法都能够在有限步内完成。"

成为无锁的条件:

  1. 是多线程.
  2. 多个线程访问共享内存.
  3. 不会令其它线程造成阻塞.

go 中如果有一个方法里操作栈数据, 如果没有锁肯定会导致竞争发生, 加上锁又不会是无锁. 无锁编程是一个既复杂又具有挑战性的活, 究竟如何写一个无锁代码?

实现 Lock-Free

type Config struct {
    sync.RWMutex
    endpoint string
}

func BenchmarkPMutexSet(b *testing.B) {
    config := Config{}
    b.ReportAllocs()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            config.Lock()
            config.endpoint = "api.example.com"
            config.Unlock()
        }
    })
}

func BenchmarkPMutexGet(b *testing.B) {
    config := Config{endpoint: "api.example.com"}
    b.ReportAllocs()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            config.RLock()
            _ = config.endpoint
            config.RUnlock()
        }
    })
}

func BenchmarkPAtomicSet(b *testing.B) {
    var config atomic.Value
    c := Config{endpoint: "api.example.com"}
    b.ReportAllocs()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            config.Store(c)
        }
    })
}

func BenchmarkPAtomicGet(b *testing.B) {
    var config atomic.Value
    config.Store(Config{endpoint: "api.example.com"})
    b.ReportAllocs()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            _ = config.Load().(Config)
        }
    })
}

看看结果

BenchmarkPMutexSet-8            19403011                61.6 ns/op             0 B/op          0 allocs/op
BenchmarkPMutexGet-8            35671380                32.7 ns/op             0 B/op          0 allocs/op
BenchmarkPAtomicSet-8           32477751                37.0 ns/op            48 B/op          1 allocs/op
BenchmarkPAtomicGet-8           1000000000               0.247 ns/op           0 B/op          0 allocs/op

比较结果相当明确, 确实是快. 上面只是一个最简单的实现, 看看 Lock-Free Stack.

实现 Lock-Free Stack

先看一下锁实现的栈

var mu sync.Mutex

type LStack struct {
    Next *LStack
    Item int
}

func (head *LStack) Push(i int) {
    mu.Lock()
    defer mu.Unlock()

    new := &LStack{Item: i}
    new.Next = head.Next
    head.Next = new
}

func (head *LStack) Pop() int {
    mu.Lock()
    defer mu.Unlock()

    old := head.Next
    if old == nil {
        return 0
    }

    new := head.Next
    head.Next = new

    return old.Item
}

LStack实现PushPop方法, 两个方法都加上锁, 防止竞争.

下面是 Lock-Free Stack

type LFStack struct {
    Next unsafe.Pointer
    Item int
}

var lfhead unsafe.Pointer // 记录栈头信息

func (head *LFStack) Push(i int) *LFStack { // 强制逃逸
    new := &LFStack{Item: i}
    newptr := unsafe.Pointer(new)
    for {
        old := atomic.LoadPointer(&lfhead)
        new.Next = old

        if atomic.CompareAndSwapPointer(&lfhead, old, newptr) {
            break
        }
    }
    return new
}

func (head *LFStack) Pop() int {
    for {
        time.Sleep(time.Nanosecond) // 可以让CPU缓一缓
        old := atomic.LoadPointer(&lfhead)
        if old == nil {
            return 0
        }

        if lfhead == old {
            new := (*LFStack)(old).Next
            if atomic.CompareAndSwapPointer(&lfhead, old, new) {
                return 1
            }
        }
    }
}

LFStack也实现了PushPop方法, 虽然没有加锁, 也可以保证返回数据的正确性. 对比锁实现的方法来看, 是逻辑要复杂得多. 由于循环使 CPU 压力增大, 可以用time.Sleep暂停一下.

runtime/lfstack.go

最近在研究 gc 时发现 go 源码有用到 Lock-Free Stack, 在runtime/lfstack.go

type lfstack uint64

func (head *lfstack) push(node *lfnode) {
    node.pushcnt++
    new := lfstackPack(node, node.pushcnt)
    if node1 := lfstackUnpack(new); node1 != node {
        print("runtime: lfstack.push invalid packing: node=", node, " cnt=", hex(node.pushcnt), " packed=", hex(new), " -> node=", node1, "\n")
        throw("lfstack.push")
    }
    for {
        old := atomic.Load64((*uint64)(head))
        node.next = old
        if atomic.Cas64((*uint64)(head), old, new) {
            break
        }
    }
}

func (head *lfstack) pop() unsafe.Pointer {
    for {
        old := atomic.Load64((*uint64)(head))
        if old == 0 {
            return nil
        }
        node := lfstackUnpack(old)
        next := atomic.Load64(&node.next)
        if atomic.Cas64((*uint64)(head), old, next) {
            return unsafe.Pointer(node)
        }
    }
}

func (head *lfstack) empty() bool {
    return atomic.Load64((*uint64)(head)) == 0
}

func lfnodeValidate(node *lfnode) {
    if lfstackUnpack(lfstackPack(node, ^uintptr(0))) != node {
        printlock()
        println("runtime: bad lfnode address", hex(uintptr(unsafe.Pointer(node))))
        throw("bad lfnode address")
    }
}

lfstack主要是用于对 gc 时保存灰色对象, 有兴趣的可以看看.

小结

Lock-Free 的实现还有很多种, Lock-Free Stack 只是其中之一. 在日常的编程中, 基本上用sync.Mutex可以满足需求, 不要强制项目使用 Lock-Free, 可以选择在负载高的方法考虑使用, 由于实现复杂有可能性能也不及锁. 在 benchmark 测试LFStackLStack发现, 前者的性能不及后者, 所以不是无锁都好用. 如果大家有兴趣可以研究一下无锁队列.

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
kevin 将本帖设为了精华贴 02月22日 20:37
lwhile GoCN 每日新闻 (2020-02-23) 中提及了此贴 02月23日 12:30

哈喽 本篇文章中的源码有吗,可以学习一下吗?

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册