[db/BoltDB] 2 - BoltDB Transaction

Posted by Dongbo on December 28, 2021

本篇将介绍 BoltDB 事务的实现。本来看到 etcd 中有 MVCC 机制,还以为是底层存储的 BoltDB 本身的 MVCC 机制。没想到其实 etcd 的 MVCC 是使用 BoltDB 存储不同版本的数据来实现的,BoltDB 本身并没有实现 MVCC 机制。粗略看了一遍代码之后,感觉 BoltDB 中的 kv 没有存储版本号或时间戳,它只支持并发读,写事务之间是互斥的;读写并发的时候,写事务会将 bucket 拷贝一份,在拷贝出的数据上进行修改,最后提交时再统一写进文件里。写入操作是直接将新数据页写入磁盘上新分配的空间,不会干扰正在执行的只读事务,它们也不会看到新的修改;写事务最后再将 meta 的更新写入磁盘,这项操作完成后,之后的只读事务便能看到这次的修改。

这么看来 BoltDB 实现的应当是 shadow paging 机制(确实没有用日志),来保证原子性和持久性的;而 shadow paging + 单线程写事务自然的实现了可串行化的隔离级别,隔离性也由此得到保障。

BoltDB 只读事务和读写事务

BoltDB 将事务分为 ReadTxn 和 ReadWriteTxn,为了方便下面将其称为 读事务写事务。提到写事务时注意它不但能 ,也能 即可。 下面来看看文档给出的 BoltDB 读写操作的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func Test_boltDB_start(t *testing.T){
  db, err := bolt.Open("/tmp/boltdb", 0666, nil)
  assert.Nil(t, err)
  defer db.Close()

  bkt := []byte("bucket1")
  key := []byte("date")
  val := []byte("2021-12-27 14:11")
  err = db.Update(func(tx *bolt.Tx) error {   // open a read/write txn
    b, err := tx.CreateBucketIfNotExists(bkt)
    assert.Nil(t, err)
    err = b.Put(key, val)
    return err
  })
  assert.Nil(t, err)

  err = db.View(func(tx *bolt.Tx) error {   // read only txn
    b := tx.Bucket(bkt)
    v := b.Get(key)
    assert.Equal(t, val, v)
    t.Logf("get val:%s", string(v))
    return nil
  })
  assert.Nil(t, err)
}

产生写事务的tx.Update() 函数开启一个写事务,该函数内部用 tx.Begin() 和 tx.Commit/tx.Rollback 包裹我们定义的匿名函数(对数据项进行读写删除等操作),通过一个互斥锁来保证写事务的串行;而读事务的 tx.View()则开启一个只读事务,不需要提交操作,甚至出错后调用的 Rollback() 也仅仅将读事务从事务列表中删除而不用做其他操作。

分隔数据的单元:Bucket

在以上示例中我们看到,读和写数据都需要经过 Bucket 来完成。代码中这部分的命名含义有些混乱不便于理解,注释部分说的是 Bucket represents a collection of key/value pairs inside the database. 结构体的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Bucket struct {
	*bucket
	tx       *Tx                // the associated transaction
	buckets  map[string]*Bucket // subbucket cache
	page     *page              // inline page reference
	rootNode *node              // materialized node for the root page.
	nodes    map[pgid]*node     // node cache、
	FillPercent float64
}

// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
	root     pgid   // page id of the bucket's root-level page
	sequence uint64 // monotonically incrementing, used by NextSequence()
}

打断点逐行运行后,发现可以把一个 Bucket 当作关系型数据库中的一张表,每个 Bucket 中有一个 B+ 树索引;对同一个 Bucket 下同一个 key 的 Put 操作只会更新它的 value 而不会写入两个相同的 key,但不同的 Bucket 内可以存在相同的 key 映射到不同的 value(毕竟是两张“表”,索引和数据相互独立)。

dirty page 的产生

