| package bbolt_test |
| |
| import ( |
| "bytes" |
| crand "crypto/rand" |
| "encoding/hex" |
| "encoding/json" |
| "fmt" |
| "io" |
| mrand "math/rand" |
| "os" |
| "path/filepath" |
| "sort" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| "unicode/utf8" |
| |
| "github.com/stretchr/testify/require" |
| "golang.org/x/sync/errgroup" |
| |
| bolt "go.etcd.io/bbolt" |
| ) |
| |
| const ( |
| bucketPrefix = "bucket" |
| keyPrefix = "key" |
| noopTxKey = "%magic-no-op-key%" |
| |
| // TestConcurrentCaseDuration is used as a env variable to specify the |
| // concurrent test duration. |
| testConcurrentCaseDuration = "TEST_CONCURRENT_CASE_DURATION" |
| defaultConcurrentTestDuration = 30 * time.Second |
| ) |
| |
| type duration struct { |
| min time.Duration |
| max time.Duration |
| } |
| |
| type bytesRange struct { |
| min int |
| max int |
| } |
| |
| type operationChance struct { |
| operation OperationType |
| chance int |
| } |
| |
| type concurrentConfig struct { |
| bucketCount int |
| keyCount int |
| workInterval duration |
| operationRatio []operationChance |
| readInterval duration // only used by readOperation |
| noopWriteRatio int // only used by writeOperation |
| writeBytes bytesRange // only used by writeOperation |
| } |
| |
| /* |
| TestConcurrentGenericReadAndWrite verifies: |
| 1. Repeatable read: a read transaction should always see the same data |
| view during its lifecycle. |
| 2. Any data written by a writing transaction should be visible to any |
| following reading transactions (with txid >= previous writing txid). |
| 3. The txid should never decrease. |
| */ |
| func TestConcurrentGenericReadAndWrite(t *testing.T) { |
| if testing.Short() { |
| t.Skip("skipping test in short mode.") |
| } |
| |
| testDuration := concurrentTestDuration(t) |
| conf := concurrentConfig{ |
| bucketCount: 5, |
| keyCount: 10000, |
| workInterval: duration{}, |
| operationRatio: []operationChance{ |
| {operation: Read, chance: 60}, |
| {operation: Write, chance: 20}, |
| {operation: Delete, chance: 20}, |
| }, |
| readInterval: duration{ |
| min: 50 * time.Millisecond, |
| max: 100 * time.Millisecond, |
| }, |
| noopWriteRatio: 20, |
| writeBytes: bytesRange{ |
| min: 200, |
| max: 16000, |
| }, |
| } |
| |
| testCases := []struct { |
| name string |
| workerCount int |
| conf concurrentConfig |
| testDuration time.Duration |
| }{ |
| { |
| name: "1 worker", |
| workerCount: 1, |
| conf: conf, |
| testDuration: testDuration, |
| }, |
| { |
| name: "10 workers", |
| workerCount: 10, |
| conf: conf, |
| testDuration: testDuration, |
| }, |
| { |
| name: "50 workers", |
| workerCount: 50, |
| conf: conf, |
| testDuration: testDuration, |
| }, |
| { |
| name: "100 workers", |
| workerCount: 100, |
| conf: conf, |
| testDuration: testDuration, |
| }, |
| { |
| name: "200 workers", |
| workerCount: 200, |
| conf: conf, |
| testDuration: testDuration, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| tc := tc |
| t.Run(tc.name, func(t *testing.T) { |
| concurrentReadAndWrite(t, |
| tc.workerCount, |
| tc.conf, |
| tc.testDuration) |
| }) |
| } |
| } |
| |
| func concurrentTestDuration(t *testing.T) time.Duration { |
| durationInEnv := strings.ToLower(os.Getenv(testConcurrentCaseDuration)) |
| if durationInEnv == "" { |
| t.Logf("%q not set, defaults to %s", testConcurrentCaseDuration, defaultConcurrentTestDuration) |
| return defaultConcurrentTestDuration |
| } |
| |
| d, err := time.ParseDuration(durationInEnv) |
| if err != nil { |
| t.Logf("Failed to parse %s=%s, error: %v, defaults to %s", testConcurrentCaseDuration, durationInEnv, err, defaultConcurrentTestDuration) |
| return defaultConcurrentTestDuration |
| } |
| |
| t.Logf("Concurrent test duration set by %s=%s", testConcurrentCaseDuration, d) |
| return d |
| } |
| |
| func concurrentReadAndWrite(t *testing.T, |
| workerCount int, |
| conf concurrentConfig, |
| testDuration time.Duration) { |
| |
| t.Log("Preparing db.") |
| db := mustCreateDB(t, &bolt.Options{ |
| PageSize: 4096, |
| }) |
| defer db.Close() |
| err := db.Update(func(tx *bolt.Tx) error { |
| for i := 0; i < conf.bucketCount; i++ { |
| if _, err := tx.CreateBucketIfNotExists(bucketName(i)); err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| require.NoError(t, err) |
| |
| var records historyRecords |
| // t.Failed() returns false during panicking. We need to forcibly |
| // save data on panicking. |
| // Refer to: https://github.com/golang/go/issues/49929 |
| panicked := true |
| defer func() { |
| t.Log("Save data if failed.") |
| saveDataIfFailed(t, db, records, panicked) |
| }() |
| |
| t.Log("Starting workers.") |
| records = runWorkers(t, |
| db, |
| workerCount, |
| conf, |
| testDuration) |
| |
| t.Log("Analyzing the history records.") |
| if err := validateSequential(records); err != nil { |
| t.Errorf("The history records are not sequential:\n %v", err) |
| } |
| |
| t.Log("Checking database consistency.") |
| if err := checkConsistency(t, db); err != nil { |
| t.Errorf("The data isn't consistency: %v", err) |
| } |
| |
| panicked = false |
| // TODO (ahrtr): |
| // 1. intentionally inject a random failpoint. |
| } |
| |
| // mustCreateDB is created in place of `btesting.MustCreateDB`, and it's |
| // only supposed to be used by the concurrent test case. The purpose is |
| // to ensure the test case can be executed on old branches or versions, |
| // e.g. `release-1.3` or `1.3.[5-7]`. |
| func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB { |
| f := filepath.Join(t.TempDir(), "db") |
| |
| return mustOpenDB(t, f, o) |
| } |
| |
| func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB { |
| f := db.Path() |
| |
| t.Logf("Closing bbolt DB at: %s", f) |
| err := db.Close() |
| require.NoError(t, err) |
| |
| return mustOpenDB(t, f, o) |
| } |
| |
| func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB { |
| t.Logf("Opening bbolt DB at: %s", dbPath) |
| if o == nil { |
| o = bolt.DefaultOptions |
| } |
| |
| freelistType := bolt.FreelistArrayType |
| if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) { |
| freelistType = bolt.FreelistMapType |
| } |
| |
| o.FreelistType = freelistType |
| |
| db, err := bolt.Open(dbPath, 0600, o) |
| require.NoError(t, err) |
| |
| return db |
| } |
| |
| func checkConsistency(t *testing.T, db *bolt.DB) error { |
| return db.View(func(tx *bolt.Tx) error { |
| cnt := 0 |
| for err := range tx.Check() { |
| t.Errorf("Consistency error: %v", err) |
| cnt++ |
| } |
| if cnt > 0 { |
| return fmt.Errorf("%d consistency errors found", cnt) |
| } |
| return nil |
| }) |
| } |
| |
| /* |
| ********************************************************* |
| Data structures and functions/methods for running concurrent |
| workers, which execute different operations, including `Read`, |
| `Write` and `Delete`. |
| ********************************************************* |
| */ |
| func runWorkers(t *testing.T, |
| db *bolt.DB, |
| workerCount int, |
| conf concurrentConfig, |
| testDuration time.Duration) historyRecords { |
| stopCh := make(chan struct{}, 1) |
| errCh := make(chan error, workerCount) |
| |
| var mu sync.Mutex |
| var rs historyRecords |
| |
| g := new(errgroup.Group) |
| for i := 0; i < workerCount; i++ { |
| w := &worker{ |
| id: i, |
| db: db, |
| |
| conf: conf, |
| |
| errCh: errCh, |
| stopCh: stopCh, |
| t: t, |
| } |
| g.Go(func() error { |
| wrs, err := runWorker(t, w, errCh) |
| mu.Lock() |
| rs = append(rs, wrs...) |
| mu.Unlock() |
| return err |
| }) |
| } |
| |
| t.Logf("Keep all workers running for about %s.", testDuration) |
| select { |
| case <-time.After(testDuration): |
| case <-errCh: |
| } |
| |
| close(stopCh) |
| t.Log("Waiting for all workers to finish.") |
| if err := g.Wait(); err != nil { |
| t.Errorf("Received error: %v", err) |
| } |
| |
| return rs |
| } |
| |
| func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) { |
| rs, err := w.run() |
| if len(rs) > 0 && err == nil { |
| if terr := validateIncrementalTxid(rs); terr != nil { |
| txidErr := fmt.Errorf("[%s]: %w", w.name(), terr) |
| t.Error(txidErr) |
| errCh <- txidErr |
| return rs, txidErr |
| } |
| } |
| return rs, err |
| } |
| |
| type worker struct { |
| id int |
| db *bolt.DB |
| |
| conf concurrentConfig |
| |
| errCh chan error |
| stopCh chan struct{} |
| |
| t *testing.T |
| } |
| |
| func (w *worker) name() string { |
| return fmt.Sprintf("worker-%d", w.id) |
| } |
| |
| func (w *worker) run() (historyRecords, error) { |
| var rs historyRecords |
| |
| ticker := time.NewTicker(1 * time.Second) |
| defer ticker.Stop() |
| |
| for { |
| select { |
| case <-w.stopCh: |
| return rs, nil |
| default: |
| } |
| |
| err := w.db.Update(func(tx *bolt.Tx) error { |
| for { |
| op := w.pickOperation() |
| bucket, key := w.pickBucket(), w.pickKey() |
| rec, eerr := executeOperation(op, tx, bucket, key, w.conf) |
| if eerr != nil { |
| opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr) |
| w.t.Error(opErr) |
| w.errCh <- opErr |
| return opErr |
| } |
| |
| rs = append(rs, rec) |
| if w.conf.workInterval != (duration{}) { |
| time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max)) |
| } |
| |
| select { |
| case <-ticker.C: |
| return nil |
| case <-w.stopCh: |
| return nil |
| default: |
| } |
| } |
| }) |
| if err != nil { |
| return rs, err |
| } |
| } |
| } |
| |
| func (w *worker) pickBucket() []byte { |
| return bucketName(mrand.Intn(w.conf.bucketCount)) |
| } |
| |
| func bucketName(index int) []byte { |
| bucket := fmt.Sprintf("%s_%d", bucketPrefix, index) |
| return []byte(bucket) |
| } |
| |
| func (w *worker) pickKey() []byte { |
| key := fmt.Sprintf("%s_%d", keyPrefix, mrand.Intn(w.conf.keyCount)) |
| return []byte(key) |
| } |
| |
| func (w *worker) pickOperation() OperationType { |
| sum := 0 |
| for _, op := range w.conf.operationRatio { |
| sum += op.chance |
| } |
| roll := mrand.Int() % sum |
| for _, op := range w.conf.operationRatio { |
| if roll < op.chance { |
| return op.operation |
| } |
| roll -= op.chance |
| } |
| panic("unexpected") |
| } |
| |
| func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) { |
| switch op { |
| case Read: |
| return executeRead(tx, bucket, key, conf.readInterval) |
| case Write: |
| return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio) |
| case Delete: |
| return executeDelete(tx, bucket, key) |
| default: |
| panic(fmt.Sprintf("unexpected operation type: %s", op)) |
| } |
| } |
| |
| func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) { |
| var rec historyRecord |
| |
| b := tx.Bucket(bucket) |
| |
| initialVal := b.Get(key) |
| time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) |
| val := b.Get(key) |
| |
| if !bytes.Equal(initialVal, val) { |
| return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", |
| string(key), formatBytes(initialVal), formatBytes(val)) |
| } |
| |
| clonedVal := make([]byte, len(val)) |
| copy(clonedVal, val) |
| |
| rec = historyRecord{ |
| OperationType: Read, |
| Bucket: string(bucket), |
| Key: string(key), |
| Value: clonedVal, |
| Txid: tx.ID(), |
| } |
| |
| return rec, nil |
| } |
| |
| func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) { |
| var rec historyRecord |
| |
| if mrand.Intn(100) < noopWriteRatio { |
| // A no-op write transaction has two consequences: |
| // 1. The txid increases by 1; |
| // 2. Two meta pages point to the same root page. |
| rec = historyRecord{ |
| OperationType: Write, |
| Bucket: string(bucket), |
| Key: noopTxKey, |
| Value: nil, |
| Txid: tx.ID(), |
| } |
| return rec, nil |
| } |
| |
| b := tx.Bucket(bucket) |
| |
| valueBytes := randomIntInRange(writeBytes.min, writeBytes.max) |
| v := make([]byte, valueBytes) |
| if _, cErr := crand.Read(v); cErr != nil { |
| return rec, cErr |
| } |
| |
| putErr := b.Put(key, v) |
| if putErr == nil { |
| rec = historyRecord{ |
| OperationType: Write, |
| Bucket: string(bucket), |
| Key: string(key), |
| Value: v, |
| Txid: tx.ID(), |
| } |
| } |
| |
| return rec, putErr |
| } |
| |
| func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) { |
| var rec historyRecord |
| |
| b := tx.Bucket(bucket) |
| |
| err := b.Delete(key) |
| if err == nil { |
| rec = historyRecord{ |
| OperationType: Delete, |
| Bucket: string(bucket), |
| Key: string(key), |
| Txid: tx.ID(), |
| } |
| } |
| |
| return rec, err |
| } |
| |
| func randomDurationInRange(min, max time.Duration) time.Duration { |
| d := int64(max) - int64(min) |
| d = int64(mrand.Intn(int(d))) + int64(min) |
| return time.Duration(d) |
| } |
| |
| func randomIntInRange(min, max int) int { |
| return mrand.Intn(max-min) + min |
| } |
| |
| func formatBytes(val []byte) string { |
| if utf8.ValidString(string(val)) { |
| return string(val) |
| } |
| |
| return hex.EncodeToString(val) |
| } |
| |
| /* |
| ********************************************************* |
| Functions for persisting test data, including db file |
| and operation history |
| ********************************************************* |
| */ |
| func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) { |
| if t.Failed() || force { |
| t.Log("Saving data...") |
| dbPath := db.Path() |
| if err := db.Close(); err != nil { |
| t.Errorf("Failed to close db: %v", err) |
| } |
| backupPath := testResultsDirectory(t) |
| backupDB(t, dbPath, backupPath) |
| persistHistoryRecords(t, rs, backupPath) |
| } |
| } |
| |
| func backupDB(t *testing.T, srcPath string, dstPath string) { |
| targetFile := filepath.Join(dstPath, "db.bak") |
| t.Logf("Saving the DB file to %s", targetFile) |
| err := copyFile(srcPath, targetFile) |
| require.NoError(t, err) |
| t.Logf("DB file saved to %s", targetFile) |
| } |
| |
| func copyFile(srcPath, dstPath string) error { |
| // Ensure source file exists. |
| _, err := os.Stat(srcPath) |
| if os.IsNotExist(err) { |
| return fmt.Errorf("source file %q not found", srcPath) |
| } else if err != nil { |
| return err |
| } |
| |
| // Ensure output file not exist. |
| _, err = os.Stat(dstPath) |
| if err == nil { |
| return fmt.Errorf("output file %q already exists", dstPath) |
| } else if !os.IsNotExist(err) { |
| return err |
| } |
| |
| srcDB, err := os.Open(srcPath) |
| if err != nil { |
| return fmt.Errorf("failed to open source file %q: %w", srcPath, err) |
| } |
| defer srcDB.Close() |
| dstDB, err := os.Create(dstPath) |
| if err != nil { |
| return fmt.Errorf("failed to create output file %q: %w", dstPath, err) |
| } |
| defer dstDB.Close() |
| written, err := io.Copy(dstDB, srcDB) |
| if err != nil { |
| return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err) |
| } |
| |
| srcFi, err := srcDB.Stat() |
| if err != nil { |
| return fmt.Errorf("failed to get source file info %q: %w", srcPath, err) |
| } |
| initialSize := srcFi.Size() |
| if initialSize != written { |
| return fmt.Errorf("the byte copied (%q: %d) isn't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize) |
| } |
| |
| return nil |
| } |
| |
| func persistHistoryRecords(t *testing.T, rs historyRecords, path string) { |
| recordFilePath := filepath.Join(path, "history_records.json") |
| t.Logf("Saving history records to %s", recordFilePath) |
| recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) |
| require.NoError(t, err) |
| defer recordFile.Close() |
| encoder := json.NewEncoder(recordFile) |
| for _, rec := range rs { |
| err := encoder.Encode(rec) |
| require.NoError(t, err) |
| } |
| } |
| |
| func testResultsDirectory(t *testing.T) string { |
| resultsDirectory, ok := os.LookupEnv("RESULTS_DIR") |
| var err error |
| if !ok { |
| resultsDirectory, err = os.MkdirTemp("", "*.db") |
| require.NoError(t, err) |
| } |
| resultsDirectory, err = filepath.Abs(resultsDirectory) |
| require.NoError(t, err) |
| |
| path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_"))) |
| require.NoError(t, err) |
| |
| err = os.RemoveAll(path) |
| require.NoError(t, err) |
| |
| err = os.MkdirAll(path, 0700) |
| require.NoError(t, err) |
| |
| return path |
| } |
| |
| /* |
| ********************************************************* |
| Data structures and functions for analyzing history records |
| ********************************************************* |
| */ |
| type OperationType string |
| |
| const ( |
| Read OperationType = "read" |
| Write OperationType = "write" |
| Delete OperationType = "delete" |
| ) |
| |
| type historyRecord struct { |
| OperationType OperationType `json:"operationType,omitempty"` |
| Txid int `json:"txid,omitempty"` |
| Bucket string `json:"bucket,omitempty"` |
| Key string `json:"key,omitempty"` |
| Value []byte `json:"value,omitempty"` |
| } |
| |
| type historyRecords []historyRecord |
| |
| func (rs historyRecords) Len() int { |
| return len(rs) |
| } |
| |
| func (rs historyRecords) Less(i, j int) bool { |
| // Sorted by (bucket, key) firstly: all records in the same |
| // (bucket, key) are grouped together. |
| bucketCmp := strings.Compare(rs[i].Bucket, rs[j].Bucket) |
| if bucketCmp != 0 { |
| return bucketCmp < 0 |
| } |
| keyCmp := strings.Compare(rs[i].Key, rs[j].Key) |
| if keyCmp != 0 { |
| return keyCmp < 0 |
| } |
| |
| // Sorted by txid |
| return rs[i].Txid < rs[j].Txid |
| } |
| |
| func (rs historyRecords) Swap(i, j int) { |
| rs[i], rs[j] = rs[j], rs[i] |
| } |
| |
| func validateIncrementalTxid(rs historyRecords) error { |
| lastTxid := rs[0].Txid |
| |
| for i := 1; i < len(rs); i++ { |
| if rs[i].Txid < lastTxid { |
| return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType) |
| } |
| lastTxid = rs[i].Txid |
| } |
| |
| return nil |
| } |
| |
| func validateSequential(rs historyRecords) error { |
| sort.Stable(rs) |
| |
| type bucketAndKey struct { |
| bucket string |
| key string |
| } |
| lastWriteKeyValueMap := make(map[bucketAndKey]*historyRecord) |
| |
| for _, rec := range rs { |
| bk := bucketAndKey{ |
| bucket: rec.Bucket, |
| key: rec.Key, |
| } |
| if v, ok := lastWriteKeyValueMap[bk]; ok { |
| if rec.OperationType == Write { |
| v.Txid = rec.Txid |
| if rec.Key != noopTxKey { |
| v.Value = rec.Value |
| } |
| } else if rec.OperationType == Delete { |
| delete(lastWriteKeyValueMap, bk) |
| } else { |
| if !bytes.Equal(v.Value, rec.Value) { |
| return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x", |
| rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value) |
| } |
| } |
| } else { |
| if rec.OperationType == Write && rec.Key != noopTxKey { |
| lastWriteKeyValueMap[bk] = &historyRecord{ |
| OperationType: Write, |
| Bucket: rec.Bucket, |
| Key: rec.Key, |
| Value: rec.Value, |
| Txid: rec.Txid, |
| } |
| } else if rec.OperationType == Read { |
| if len(rec.Value) != 0 { |
| return fmt.Errorf("expected the first readOperation[txid: %d, bucket: %s, key: %s] read nil, \nbut got %x", |
| rec.Txid, rec.Bucket, rec.Key, rec.Value) |
| } |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| /* |
| TestConcurrentRepeatableRead verifies repeatable read. The case |
| intentionally creates a scenario that read and write transactions |
| are interleaved. It performs several writing operations after starting |
| each long-running read transaction to ensure it has a larger txid |
| than previous read transaction. It verifies that bbolt correctly |
| releases free pages, and will not pollute (e.g. prematurely release) |
| any pages which are still being used by any read transaction. |
| */ |
| func TestConcurrentRepeatableRead(t *testing.T) { |
| if testing.Short() { |
| t.Skip("skipping test in short mode.") |
| } |
| |
| testCases := []struct { |
| name string |
| noFreelistSync bool |
| freelistType bolt.FreelistType |
| }{ |
| // [array] freelist |
| { |
| name: "sync array freelist", |
| noFreelistSync: false, |
| freelistType: bolt.FreelistArrayType, |
| }, |
| { |
| name: "not sync array freelist", |
| noFreelistSync: true, |
| freelistType: bolt.FreelistArrayType, |
| }, |
| // [map] freelist |
| { |
| name: "sync map freelist", |
| noFreelistSync: false, |
| freelistType: bolt.FreelistMapType, |
| }, |
| { |
| name: "not sync map freelist", |
| noFreelistSync: true, |
| freelistType: bolt.FreelistMapType, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| tc := tc |
| t.Run(tc.name, func(t *testing.T) { |
| |
| t.Log("Preparing db.") |
| var ( |
| bucket = []byte("data") |
| key = []byte("mykey") |
| |
| option = &bolt.Options{ |
| PageSize: 4096, |
| NoFreelistSync: tc.noFreelistSync, |
| FreelistType: tc.freelistType, |
| } |
| ) |
| |
| db := mustCreateDB(t, option) |
| defer func() { |
| // The db will be reopened later, so put `db.Close()` in a function |
| // to avoid premature evaluation of `db`. Note that the execution |
| // of a deferred function is deferred to the moment the surrounding |
| // function returns, but the function value and parameters to the |
| // call are evaluated as usual and saved anew. |
| db.Close() |
| }() |
| |
| // Create lots of K/V to allocate some pages |
| err := db.Update(func(tx *bolt.Tx) error { |
| b, err := tx.CreateBucketIfNotExists(bucket) |
| if err != nil { |
| return err |
| } |
| for i := 0; i < 1000; i++ { |
| k := fmt.Sprintf("key_%d", i) |
| if err := b.Put([]byte(k), make([]byte, 1024)); err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| require.NoError(t, err) |
| |
| // Remove all K/V to create some free pages |
| err = db.Update(func(tx *bolt.Tx) error { |
| b := tx.Bucket(bucket) |
| for i := 0; i < 1000; i++ { |
| k := fmt.Sprintf("key_%d", i) |
| if err := b.Delete([]byte(k)); err != nil { |
| return err |
| } |
| } |
| return b.Put(key, []byte("randomValue")) |
| }) |
| require.NoError(t, err) |
| |
| // bbolt will not release free pages directly after committing |
| // a writing transaction; instead all pages freed are putting |
| // into a pending list. Accordingly, the free pages might not |
| // be able to be reused by following writing transactions. So |
| // we reopen the db to completely release all free pages. |
| db = mustReOpenDB(t, db, option) |
| |
| var ( |
| wg sync.WaitGroup |
| longRunningReaderCount = 10 |
| stopCh = make(chan struct{}) |
| errCh = make(chan error, longRunningReaderCount) |
| readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond} |
| |
| writeOperationCountInBetween = 5 |
| writeBytes = bytesRange{10, 20} |
| |
| testDuration = 10 * time.Second |
| ) |
| |
| for i := 0; i < longRunningReaderCount; i++ { |
| readWorkerName := fmt.Sprintf("reader_%d", i) |
| t.Logf("Starting long running read operation: %s", readWorkerName) |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh) |
| if rErr != nil { |
| errCh <- rErr |
| } |
| }() |
| time.Sleep(500 * time.Millisecond) |
| |
| t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween) |
| for j := 0; j < writeOperationCountInBetween; j++ { |
| err := db.Update(func(tx *bolt.Tx) error { |
| _, eerr := executeWrite(tx, bucket, key, writeBytes, 0) |
| return eerr |
| }) |
| |
| require.NoError(t, err) |
| } |
| } |
| |
| t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data") |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| cnt := longRunningReaderCount * writeOperationCountInBetween |
| for i := 0; i < cnt; i++ { |
| select { |
| case <-stopCh: |
| return |
| default: |
| } |
| err := db.Update(func(tx *bolt.Tx) error { |
| _, eerr := executeWrite(tx, bucket, key, writeBytes, 0) |
| return eerr |
| }) |
| require.NoError(t, err) |
| } |
| }() |
| |
| t.Log("Waiting for result") |
| select { |
| case err := <-errCh: |
| close(stopCh) |
| t.Errorf("Detected dirty read: %v", err) |
| case <-time.After(testDuration): |
| close(stopCh) |
| } |
| |
| wg.Wait() |
| }) |
| } |
| } |
| |
| func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error { |
| err := db.View(func(tx *bolt.Tx) error { |
| b := tx.Bucket(bucket) |
| |
| initialVal := b.Get(key) |
| |
| for { |
| select { |
| case <-stopCh: |
| t.Logf("%q finished.", name) |
| return nil |
| default: |
| } |
| |
| time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) |
| val := b.Get(key) |
| |
| if !bytes.Equal(initialVal, val) { |
| dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", |
| string(key), formatBytes(initialVal), formatBytes(val)) |
| return dirtyReadErr |
| } |
| } |
| }) |
| |
| return err |
| } |