《Go语言四十二章经》第四十章 LevelDB与BoltDB

    LevelDB 和 BoltDB 都是k/v数据库。

    但LevelDB没有事务,LevelDB实现了一个日志结构化的merge tree。它将有序的key/value存储在不同文件的之中,通过db, _ := leveldb.OpenFile(“db”, nil),在db目录下有很多数据文件,并通过“层级”把它们分开,并且周期性地将小的文件merge为更大的文件。这让其在随机写的时候会很快,但是读的时候却很慢。

    LSM树而且通过批量存储技术规避磁盘随机写入问题。 LSM树的设计思想非常朴素,它的原理是把一颗大树拆分成N棵小树, 它首先写入到内存中(内存没有寻道速度的问题,随机写的性能得到大幅提升),在内存中构建一颗有序小树,随着小树越来越大,内存的小树会flush到磁盘上。磁盘中的树定期可以做merge操作,合并成一棵大树,以优化读性能。

    BoltDB会在数据文件上获得一个文件锁,所以多个进程不能同时打开同一个数据库。BoltDB使用一个单独的内存映射的文件(.db),实现一个写入时拷贝的B+树,这能让读取更快。而且,BoltDB的载入时间很快,特别是在从crash恢复的时候,因为它不需要去通过读log去找到上次成功的事务,它仅仅从两个B+树的根节点读取ID。

    BoltDB设计源于LMDB,具有以下特点:

    • 直接使用API存取数据,没有查询语句;
    • 支持完全可序列化的ACID事务,这个特性比LevelDB强;
    • 数据保存在内存映射的文件里。没有wal、线程压缩和垃圾回收;
    • 通过COW技术,可实现无锁的读写并发,但是无法实现无锁的写写并发,这就注定了读性能超高,但写性能一般,适合与读多写少的场景。
    • 最后,BoltDB使用Golang开发,而且被应用于influxDB项目作为底层存储。

    LMDB的全称是Lightning Memory-Mapped Database(快如闪电的内存映射数据库),它的文件结构简单,包含一个数据文件和一个锁文件.
    LMDB文件可以同时由多个进程打开,具有极高的数据存取速度,访问简单,不需要运行单独的数据库管理进程,只要在访问数据的代码里引用LMDB库,访问时给文件路径即可。

    40.2 BoltDB

    1. import (
    2. "fmt"
    3. "log"
    4. "time"
    5. "github.com/boltdb/bolt"
    6. )
    7. func Boltdb() error {
    8. // Bolt 会在数据文件上获得一个文件锁,所以多个进程不能同时打开同一个数据库。
    9. // 打开一个已经打开的 Bolt 数据库将导致它挂起,直到另一个进程关闭它。
    10. // 为防止无限期等待,您可以将超时选项传递给Open()函数:
    11. db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
    12. defer db.Close()
    13. if err != nil {
    14. log.Fatal(err)
    15. }
    16. // 两种处理方式:读-写和只读操作,读-写方式开始于db.Update方法:
    17. // Bolt 一次只允许一个读写事务,但是一次允许多个只读事务。
    18. // 每个事务处理都有一个始终如一的数据视图
    19. err = db.Update(func(tx *bolt.Tx) error {
    20. // 这里还有另外一层:k-v存储在bucket中,
    21. // 可以将bucket当做一个key的集合或者是数据库中的表。
    22. //(顺便提一句,buckets中可以包含其他的buckets,这将会相当有用)
    23. // Buckets 是键值对在数据库中的集合.所有在bucket中的key必须唯一,
    24. // 使用DB.CreateBucket() 函数建立buket
    25. //Tx.DeleteBucket() 删除bucket
    26. //b := tx.Bucket([]byte("MyBucket"))
    27. b, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
    28. //要将 key/value 对保存到 bucket,请使用 Bucket.Put() 函数:
    29. //这将在 MyBucket 的 bucket 中将 "answer" key的值设置为"42"。
    30. err = b.Put([]byte("answer"),[]byte("42"))
    31. return err
    32. })
    33. // 可以看到,传入db.update函数一个参数,在函数内部你可以get/set数据和处理error。
    34. // 如返回为nil,事务就会从数据库得到一个commit,但如果返回一个实际的错误,则会做回滚,
    35. // 你在函数中做的事情都不会commit。这很自然,因为你不需要人为地去关心事务的回滚,
    36. // 只需要返回一个错误,其他的由Bolt去帮你完成。
    37. // 只读事务 只读事务和读写事务不应该相互依赖,一般不应该在同一个例程中同时打开。
    38. // 这可能会导致死锁,因为读写事务需要定期重新映射数据文件,
    39. // 但只有在只读事务处于打开状态时才能这样做。
    40. // 批量读写事务.每一次新的事物都需要等待上一次事物的结束,
    41. // 可以通过DB.Batch()批处理来完
    42. err = db.Batch(func(tx *bolt.Tx) error {
    43. return nil
    44. })
    45. //只读事务在db.View函数之中:在函数中可以读取,但是不能做修改。
    46. //要检索这个value,我们可以使用 Bucket.Get() 函数:
    47. //由于Get是有安全保障的,所有不会返回错误,不存在的key返回nil
    48. b := tx.Bucket([]byte("MyBucket"))
    49. //tx.Bucket([]byte("MyBucket")).Cursor() 可这样写
    50. id, _ := b.NextSequence()
    51. fmt.Printf("The answer is: %s %d \n", v, id)
    52. //游标遍历key
    53. c := b.Cursor()
    54. for k, v := c.First(); k != nil; k, v = c.Next() {
    55. fmt.Printf("key=%s, value=%s\n", k, v)
    56. }
    57. //游标上有以下函数:
    58. //First() 移动到第一个健.
    59. //Last() 移动到最后一个健.
    60. //Seek() 移动到特定的一个健.
    61. //Next() 移动到下一个健.
    62. //Prev() 移动到上一个健.
    63. //Prefix 前缀扫描
    64. prefix := []byte("1234")
    65. for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
    66. fmt.Printf("key=%s, value=%s\n", k, v)
    67. }
    68. return nil
    69. })
    70. //范围查找
    71. //另一个常见的用例是扫描范围,例如时间范围。如果你使用一个合适的时间编码,如rfc3339然后可以查询特定日期范围的数据:
    72. db.View(func(tx *bolt.Tx) error {
    73. // Assume our events bucket exists and has RFC3339 encoded time keys.
    74. c := tx.Bucket([]byte("Events")).Cursor()
    75. // Our time range spans the 90's decade.
    76. min := []byte("1990-01-01T00:00:00Z")
    77. max := []byte("2000-01-01T00:00:00Z")
    78. // Iterate over the 90's.
    79. for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
    80. fmt.Printf("%s: %s\n", k, v)
    81. }
    82. return nil
    83. })
    84. //如果你知道所在桶中拥有键,你也可以使用ForEach()来迭代:
    85. db.View(func(tx *bolt.Tx) error {
    86. b := tx.Bucket([]byte("MyBucket"))
    87. b.ForEach(func(k, v []byte) error {
    88. fmt.Printf("key=%s, value=%s\n", k, v)
    89. return nil
    90. })
    91. return nil
    92. })
    93. //事务处理
    94. // 开始事务
    95. tx, err := db.Begin(true)
    96. if err != nil {
    97. return err
    98. }
    99. _, err = tx.CreateBucket([]byte("MyBucket"))
    100. if err != nil {
    101. return err
    102. }
    103. // 事务提交
    104. if err = tx.Commit(); err != nil {
    105. return err
    106. }
    107. return err
    108. //还可以在一个键中存储一个桶,以创建嵌套的桶:
    109. //func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
    110. //func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
    111. //func (*Bucket) DeleteBucket(key []byte) error
    112. }
    113. //备份 curl http://localhost/backup > my.db
    114. func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
    115. err := db.View(func(tx *bolt.Tx) error {
    116. w.Header().Set("Content-Type", "application/octet-stream")
    117. w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
    118. w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
    119. _, err := tx.WriteTo(w)
    120. return err
    121. })
    122. if err != nil {
    123. http.Error(w, err.Error(), http.StatusInternalServerError)
    124. }
    125. }
    126. //桶的自增
    127. //利用nextsequence()功能,你可以让boltdb生成序列作为你键值对的唯一标识。见下面的示例。
    128. func (s *Store) CreateUser(u *User) error {
    129. return s.db.Update(func(tx *bolt.Tx) error {
    130. // 创建users桶
    131. b := tx.Bucket([]byte("users"))
    132. // 生成自增序列
    133. id, _ = b.NextSequence()
    134. u.ID = int(id)
    135. // Marshal user data into bytes.
    136. buf, err := Json.Marshal(u)
    137. if err != nil {
    138. return err
    139. }
    140. // Persist bytes to users bucket.
    141. return b.Put(itob(u.ID), buf)
    142. })
    143. }
    144. // itob returns an 8-byte big endian representation of v.
    145. func itob(v int) []byte {
    146. b := make([]byte, 8)
    147. binary.BigEndian.PutUint64(b, uint64(v))
    148. return b
    149. }
    150. type User struct {
    151. ID int