读事务不会产生 dirty page,因此读事务可以并发执行,故 BoltDB 适合用于读多写少的场景。话虽如此,我们还是需要正确处理少量写事务出现时产生 dirty page 的情况。本意是想先介绍事务再介绍 B+ 树的,但是果然还是绕不开,此处先以最小程度涉及 B+ 树的方式介绍 dirty page 如何产生,B+ 树更详细的介绍就留到下一篇文章。

  • 情况一:修改 k/v 产生的 dirty page

    BoltDB 中的 k/v 是存放在 B+ 树叶节点中的(也可以称 B+ 树实现了稠密索引),修改 k/v 会导致叶节点对应的 page 被为 dirty page,shadow paging 机制需要我们把叶节点写到一个新页面中,因此 parent 中的指针需要指向新的页面,也就是说 parent 也会成为 dirty page,此过程不断递归,最后从根节点到被修改叶节点这一路径上的所有 page 都会变为 dirty page;

    update-kv-dirty

  • 情况二:插入或删除 k/v 产生的 dirty page

    另外写事务中对 k/v 进行增删都可能引起索引结构的改变,索引中相关的页面都会变成 dirty page,需要先拷贝原有 page,修改后写到新页面中,这一过程需要不断向上递归进行,哪怕某个节点中容量足够,但是因为 inode 中指针改变了(因为子页面改变了),该节点依然变为 dirty,就变为上一段描述的情况;如果某个页面元素被全部删除,那么这个页会被 freeList 回收。

    delete-kv-dirty-1

    delete-kv-dirty-2

可以看到以上两种情况,只要我们在写事务中进行了任意的 k/v 写操作,都会将根节点到相应叶节点路径上的所有页面变为 dirty page。但是 node 中没有 dirty 标记,也没有 BufferPool 来记录页面是否 dirty(缓存全权交由 mmap 来管理),它是如何将读操作和写操作区分开的呢?

BoltDB 中收集 dirty page 是通过 Spill() 函数完成的,tx.Commit()中调用的是 Bucket.spill() 对 Bucket 下所有的 dirty page 进行收集,而实际的工作其实是 node.spill() 来完成的。删除操作只有在k/v数量少于2时,page才会被回收,即放入 freeList 中;没有造成页面回收的 delete 操作只会产生脏页,处理步骤与 update 一致:都是通过 spill 操作来收集。

注意:我们提到的 page 是指从磁盘读到内存中的一段 4096 字节的数据,而 node 是解析 page 中的 raw data 得到的结构体,附带上一些元信息(parent指针、isLeaf标识、是否经过回收的spilled标记等)便于 B+ 树进行增删改等操作,二者可以通过 node.read(page)/node.write(page) 相互转换。

通过 spill() 收集 dirty page

下面我们通过代码来看 spill() 函数都做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// spill writes the nodes to dirty pages and splits nodes as it goes.
// Returns an error if dirty pages cannot be allocated.
func (n *node) spill() error {
  var tx = n.bucket.tx
  if n.spilled {
  	return nil
  }
  
  sort.Sort(n.children)
  for i := 0; i < len(n.children); i++ {
  	if err := n.children[i].spill(); err != nil {
  		return err
  	}
  }
  
  // We no longer need the child list because it's only used for spill tracking.
  n.children = nil
  
  ...
  
  return nil
}

首先 spill 先递归调用子节点的 spill() 函数,因此最终我们会来到叶节点来进行 spill 操作,随后不断向上回溯直到返回根节点。这里的 children []*node 数组是在 spill 过程中根据 inode 临时生成的子节点 node 缓存(在 node.split() -> node.splitTow()中;另外在通过 Cursor 往下搜索的过程中也可能生成 children),主要工作就是辅助完成 spill 的任务,一旦 node 中的所有子节点 spill 结束,该数组也将被丢弃。

接下来是 spill 实际的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (n *node) spill() error {
  ...
  var nodes = n.split(uintptr(tx.db.pageSize))
  for _, node := range nodes {
  	// Add node's page to the freelist if it's not new.
  	if node.pgid > 0 {
	  tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
	  node.pgid = 0
  	}
  
  	// Allocate contiguous space for the node.
  	p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize)
  	if err != nil {
  		return err
  	}
  
  	// Write the node.
  	if p.id >= tx.meta.pgid {
  		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
  	}
  	node.pgid = p.id
  	node.write(p)
  	node.spilled = true
  
  	// Insert into parent inodes.
  	if node.parent != nil {
	  var key = node.key
	  if key == nil {
	  	key = node.inodes[0].key
	  }

	  node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
	  node.key = node.inodes[0].key
	  _assert(len(node.key) > 0, "spill: zero-length node key")
  	}
  
  }
  
  // If the root node split and created a new root then we need to spill that
  // as well. We'll clear out the children to make sure it doesn't try to respill.
  if n.parent != nil && n.parent.pgid == 0 {
  	n.children = nil
  	return n.parent.spill()
  }
  
  return nil
}

