| /* |
| * Copyright 2017 Dgraph Labs, Inc. and Contributors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package badger |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "io/ioutil" |
| "math/rand" |
| "os" |
| "reflect" |
| "runtime" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/dgraph-io/badger/v2/options" |
| "github.com/dgraph-io/badger/v2/y" |
| humanize "github.com/dustin/go-humanize" |
| "github.com/stretchr/testify/require" |
| "golang.org/x/net/trace" |
| ) |
| |
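// TestValueBasic writes two entries directly to the value log and verifies that they
// can be read back and decoded through the value pointers returned by the write.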
| func TestValueBasic(t *testing.T) { |
| t.Skip() |
| dir, err := ioutil.TempDir("", "badger-test") |
| y.Check(err) |
| defer removeDir(dir) |
| |
kv, err := Open(getTestOptions(dir))
require.NoError(t, err)
| defer kv.Close() |
| log := &kv.vlog |
| |
// Use values big enough that the value log writes them even if SyncWrites is false.
| const val1 = "sampleval012345678901234567890123" |
| const val2 = "samplevalb012345678901234567890123" |
| require.True(t, len(val1) >= kv.opt.ValueThreshold) |
| |
| e1 := &Entry{ |
| Key: []byte("samplekey"), |
| Value: []byte(val1), |
| meta: bitValuePointer, |
| } |
| e2 := &Entry{ |
| Key: []byte("samplekeyb"), |
| Value: []byte(val2), |
| meta: bitValuePointer, |
| } |
| |
| b := new(request) |
| b.Entries = []*Entry{e1, e2} |
| |
| log.write([]*request{b}) |
| require.Len(t, b.Ptrs, 2) |
| t.Logf("Pointer written: %+v %+v\n", b.Ptrs[0], b.Ptrs[1]) |
| |
| s := new(y.Slice) |
| buf1, lf1, err1 := log.readValueBytes(b.Ptrs[0], s) |
| buf2, lf2, err2 := log.readValueBytes(b.Ptrs[1], s) |
| require.NoError(t, err1) |
| require.NoError(t, err2) |
| defer runCallback(log.getUnlockCallback(lf1)) |
| defer runCallback(log.getUnlockCallback(lf2)) |
| e1, err = lf1.decodeEntry(buf1, b.Ptrs[0].Offset) |
| require.NoError(t, err) |
e2, err = lf2.decodeEntry(buf2, b.Ptrs[1].Offset)
| require.NoError(t, err) |
| readEntries := []Entry{*e1, *e2} |
| require.EqualValues(t, []Entry{ |
| { |
| Key: []byte("samplekey"), |
| Value: []byte(val1), |
| meta: bitValuePointer, |
| offset: b.Ptrs[0].Offset, |
| }, |
| { |
| Key: []byte("samplekeyb"), |
| Value: []byte(val2), |
| meta: bitValuePointer, |
| offset: b.Ptrs[1].Offset, |
| }, |
| }, readEntries) |
| } |
| |
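// TestValueGCManaged writes and then deletes N entries at explicit timestamps on a
// managed-transactions DB, and expects RunValueLogGC to reclaim at least one value
// log file afterwards.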
| func TestValueGCManaged(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| N := 10000 |
| opt := getTestOptions(dir) |
| opt.ValueLogMaxEntries = uint32(N / 10) |
| opt.managedTxns = true |
| db, err := Open(opt) |
| require.NoError(t, err) |
| defer db.Close() |
| |
| var ts uint64 |
| newTs := func() uint64 { |
| ts++ |
| return ts |
| } |
| |
| sz := 64 << 10 |
| var wg sync.WaitGroup |
| for i := 0; i < N; i++ { |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| |
| wg.Add(1) |
| txn := db.NewTransactionAt(newTs(), true) |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| require.NoError(t, txn.CommitAt(newTs(), func(err error) { |
| wg.Done() |
| require.NoError(t, err) |
| })) |
| } |
| |
| for i := 0; i < N; i++ { |
| wg.Add(1) |
| txn := db.NewTransactionAt(newTs(), true) |
| require.NoError(t, txn.Delete([]byte(fmt.Sprintf("key%d", i)))) |
| require.NoError(t, txn.CommitAt(newTs(), func(err error) { |
| wg.Done() |
| require.NoError(t, err) |
| })) |
| } |
| wg.Wait() |
| files, err := ioutil.ReadDir(dir) |
| require.NoError(t, err) |
| for _, fi := range files { |
| t.Logf("File: %s. Size: %s\n", fi.Name(), humanize.Bytes(uint64(fi.Size()))) |
| } |
| |
| for i := 0; i < 100; i++ { |
// Try at most 100 times to GC even a single value log file.
| if err := db.RunValueLogGC(0.0001); err == nil { |
| return // Done |
| } |
| } |
| require.Fail(t, "Unable to GC even a single value log file.") |
| } |
| |
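// TestValueGC fills several value log files, deletes almost half of the keys, and
// rewrites the oldest log file directly, verifying that the surviving keys can still
// be read.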
| func TestValueGC(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| |
kv, err := Open(opt)
require.NoError(t, err)
| defer kv.Close() |
| |
| sz := 32 << 10 |
| txn := kv.NewTransaction(true) |
| for i := 0; i < 100; i++ { |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| if i%20 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = kv.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit()) |
| |
| for i := 0; i < 45; i++ { |
| txnDelete(t, kv, []byte(fmt.Sprintf("key%d", i))) |
| } |
| |
| kv.vlog.filesLock.RLock() |
| lf := kv.vlog.filesMap[kv.vlog.sortedFids()[0]] |
| kv.vlog.filesLock.RUnlock() |
| |
| tr := trace.New("Test", "Test") |
| defer tr.Finish() |
| kv.vlog.rewrite(lf, tr) |
| for i := 45; i < 100; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(key) |
| require.NoError(t, err) |
| val := getItemValue(t, item) |
| require.NotNil(t, val) |
| require.True(t, len(val) == sz, "Size found: %d", len(val)) |
| return nil |
| })) |
| } |
| } |
| |
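// TestValueGC2 is like TestValueGC, but mixes deletions with overwrites by small
// inline values before rewriting the oldest value log file.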
| func TestValueGC2(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| |
kv, err := Open(opt)
require.NoError(t, err)
| defer kv.Close() |
| |
| sz := 32 << 10 |
| txn := kv.NewTransaction(true) |
| for i := 0; i < 100; i++ { |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| if i%20 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = kv.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit()) |
| |
| for i := 0; i < 5; i++ { |
| txnDelete(t, kv, []byte(fmt.Sprintf("key%d", i))) |
| } |
| |
| for i := 5; i < 10; i++ { |
| v := []byte(fmt.Sprintf("value%d", i)) |
| txnSet(t, kv, []byte(fmt.Sprintf("key%d", i)), v, 0) |
| } |
| |
| kv.vlog.filesLock.RLock() |
| lf := kv.vlog.filesMap[kv.vlog.sortedFids()[0]] |
| kv.vlog.filesLock.RUnlock() |
| |
| tr := trace.New("Test", "Test") |
| defer tr.Finish() |
| kv.vlog.rewrite(lf, tr) |
| for i := 0; i < 5; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| _, err := txn.Get(key) |
| require.Equal(t, ErrKeyNotFound, err) |
| return nil |
| })) |
| } |
| for i := 5; i < 10; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(key) |
| require.NoError(t, err) |
| val := getItemValue(t, item) |
| require.NotNil(t, val) |
| require.Equal(t, string(val), fmt.Sprintf("value%d", i)) |
| return nil |
| })) |
| } |
| for i := 10; i < 100; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(key) |
| require.NoError(t, err) |
| val := getItemValue(t, item) |
| require.NotNil(t, val) |
| require.True(t, len(val) == sz, "Size found: %d", len(val)) |
| return nil |
| })) |
| } |
| } |
| |
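// TestValueGC3 checks that an open iterator keeps returning correct keys and values
// while the value log file it reads from is rewritten by GC.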
| func TestValueGC3(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| |
| kv, err := Open(opt) |
| require.NoError(t, err) |
| defer kv.Close() |
| |
| // We want to test whether an iterator can continue through a value log GC. |
| |
| valueSize := 32 << 10 |
| |
| var value3 []byte |
| txn := kv.NewTransaction(true) |
| for i := 0; i < 100; i++ { |
v := make([]byte, valueSize) // 32 KiB * 100 entries take >= 3,276,800 B.
| if i == 3 { |
| value3 = v |
| } |
| rand.Read(v[:]) |
// Use keys key000, key001, key002, ... so that sorted order matches insertion order.
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%03d", i)), v))) |
| if i%20 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = kv.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit()) |
| |
// Start an iterator over the keys in the first value log file.
| itOpt := IteratorOptions{ |
| PrefetchValues: false, |
| PrefetchSize: 0, |
| Reverse: false, |
| } |
| |
| txn = kv.NewTransaction(true) |
| it := txn.NewIterator(itOpt) |
| defer it.Close() |
| // Walk a few keys |
| it.Rewind() |
| require.True(t, it.Valid()) |
| item := it.Item() |
| require.Equal(t, []byte("key000"), item.Key()) |
| it.Next() |
| require.True(t, it.Valid()) |
| item = it.Item() |
| require.Equal(t, []byte("key001"), item.Key()) |
| it.Next() |
| require.True(t, it.Valid()) |
| item = it.Item() |
| require.Equal(t, []byte("key002"), item.Key()) |
| |
// As in the other tests, pull out a logFile so we can rewrite it directly.
| |
| kv.vlog.filesLock.RLock() |
| logFile := kv.vlog.filesMap[kv.vlog.sortedFids()[0]] |
| kv.vlog.filesLock.RUnlock() |
| |
| tr := trace.New("Test", "Test") |
| defer tr.Finish() |
| kv.vlog.rewrite(logFile, tr) |
| it.Next() |
| require.True(t, it.Valid()) |
| item = it.Item() |
| require.Equal(t, []byte("key003"), item.Key()) |
| |
| v3, err := item.ValueCopy(nil) |
| require.NoError(t, err) |
| require.Equal(t, value3, v3) |
| } |
| |
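// TestValueGC4 rewrites the two oldest value log files, then closes and reopens the
// value log (replaying from Fid 2) to verify that the deletes and overwrites persist.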
| func TestValueGC4(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| opt.Truncate = true |
| |
| kv, err := Open(opt) |
| require.NoError(t, err) |
| defer kv.Close() |
| |
| sz := 128 << 10 // 5 entries per value log file. |
| txn := kv.NewTransaction(true) |
| for i := 0; i < 24; i++ { |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| if i%3 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = kv.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit()) |
| |
| for i := 0; i < 8; i++ { |
| txnDelete(t, kv, []byte(fmt.Sprintf("key%d", i))) |
| } |
| |
| for i := 8; i < 16; i++ { |
| v := []byte(fmt.Sprintf("value%d", i)) |
| txnSet(t, kv, []byte(fmt.Sprintf("key%d", i)), v, 0) |
| } |
| |
| kv.vlog.filesLock.RLock() |
| lf0 := kv.vlog.filesMap[kv.vlog.sortedFids()[0]] |
| lf1 := kv.vlog.filesMap[kv.vlog.sortedFids()[1]] |
| kv.vlog.filesLock.RUnlock() |
| |
| tr := trace.New("Test", "Test") |
| defer tr.Finish() |
| kv.vlog.rewrite(lf0, tr) |
| kv.vlog.rewrite(lf1, tr) |
| |
| err = kv.vlog.Close() |
| require.NoError(t, err) |
| |
| kv.vlog.init(kv) |
| err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction()) |
| require.NoError(t, err) |
| |
| for i := 0; i < 8; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| _, err := txn.Get(key) |
| require.Equal(t, ErrKeyNotFound, err) |
| return nil |
| })) |
| } |
| for i := 8; i < 16; i++ { |
| key := []byte(fmt.Sprintf("key%d", i)) |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(key) |
| require.NoError(t, err) |
| val := getItemValue(t, item) |
| require.NotNil(t, val) |
| require.Equal(t, string(val), fmt.Sprintf("value%d", i)) |
| return nil |
| })) |
| } |
| } |
| |
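// TestPersistLFDiscardStats verifies that value log discard stats are persisted when
// the DB closes and are reloaded, unchanged, when it is reopened.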
| func TestPersistLFDiscardStats(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| opt.Truncate = true |
// Avoid compaction on close so that the discard map remains unchanged.
| opt.CompactL0OnClose = false |
| |
| db, err := Open(opt) |
| require.NoError(t, err) |
| |
| sz := 128 << 10 // 5 entries per value log file. |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| txn := db.NewTransaction(true) |
| for i := 0; i < 500; i++ { |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| if i%3 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = db.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit(), "error while committing txn") |
| |
| for i := 0; i < 500; i++ { |
// Use Entry.WithDiscard() to delete entries, because it forces data to be flushed to
// disk, creating SSTs. A plain Delete would leave the data in the memtables only.
| err = db.Update(func(txn *Txn) error { |
| return txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v).WithDiscard()) |
| }) |
| require.NoError(t, err) |
| } |
| |
| time.Sleep(1 * time.Second) // wait for compaction to complete |
| |
| persistedMap := make(map[uint32]int64) |
| db.vlog.lfDiscardStats.Lock() |
| require.True(t, len(db.vlog.lfDiscardStats.m) > 0, "some discardStats should be generated") |
| for k, v := range db.vlog.lfDiscardStats.m { |
| persistedMap[k] = v |
| } |
| db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1 |
| db.vlog.lfDiscardStats.Unlock() |
| |
// db.vlog.lfDiscardStats.updatesSinceFlush is already > discardStatsFlushThreshold, so
// sending an empty map to flushChan makes the latest discardStats map get persisted.
| db.vlog.lfDiscardStats.flushChan <- map[uint32]int64{} |
| time.Sleep(1 * time.Second) // Wait for map to be persisted. |
| err = db.Close() |
| require.NoError(t, err) |
| |
| db, err = Open(opt) |
| require.NoError(t, err) |
| defer db.Close() |
| time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats(). |
| db.vlog.lfDiscardStats.RLock() |
| require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m), |
| "Discard maps are not equal") |
| db.vlog.lfDiscardStats.RUnlock() |
| } |
| |
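// TestChecksums corrupts the tail of a value log file and verifies that the entries
// of the corrupted transaction are dropped on replay while earlier and later data
// survive.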
| func TestChecksums(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
// Create skeleton files.
| opts := getTestOptions(dir) |
| opts.Truncate = true |
opts.ValueLogFileSize = 100 * 1024 * 1024 // 100 MB
| kv, err := Open(opts) |
| require.NoError(t, err) |
| require.NoError(t, kv.Close()) |
| |
| var ( |
| k0 = []byte("k0") |
| k1 = []byte("k1") |
| k2 = []byte("k2") |
| k3 = []byte("k3") |
| v0 = []byte("value0-012345678901234567890123012345678901234567890123") |
| v1 = []byte("value1-012345678901234567890123012345678901234567890123") |
| v2 = []byte("value2-012345678901234567890123012345678901234567890123") |
| v3 = []byte("value3-012345678901234567890123012345678901234567890123") |
| ) |
| // Make sure the value log would actually store the item |
| require.True(t, len(v0) >= kv.opt.ValueThreshold) |
| |
// Use a vlog with K0=V0 and a (corrupted) second transaction (K1, K2).
| buf := createVlog(t, []*Entry{ |
| {Key: k0, Value: v0}, |
| {Key: k1, Value: v1}, |
| {Key: k2, Value: v2}, |
| }) |
| buf[len(buf)-1]++ // Corrupt last byte |
| require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 0), buf, 0777)) |
| |
// K0 should exist, but K1 and K2 shouldn't.
| kv, err = Open(opts) |
| require.NoError(t, err) |
| |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(k0) |
| require.NoError(t, err) |
| require.Equal(t, getItemValue(t, item), v0) |
| |
| _, err = txn.Get(k1) |
| require.Equal(t, ErrKeyNotFound, err) |
| |
| _, err = txn.Get(k2) |
| require.Equal(t, ErrKeyNotFound, err) |
| return nil |
| })) |
| |
| // Write K3 at the end of the vlog. |
| txnSet(t, kv, k3, v3, 0) |
| require.NoError(t, kv.Close()) |
| |
// The vlog should contain K0 and K3 (K1 and K2 were lost when Badger last started
// up, due to the checksum failure).
| kv, err = Open(opts) |
| require.NoError(t, err) |
| |
| { |
| txn := kv.NewTransaction(false) |
| |
| iter := txn.NewIterator(DefaultIteratorOptions) |
| iter.Seek(k0) |
| require.True(t, iter.Valid()) |
| it := iter.Item() |
| require.Equal(t, it.Key(), k0) |
| require.Equal(t, getItemValue(t, it), v0) |
| iter.Next() |
| require.True(t, iter.Valid()) |
| it = iter.Item() |
| require.Equal(t, it.Key(), k3) |
| require.Equal(t, getItemValue(t, it), v3) |
| |
| iter.Close() |
| txn.Discard() |
| } |
| |
| require.NoError(t, kv.Close()) |
| } |
| |
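// TestPartialAppendToValueLog truncates a value log mid-transaction to simulate a
// partial append, and verifies that Badger starts up, dropping the incomplete txn.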
| func TestPartialAppendToValueLog(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| // Create skeleton files. |
| opts := getTestOptions(dir) |
| opts.Truncate = true |
opts.ValueLogFileSize = 100 * 1024 * 1024 // 100 MB
| kv, err := Open(opts) |
| require.NoError(t, err) |
| require.NoError(t, kv.Close()) |
| |
| var ( |
| k0 = []byte("k0") |
| k1 = []byte("k1") |
| k2 = []byte("k2") |
| k3 = []byte("k3") |
| v0 = []byte("value0-01234567890123456789012012345678901234567890123") |
| v1 = []byte("value1-01234567890123456789012012345678901234567890123") |
| v2 = []byte("value2-01234567890123456789012012345678901234567890123") |
| v3 = []byte("value3-01234567890123456789012012345678901234567890123") |
| ) |
| // Values need to be long enough to actually get written to value log. |
| require.True(t, len(v3) >= kv.opt.ValueThreshold) |
| |
| // Create truncated vlog to simulate a partial append. |
| // k0 - single transaction, k1 and k2 in another transaction |
| buf := createVlog(t, []*Entry{ |
| {Key: k0, Value: v0}, |
| {Key: k1, Value: v1}, |
| {Key: k2, Value: v2}, |
| }) |
| buf = buf[:len(buf)-6] |
| require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 0), buf, 0777)) |
| |
// Badger should now start up.
| kv, err = Open(opts) |
| require.NoError(t, err) |
| |
| require.NoError(t, kv.View(func(txn *Txn) error { |
| item, err := txn.Get(k0) |
| require.NoError(t, err) |
| require.Equal(t, v0, getItemValue(t, item)) |
| |
| _, err = txn.Get(k1) |
| require.Equal(t, ErrKeyNotFound, err) |
| _, err = txn.Get(k2) |
| require.Equal(t, ErrKeyNotFound, err) |
| return nil |
| })) |
| |
| // When K3 is set, it should be persisted after a restart. |
| txnSet(t, kv, k3, v3, 0) |
| require.NoError(t, kv.Close()) |
| kv, err = Open(opts) |
| require.NoError(t, err) |
| checkKeys(t, kv, [][]byte{k3}) |
// Replay the value log from the beginning; the badger head is already past k2.
| require.NoError(t, kv.vlog.Close()) |
| |
| kv.vlog.init(kv) |
| require.NoError( |
| t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()), |
| ) |
| require.NoError(t, kv.Close()) |
| } |
| |
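// TestReadOnlyOpenWithPartialAppendToValueLog verifies that a read-only open fails
// when the value log still contains entries that would need to be replayed.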
| func TestReadOnlyOpenWithPartialAppendToValueLog(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| // Create skeleton files. |
| opts := getTestOptions(dir) |
opts.ValueLogFileSize = 100 * 1024 * 1024 // 100 MB
| kv, err := Open(opts) |
| require.NoError(t, err) |
| require.NoError(t, kv.Close()) |
| |
| var ( |
| k0 = []byte("k0") |
| k1 = []byte("k1") |
| k2 = []byte("k2") |
| v0 = []byte("value0-012345678901234567890123") |
| v1 = []byte("value1-012345678901234567890123") |
| v2 = []byte("value2-012345678901234567890123") |
| ) |
| |
| // Create truncated vlog to simulate a partial append. |
| // k0 - single transaction, k1 and k2 in another transaction |
| buf := createVlog(t, []*Entry{ |
| {Key: k0, Value: v0}, |
| {Key: k1, Value: v1}, |
| {Key: k2, Value: v2}, |
| }) |
| buf = buf[:len(buf)-6] |
| require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 0), buf, 0777)) |
| |
| opts.ReadOnly = true |
// Badger should fail a read-only open when there are values to replay.
| _, err = Open(opts) |
| require.Error(t, err) |
| require.Regexp(t, "Database was not properly closed, cannot open read-only|Read-only mode is not supported on Windows", err.Error()) |
| } |
| |
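// TestValueLogTrigger writes and deletes enough data to give value log GC some work,
// then checks that RunValueLogGC returns ErrRejected once the DB is closed.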
| func TestValueLogTrigger(t *testing.T) { |
| t.Skip("Difficult to trigger compaction, so skipping. Re-enable after fixing #226") |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| opt := getTestOptions(dir) |
| opt.ValueLogFileSize = 1 << 20 |
| kv, err := Open(opt) |
| require.NoError(t, err) |
| |
// Write a lot of data, so it creates some work for value log GC.
| sz := 32 << 10 |
| txn := kv.NewTransaction(true) |
| for i := 0; i < 100; i++ { |
| v := make([]byte, sz) |
| rand.Read(v[:rand.Intn(sz)]) |
| require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v))) |
| if i%20 == 0 { |
| require.NoError(t, txn.Commit()) |
| txn = kv.NewTransaction(true) |
| } |
| } |
| require.NoError(t, txn.Commit()) |
| |
| for i := 0; i < 45; i++ { |
| txnDelete(t, kv, []byte(fmt.Sprintf("key%d", i))) |
| } |
| |
| require.NoError(t, kv.RunValueLogGC(0.5)) |
| |
| require.NoError(t, kv.Close()) |
| |
| err = kv.RunValueLogGC(0.5) |
| require.Equal(t, ErrRejected, err, "Error should be returned after closing DB.") |
| } |
| |
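// createVlog writes the given entries to a scratch DB (the first entry in its own
// transaction, the rest in a second one) and returns the raw contents of the first
// value log file.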
| func createVlog(t *testing.T, entries []*Entry) []byte { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| opts := getTestOptions(dir) |
opts.ValueLogFileSize = 100 * 1024 * 1024 // 100 MB
| kv, err := Open(opts) |
| require.NoError(t, err) |
| txnSet(t, kv, entries[0].Key, entries[0].Value, entries[0].meta) |
| entries = entries[1:] |
| txn := kv.NewTransaction(true) |
| for _, entry := range entries { |
| require.NoError(t, txn.SetEntry(NewEntry(entry.Key, entry.Value).WithMeta(entry.meta))) |
| } |
| require.NoError(t, txn.Commit()) |
| require.NoError(t, kv.Close()) |
| |
| filename := vlogFilePath(dir, 0) |
| buf, err := ioutil.ReadFile(filename) |
| require.NoError(t, err) |
| return buf |
| } |
| |
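// TestPenultimateLogCorruption truncates an older value log file by one byte,
// simulates a crash, and verifies that after reopening with Truncate set, only the
// entry at the truncated tail is lost.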
| func TestPenultimateLogCorruption(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| opt := getTestOptions(dir) |
| opt.ValueLogLoadingMode = options.FileIO |
// Each txn generates at least two entries, so three txns will fit in each file.
| opt.ValueLogMaxEntries = 5 |
| opt.LogRotatesToFlush = 1000 |
| |
| db0, err := Open(opt) |
| require.NoError(t, err) |
| |
| h := testHelper{db: db0, t: t} |
| h.writeRange(0, 7) |
| h.readRange(0, 7) |
| |
| for i := 2; i >= 0; i-- { |
| fpath := vlogFilePath(dir, uint32(i)) |
| fi, err := os.Stat(fpath) |
| require.NoError(t, err) |
| require.True(t, fi.Size() > 0, "Empty file at log=%d", i) |
| if i == 0 { |
| err := os.Truncate(fpath, fi.Size()-1) |
| require.NoError(t, err) |
| } |
| } |
| // Simulate a crash by not closing db0, but releasing the locks. |
| if db0.dirLockGuard != nil { |
| require.NoError(t, db0.dirLockGuard.release()) |
| } |
| if db0.valueDirGuard != nil { |
| require.NoError(t, db0.valueDirGuard.release()) |
| } |
| require.NoError(t, db0.vlog.Close()) |
| require.NoError(t, db0.manifest.close()) |
| require.NoError(t, db0.registry.Close()) |
| |
| opt.Truncate = true |
| db1, err := Open(opt) |
| require.NoError(t, err) |
| h.db = db1 |
| h.readRange(0, 1) // Only 2 should be gone, because it is at the end of logfile 0. |
| h.readRange(3, 7) |
| err = db1.View(func(txn *Txn) error { |
| _, err := txn.Get(h.key(2)) // Verify that 2 is gone. |
| require.Equal(t, ErrKeyNotFound, err) |
| return nil |
| }) |
| require.NoError(t, err) |
| require.NoError(t, db1.Close()) |
| } |
| |
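// checkKeys seeks to keys[0] and asserts that the iterator yields exactly the given
// keys, in order.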
| func checkKeys(t *testing.T, kv *DB, keys [][]byte) { |
| i := 0 |
| txn := kv.NewTransaction(false) |
| defer txn.Discard() |
| iter := txn.NewIterator(IteratorOptions{}) |
| defer iter.Close() |
| for iter.Seek(keys[0]); iter.Valid(); iter.Next() { |
| require.Equal(t, iter.Item().Key(), keys[i]) |
| i++ |
| } |
| require.Equal(t, i, len(keys)) |
| } |
| |
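// testHelper bundles a DB and a testing.T with helpers for writing and reading
// ranges of fixed-size key/value pairs.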
| type testHelper struct { |
| db *DB |
| t *testing.T |
| val []byte |
| } |
| |
| func (th *testHelper) key(i int) []byte { |
| return []byte(fmt.Sprintf("%010d", i)) |
| } |
| func (th *testHelper) value() []byte { |
| if len(th.val) > 0 { |
| return th.val |
| } |
| th.val = make([]byte, 100) |
| y.Check2(rand.Read(th.val)) |
| return th.val |
| } |
| |
// writeRange writes keys in the inclusive range [from, to].
| func (th *testHelper) writeRange(from, to int) { |
| for i := from; i <= to; i++ { |
| err := th.db.Update(func(txn *Txn) error { |
| return txn.SetEntry(NewEntry(th.key(i), th.value())) |
| }) |
| require.NoError(th.t, err) |
| } |
| } |
| |
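// readRange reads keys in the inclusive range [from, to] and verifies their values.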
| func (th *testHelper) readRange(from, to int) { |
| for i := from; i <= to; i++ { |
| err := th.db.View(func(txn *Txn) error { |
| item, err := txn.Get(th.key(i)) |
| if err != nil { |
| return err |
| } |
| return item.Value(func(val []byte) error { |
| require.Equal(th.t, val, th.value(), "key=%q", th.key(i)) |
| return nil |
| |
| }) |
| }) |
| require.NoError(th.t, err, "key=%q", th.key(i)) |
| } |
| } |
| |
| // Test Bug #578, which showed that if a value is moved during value log GC, an |
| // older version can end up at a higher level in the LSM tree than a newer |
| // version, causing the data to not be returned. |
| func TestBug578(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| y.Check(err) |
| defer removeDir(dir) |
| |
| db, err := Open(DefaultOptions(dir). |
| WithValueLogMaxEntries(64). |
| WithMaxTableSize(1 << 13)) |
| require.NoError(t, err) |
| |
| h := testHelper{db: db, t: t} |
| |
| // Let's run this whole thing a few times. |
| for j := 0; j < 10; j++ { |
| t.Logf("Cycle: %d\n", j) |
| h.writeRange(0, 32) |
| h.writeRange(0, 10) |
| h.writeRange(50, 72) |
| h.writeRange(40, 72) |
| h.writeRange(40, 72) |
| |
| // Run value log GC a few times. |
| for i := 0; i < 5; i++ { |
| db.RunValueLogGC(0.5) |
| } |
| h.readRange(0, 10) |
| } |
| require.NoError(t, db.Close()) |
| } |
| |
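// BenchmarkReadWrite measures raw value log writes and reads across a range of
// read/write mixes and value sizes.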
| func BenchmarkReadWrite(b *testing.B) { |
| rwRatio := []float32{ |
| 0.1, 0.2, 0.5, 1.0, |
| } |
| valueSize := []int{ |
| 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, |
| } |
| |
| for _, vsz := range valueSize { |
| for _, rw := range rwRatio { |
| b.Run(fmt.Sprintf("%3.1f,%04d", rw, vsz), func(b *testing.B) { |
| dir, err := ioutil.TempDir("", "vlog-benchmark") |
| y.Check(err) |
| defer removeDir(dir) |
| |
| db, err := Open(getTestOptions(dir)) |
| y.Check(err) |
| |
| vl := &db.vlog |
| b.ResetTimer() |
| |
| for i := 0; i < b.N; i++ { |
| e := new(Entry) |
| e.Key = make([]byte, 16) |
| e.Value = make([]byte, vsz) |
| bl := new(request) |
| bl.Entries = []*Entry{e} |
| |
| var ptrs []valuePointer |
| |
| vl.write([]*request{bl}) |
| ptrs = append(ptrs, bl.Ptrs...) |
| |
| f := rand.Float32() |
| if f < rw { |
| vl.write([]*request{bl}) |
| } else { |
| ln := len(ptrs) |
| if ln == 0 { |
| b.Fatalf("Zero length of ptrs") |
| } |
| idx := rand.Intn(ln) |
| s := new(y.Slice) |
| buf, lf, err := vl.readValueBytes(ptrs[idx], s) |
| if err != nil { |
| b.Fatalf("Benchmark Read: %v", err) |
| } |
| |
| e, err := lf.decodeEntry(buf, ptrs[idx].Offset) |
| require.NoError(b, err) |
| if len(e.Key) != 16 { |
| b.Fatalf("Key is invalid") |
| } |
| if len(e.Value) != vsz { |
| b.Fatalf("Value is invalid") |
| } |
| runCallback(db.vlog.getUnlockCallback(lf)) |
| } |
| } |
| }) |
| } |
| } |
| } |
| |
| // Regression test for https://github.com/dgraph-io/badger/issues/817 |
| func TestValueLogTruncate(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| db, err := Open(DefaultOptions(dir).WithTruncate(true)) |
| require.NoError(t, err) |
// Insert one entry so that we have valid data in the first vlog file.
| require.NoError(t, db.Update(func(txn *Txn) error { |
| return txn.Set([]byte("foo"), nil) |
| })) |
| |
| fileCountBeforeCorruption := len(db.vlog.filesMap) |
| |
| require.NoError(t, db.Close()) |
| |
// Create two vlog files with corrupted data. These will be truncated when the DB next starts.
| require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 1), []byte("foo"), 0664)) |
| require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 2), []byte("foo"), 0664)) |
| |
| db, err = Open(DefaultOptions(dir).WithTruncate(true)) |
| require.NoError(t, err) |
| |
| // Ensure vlog file with id=1 is not present |
| require.Nil(t, db.vlog.filesMap[1]) |
| |
// Ensure the file with fid=2 contains no data beyond the header.
| zeroFile, ok := db.vlog.filesMap[2] |
| require.True(t, ok) |
| fileStat, err := zeroFile.fd.Stat() |
| require.NoError(t, err) |
| |
// The size of the last vlog file on Windows is equal to 2*opt.ValueLogFileSize. This is
// because we mmap the last value log file, and Windows doesn't allow mmapping a file
// beyond its actual size. So we increase the file size and then mmap it. See mmap_windows.go.
| if runtime.GOOS == "windows" { |
| require.Equal(t, 2*db.opt.ValueLogFileSize, fileStat.Size()) |
| } else { |
| require.Equal(t, int64(vlogHeaderSize), fileStat.Size()) |
| } |
| fileCountAfterCorruption := len(db.vlog.filesMap) |
// +1 because the file with fid=2 is completely truncated but not deleted, leaving two
// files: fid=0 with valid data and fid=2 with no data (truncated).
| require.Equal(t, fileCountBeforeCorruption+1, fileCountAfterCorruption) |
// The max file ID should point to the last vlog file, which is fid=2 in this case.
| require.Equal(t, 2, int(db.vlog.maxFid)) |
| require.NoError(t, db.Close()) |
| } |
| |
| // Regression test for https://github.com/dgraph-io/dgraph/issues/3669 |
| func TestTruncatedDiscardStat(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| ops := getTestOptions(dir) |
| db, err := Open(ops) |
| require.NoError(t, err) |
| |
| stat := make(map[uint32]int64, 20) |
| for i := uint32(0); i < uint32(20); i++ { |
| stat[i] = 0 |
| } |
| db.vlog.lfDiscardStats.m = stat |
| encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m) |
| entries := []*Entry{{ |
| Key: y.KeyWithTs(lfDiscardStatsKey, 1), |
| // Insert truncated discard stats. This is important. |
| Value: encodedDS[:13], |
| }} |
| // Push discard stats entry to the write channel. |
| req, err := db.sendToWriteCh(entries) |
| require.NoError(t, err) |
| req.Wait() |
| |
// Unset the discard stats; we've already pushed them. If we don't unset them, they
// will be pushed again on DB close.
| db.vlog.lfDiscardStats.m = nil |
| |
| require.NoError(t, db.Close()) |
| |
| db, err = Open(ops) |
| require.NoError(t, err) |
| require.NoError(t, db.Close()) |
| } |
| |
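// TestSafeEntry round-trips an entry through logFile.encodeEntry and safeRead.Entry,
// verifying that every field survives.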
| func TestSafeEntry(t *testing.T) { |
| var s safeRead |
| s.lf = &logFile{} |
| e := NewEntry([]byte("foo"), []byte("bar")) |
| buf := bytes.NewBuffer(nil) |
| _, err := s.lf.encodeEntry(e, buf, 0) |
| require.NoError(t, err) |
| |
| ne, err := s.Entry(buf) |
| require.NoError(t, err) |
| require.Equal(t, e.Key, ne.Key, "key mismatch") |
| require.Equal(t, e.Value, ne.Value, "value mismatch") |
| require.Equal(t, e.meta, ne.meta, "meta mismatch") |
| require.Equal(t, e.UserMeta, ne.UserMeta, "usermeta mismatch") |
| require.Equal(t, e.ExpiresAt, ne.ExpiresAt, "expiresAt mismatch") |
| } |
| |
| // Regression test for https://github.com/dgraph-io/badger/issues/926 |
| func TestDiscardStatsMove(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| ops := getTestOptions(dir) |
| ops.ValueLogMaxEntries = 1 |
| db, err := Open(ops) |
| require.NoError(t, err) |
| |
| stat := make(map[uint32]int64, ops.ValueThreshold+10) |
| for i := uint32(0); i < uint32(ops.ValueThreshold+10); i++ { |
| stat[i] = 0 |
| } |
| |
| db.vlog.lfDiscardStats.Lock() |
| db.vlog.lfDiscardStats.m = stat |
| encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m) |
| db.vlog.lfDiscardStats.Unlock() |
| entries := []*Entry{{ |
| Key: y.KeyWithTs(lfDiscardStatsKey, 1), |
// The discard stats value is larger than the value threshold.
| Value: encodedDS, |
| }} |
| // Push discard stats entry to the write channel. |
| req, err := db.sendToWriteCh(entries) |
| require.NoError(t, err) |
| req.Wait() |
| |
// Unset the discard stats; we've already pushed them. If we don't unset them, they
// will be pushed again on DB close. Also, the first insertion was in vlog file 1;
// this insertion would be in value log file 3.
| db.vlog.lfDiscardStats.Lock() |
| db.vlog.lfDiscardStats.m = nil |
| db.vlog.lfDiscardStats.Unlock() |
| |
// Push more entries so that we get more than one value log file.
| require.NoError(t, db.Update(func(txn *Txn) error { |
| e := NewEntry([]byte("f"), []byte("1")) |
| return txn.SetEntry(e) |
| })) |
| require.NoError(t, db.Update(func(txn *Txn) error { |
| e := NewEntry([]byte("ff"), []byte("1")) |
| return txn.SetEntry(e) |
| })) |
| |
| tr := trace.New("Badger.ValueLog", "GC") |
| // Use first value log file for GC. This value log file contains the discard stats. |
| lf := db.vlog.filesMap[0] |
| require.NoError(t, db.vlog.rewrite(lf, tr)) |
| require.NoError(t, db.Close()) |
| |
| db, err = Open(ops) |
// discardStats will be populated via vlog.populateDiscardStats(), which pushes discard
// stats to vlog.lfDiscardStats.flushChan. Hence, wait for some time for the discard
// stats to be updated.
| time.Sleep(1 * time.Second) |
| require.NoError(t, err) |
| db.vlog.lfDiscardStats.RLock() |
| require.Equal(t, stat, db.vlog.lfDiscardStats.m) |
| db.vlog.lfDiscardStats.RUnlock() |
| require.NoError(t, db.Close()) |
| } |
| |
// This test ensures that flushDiscardStats() doesn't crash.
| func TestBlockedDiscardStats(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
defer removeDir(dir)
| db, err := Open(getTestOptions(dir)) |
| require.NoError(t, err) |
| // Set discard stats. |
| db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0} |
| db.blockWrite() |
// Push more discard stats than the capacity of flushChan. This ensures at least one
// flush operation completes after writes have been blocked.
| for i := 0; i < cap(db.vlog.lfDiscardStats.flushChan)+2; i++ { |
| db.vlog.lfDiscardStats.flushChan <- db.vlog.lfDiscardStats.m |
| } |
| db.unblockWrite() |
| require.NoError(t, db.Close()) |
| } |
| |
| // Regression test for https://github.com/dgraph-io/badger/issues/970 |
| func TestBlockedDiscardStatsOnClose(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| db, err := Open(getTestOptions(dir)) |
| require.NoError(t, err) |
| db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0} |
// This is important. Set updatesSinceFlush above discardStatsFlushThreshold so
// that closing the DB flushes the discard stats.
| db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1 |
| require.NoError(t, db.Close()) |
| } |
| |
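// TestValueEntryChecksum verifies that with VerifyValueChecksum set, a clean value
// round-trips intact while a corrupted value log entry fails to read with a checksum
// mismatch error.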
| func TestValueEntryChecksum(t *testing.T) { |
| k := []byte("KEY") |
| v := []byte(fmt.Sprintf("val%100d", 10)) |
| t.Run("ok", func(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| opt := getTestOptions(dir) |
| opt.VerifyValueChecksum = true |
| db, err := Open(opt) |
| require.NoError(t, err) |
| |
| require.Greater(t, len(v), db.opt.ValueThreshold) |
| txnSet(t, db, k, v, 0) |
| require.NoError(t, db.Close()) |
| |
| db, err = Open(opt) |
| require.NoError(t, err) |
| |
| txn := db.NewTransaction(false) |
| entry, err := txn.Get(k) |
| require.NoError(t, err) |
| |
| x, err := entry.ValueCopy(nil) |
| require.NoError(t, err) |
| require.Equal(t, v, x) |
| |
| require.NoError(t, db.Close()) |
| }) |
| // Regression test for https://github.com/dgraph-io/badger/issues/1049 |
| t.Run("Corruption", func(t *testing.T) { |
| dir, err := ioutil.TempDir("", "badger-test") |
| require.NoError(t, err) |
| defer removeDir(dir) |
| |
| opt := getTestOptions(dir) |
| opt.VerifyValueChecksum = true |
| db, err := Open(opt) |
| require.NoError(t, err) |
| |
| require.Greater(t, len(v), db.opt.ValueThreshold) |
| txnSet(t, db, k, v, 0) |
| |
| path := db.vlog.fpath(0) |
| require.NoError(t, db.Close()) |
| |
| file, err := os.OpenFile(path, os.O_RDWR, 0644) |
| require.NoError(t, err) |
| offset := 50 |
| orig := make([]byte, 1) |
| _, err = file.ReadAt(orig, int64(offset)) |
| require.NoError(t, err) |
// Corrupt the byte at this offset.
| _, err = file.WriteAt([]byte{7}, int64(offset)) |
| require.NoError(t, err) |
| require.NoError(t, file.Close()) |
| |
| db, err = Open(opt) |
| require.NoError(t, err) |
| |
| txn := db.NewTransaction(false) |
| entry, err := txn.Get(k) |
| require.NoError(t, err) |
| |
| x, err := entry.ValueCopy(nil) |
| require.Error(t, err) |
| require.Contains(t, err.Error(), "checksum mismatch") |
| require.Nil(t, x) |
| |
| require.NoError(t, db.Close()) |
| }) |
| } |