对当前节点的 spill 操作流程如下:

  1. 首先判断 node 是否需要分割,如果 node 容量超过阈值则通过 split() 函数分割成若干个 node(记得新产生的 node.parent 也指向 old_node.parent);若无需分割则 split() 返回包含 node 本身的 nodes 数组;
  2. 判断 node 对应页面是否为旧页面,如果是旧的则将该页面放入 freeList 中表示可以被回收(关于 freeList 我们将在BoltDB freeList展开介绍,因为接下来我们马上会将这个页面的内容写入一个新页面中;

  3. 对于nodes 数组的每个 node,都向 db 申请一块新页面的空间,将该页面放入事务的 dirty page 列表中(Tx.pages);
  4. 把 node 内容写入刚开辟的 page 中;
  5. 更新 parent 中的子节点信息,即将 node 的新 page id 写入 parent 的 inode 数组;
  6. 最后一步是检查根节点分裂生成新 root 的情况,如果旧的根节点分裂产生了新的根节点,也要对其进行 spill。

完成 spill 操作之后,所有的 dirty page 都会出现在写事务的 pages 列表中,只要提交时一并将其都写入数据库文件,随后再更新 meta 信息,告知其他事务此 Bucket 新的 root node 所在页面编号,其他事务再读取数据时即可看见本次写事务产生的修改。

以上就是 BoltDB 写事务处理 dirty page 的大致逻辑。不过还有一个没完全解决的问题是,写事务中如何确定要对哪些节点运行 spill() 函数呢。根据上述分析,根节点肯定会成为 spill 的对象,但是如何确定跟节点下哪些节点需要 spill 呢;另外如果在写事务中进行了读取操作,这个过程肯定不应该产生 dirty page,又该如何将这种情况与发生了写的情况区分开呢?

这个问题的答案其实藏在上面我们对 node.children 数组介绍中。记得我们前面提到 children 数组是专门用于辅助 spill 工作的,会在分裂过程中生成;但最初的 spill 过程是需要通过 children 走到叶节点才开始的。所以我们还需要知道最初的 children 元素是从何而来。

答案是 Cursor 在处理 Put 请求的过程中产生的。我们先来看 Bucket.Put() 函数的代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (b *Bucket) Put(key []byte, value []byte) error {
    ...
	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(key)

    ...

	// Insert into node.
	key = cloneBytes(key)
	c.node().put(key, key, value, 0, 0)

	return nil
}

这里先将 Cursor 移动到 key 对应的叶节点,这一过程中 Cursor 内部会维护一个 stack 来存放走过的节点;随后通过 Cursor.node() 获取 Cursor 当前所指向的 node,并且向其中写入 k/v。正是这个 Cursor.node() 函数,每走过一个节点,就把 node 放到 parent.chilldren 数组中(同时还会放到 Bucket.nodes 中缓存),因此我们每进行一次修改,对应路径上的 node 节点 children 数组中就加入了被修改的 child。得到这一信息之后,我们就可以在 root node 上运行 spill() 函数,从而递归的对被修改的 children 进行收集了。

另外,之所以能够区分读操作,是因为 Get() 函数中没有使用 Cursor.node():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (b *Bucket) Get(key []byte) []byte {
	k, v, flags := b.Cursor().seek(key)

	// Return nil if this is a bucket.
	if (flags & bucketLeafFlag) != 0 {
		return nil
	}

	// If our target node isn't the same key as what's passed in then return nil.
	if !bytes.Equal(key, k) {
		return nil
	}
	return v
}

这里是通过 Cursor.seek(key) 直接获取对应的 value 的。seek() 函数直接解析 page 的原始数据进行二分查找,不用转化为 node,可以直接从 mmap 的页面数据中读取。对于 Cursor.node() 生成 children 的细节,可以看 Cursor.node() -> Bucket.node() 以及 node.childAt() 几个函数调用链,这里就不贴代码了。

// TODO:unsafe.Pointer 转成的 []byte 是深拷贝吗

// TODO:cursor 的 stack 图

另外关于小数据量时使用的 inline bucket 优化这里其实没有提到,以后再补充吧。

一锤定音:Commit

后悔药:Rollback

// TODO: 待补充

那么关于 BoltDB 事务的介绍就到这里了,我还是头一次看到 shadow paging 的实际代码实现,逻辑上比较容易理解实现方式也简单,但是对于长时间运行、吞吐量大的数据库系统这种方式可能不适用(一拍脑袋觉得 shadow paging 占用的内存和磁盘空间都比使用 logging 要大)。另外 BoltDB 的 B+ 树实现与原版也略有不同,除了前面提到的 split 操作,删除数据导致的 rebalance 也是一个值得学习的地方,就留到下一篇文章中再介绍吧。今天就到这里了。

The End