/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package table

import (
	"crypto/aes"
	"math"
	"runtime"
	"sync"
	"unsafe"

	"github.com/dgryski/go-farm"
	"github.com/golang/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/pkg/errors"

	"github.com/dgraph-io/badger/v2/options"
	"github.com/dgraph-io/badger/v2/pb"
	"github.com/dgraph-io/badger/v2/y"
	"github.com/dgraph-io/ristretto/z"
)
const (
KB = 1024
MB = KB * 1024
	// When a block is encrypted, its length increases. We add 200 bytes of
	// padding to handle cases when the block size increases. This is an
	// approximate number.
padding = 200
)
type header struct {
overlap uint16 // Overlap with base key.
diff uint16 // Length of the diff.
}
const headerSize = uint16(unsafe.Sizeof(header{}))
// Encode encodes the header.
func (h header) Encode() []byte {
var b [4]byte
*(*header)(unsafe.Pointer(&b[0])) = h
return b[:]
}
// Decode decodes the header.
func (h *header) Decode(buf []byte) {
	// Copy over data from buf into h. Assigning via
	// *h = *(*header)(unsafe.Pointer(&buf[0])) can lead to pointer alignment
	// issues. See https://github.com/dgraph-io/badger/issues/1096 and the
	// comment at https://github.com/dgraph-io/badger/pull/1097#pullrequestreview-307361714
copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize])
}
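
// An illustrative sketch, not part of the package API: round-trip a header
// through its 4-byte wire form. With baseKey "key001" and key "key002", the
// shared prefix "key00" gives overlap=5 and the remaining "2" gives diff=1.
func exampleHeaderRoundTrip() header {
	h := header{overlap: 5, diff: 1}
	var out header
	out.Decode(h.Encode()) // Decode copies the 4 bytes back into the struct.
	return out
}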
type bblock struct {
data []byte
start uint32 // Points to the starting offset of the block.
end uint32 // Points to the end offset of the block.
}
// Builder is used in building a table.
type Builder struct {
	// Typically tens or hundreds of megabytes. This is for one single file.
buf []byte
sz uint32
	bufLock sync.Mutex // This lock guards buf. We acquire it when resizing buf.
baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
entryOffsets []uint32 // Offsets of entries present in current block.
tableIndex *pb.TableIndex
keyHashes []uint64 // Used for building the bloomfilter.
opt *Options
// Used to concurrently compress/encrypt blocks.
wg sync.WaitGroup
blockChan chan *bblock
blockList []*bblock
}
// NewTableBuilder makes a new TableBuilder.
func NewTableBuilder(opts Options) *Builder {
b := &Builder{
// Additional 5 MB to store index (approximate).
// We trim the additional space in table.Finish().
buf: make([]byte, opts.TableSize+5*MB),
tableIndex: &pb.TableIndex{},
keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls.
opt: &opts,
}
	// If neither compression nor encryption is enabled, do not start the
	// compression/encryption goroutines; write directly to the buffer instead.
if b.opt.Compression == options.None && b.opt.DataKey == nil {
return b
}
count := 2 * runtime.NumCPU()
b.blockChan = make(chan *bblock, count*2)
b.wg.Add(count)
for i := 0; i < count; i++ {
go b.handleBlock()
}
return b
}
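
// A minimal usage sketch, assuming illustrative Options values (TableSize,
// BlockSize, and BloomFalsePositive are the fields referenced elsewhere in
// this file; the values here are examples, not recommendations). Keys carry a
// timestamp suffix, hence y.KeyWithTs, and must be added in sorted order.
func buildTinyTableSketch() []byte {
	opts := Options{
		TableSize:          1 * MB,
		BlockSize:          4 * KB,
		BloomFalsePositive: 0.01,
	}
	b := NewTableBuilder(opts)
	defer b.Close()
	for _, k := range []string{"key-a", "key-b", "key-c"} {
		b.Add(y.KeyWithTs([]byte(k), 1), y.ValueStruct{Value: []byte("val")}, 0)
	}
	// Finish appends the index and checksum and returns the full table bytes.
	return b.Finish()
}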
var blockPool = &sync.Pool{
New: func() interface{} {
		// Create 5 KB blocks even when the default block size is 4 KB. The
		// ZSTD decompression library increases the buffer by 2X if it's not
		// big enough. Using a 5 KB block instead of a 4 KB one avoids that
		// unnecessary 2X allocation by the decompression library.
b := make([]byte, 5<<10)
return &b
},
}
func (b *Builder) handleBlock() {
defer b.wg.Done()
for item := range b.blockChan {
// Extract the block.
blockBuf := item.data[item.start:item.end]
var dst *[]byte
// Compress the block.
if b.opt.Compression != options.None {
var err error
dst = blockPool.Get().(*[]byte)
blockBuf, err = b.compressData(*dst, blockBuf)
y.Check(err)
}
if b.shouldEncrypt() {
eBlock, err := b.encrypt(blockBuf)
y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
blockBuf = eBlock
}
		// The new end of the block should always be less than or equal to the
		// original end plus the padding. If it is greater than
		// item.end+padding, the data from this block cannot be stored in its
		// existing location, and copying it over would overwrite some data of
		// the next block.
		newEnd := item.start + uint32(len(blockBuf))
		y.AssertTruef(newEnd <= item.end+padding,
			"newEnd: %d item.end: %d padding: %d", newEnd, item.end, padding)
// Acquire the buflock here. The builder.grow function might change
// the b.buf while this goroutine was running.
b.bufLock.Lock()
// Copy over compressed/encrypted data back to the main buffer.
copy(b.buf[item.start:], blockBuf)
b.bufLock.Unlock()
// Fix the boundary of the block.
item.end = item.start + uint32(len(blockBuf))
if dst != nil {
blockPool.Put(dst)
}
}
}
// Close closes the TableBuilder.
func (b *Builder) Close() {}
// Empty returns whether the builder is empty.
func (b *Builder) Empty() bool { return b.sz == 0 }
// keyDiff returns a suffix of newKey that is different from b.baseKey.
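// For example, with baseKey "hello" and newKey "help!", the shared prefix is
// "hel", so keyDiff returns "p!".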
func (b *Builder) keyDiff(newKey []byte) []byte {
var i int
for i = 0; i < len(newKey) && i < len(b.baseKey); i++ {
if newKey[i] != b.baseKey[i] {
break
}
}
return newKey[i:]
}
func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
b.keyHashes = append(b.keyHashes, farm.Fingerprint64(y.ParseKey(key)))
// diffKey stores the difference of key with baseKey.
var diffKey []byte
if len(b.baseKey) == 0 {
// Make a copy. Builder should not keep references. Otherwise, caller has to be very careful
// and will have to make copies of keys every time they add to builder, which is even worse.
b.baseKey = append(b.baseKey[:0], key...)
diffKey = key
} else {
diffKey = b.keyDiff(key)
}
h := header{
overlap: uint16(len(key) - len(diffKey)),
diff: uint16(len(diffKey)),
}
	// Store the current entry's offset within the block.
y.AssertTrue(b.sz < math.MaxUint32)
b.entryOffsets = append(b.entryOffsets, b.sz-b.baseOffset)
// Layout: header, diffKey, value.
b.append(h.Encode())
b.append(diffKey)
if uint32(len(b.buf)) < b.sz+v.EncodedSize() {
b.grow(v.EncodedSize())
}
b.sz += v.Encode(b.buf[b.sz:])
// Size of KV on SST.
sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize())
// Total estimated size = size on SST + size on vlog (length of value pointer).
b.tableIndex.EstimatedSize += (sstSz + vpLen)
}
// grow increases the size of b.buf by at least 50%.
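// For example, growing a 64 MB buffer by n = 1 MB actually adds 32 MB (l/2),
// while a request for n = 40 MB adds the full 40 MB.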
func (b *Builder) grow(n uint32) {
l := uint32(len(b.buf))
if n < l/2 {
n = l / 2
}
b.bufLock.Lock()
newBuf := make([]byte, l+n)
copy(newBuf, b.buf)
b.buf = newBuf
b.bufLock.Unlock()
}
func (b *Builder) append(data []byte) {
// Ensure we have enough space to store new data.
if uint32(len(b.buf)) < b.sz+uint32(len(data)) {
b.grow(uint32(len(data)))
}
copy(b.buf[b.sz:], data)
b.sz += uint32(len(data))
}
func (b *Builder) addPadding(sz uint32) {
if uint32(len(b.buf)) < b.sz+sz {
b.grow(sz)
}
b.sz += sz
}
/*
Structure of Block.
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry1 | Entry2 | Entry3 | Entry4 | Entry5 |
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry6 | ... | ... | ... | EntryN |
+-------------------+---------------------+--------------------+--------------+------------------+
| Block Meta(contains list of offsets used| Block Meta Size | Block | Checksum Size |
| to perform binary search in the block) | (4 Bytes) | Checksum | (4 Bytes) |
+-----------------------------------------+--------------------+--------------+------------------+
*/
// In case the data is encrypted, the "IV" is added to the end of the block.
func (b *Builder) finishBlock() {
b.append(y.U32SliceToBytes(b.entryOffsets))
b.append(y.U32ToBytes(uint32(len(b.entryOffsets))))
b.writeChecksum(b.buf[b.baseOffset:b.sz])
// If compression/encryption is disabled, no need to send the block to the blockChan.
// There's nothing to be done.
if b.blockChan == nil {
b.addBlockToIndex()
return
}
b.addPadding(padding)
// Block end is the actual end of the block ignoring the padding.
block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf}
b.blockList = append(b.blockList, block)
b.addBlockToIndex()
// Push to the block handler.
b.blockChan <- block
}
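
// A minimal sketch, illustrative only, of how a reader can walk an
// uncompressed, unencrypted block back from its tail, following the Block Meta
// layout diagrammed above. It assumes y.BytesToU32 and y.BytesToU32Slice, the
// inverses of the y.U32ToBytes and y.U32SliceToBytes helpers used in this file.
func parseBlockTailSketch(data []byte) (entryOffsets []uint32, checksum []byte) {
	// The last 4 bytes hold the checksum length.
	pos := uint32(len(data)) - 4
	chkLen := y.BytesToU32(data[pos : pos+4])
	// The checksum itself sits just before its length.
	pos -= chkLen
	checksum = data[pos : pos+chkLen]
	// The 4 bytes before the checksum hold the number of entry offsets.
	pos -= 4
	numEntries := y.BytesToU32(data[pos : pos+4])
	// The offsets list sits just before the count.
	pos -= numEntries * 4
	entryOffsets = y.BytesToU32Slice(data[pos : pos+numEntries*4])
	return entryOffsets, checksum
}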
func (b *Builder) addBlockToIndex() {
blockBuf := b.buf[b.baseOffset:b.sz]
// Add key to the block index.
bo := &pb.BlockOffset{
Key: y.Copy(b.baseKey),
Offset: b.baseOffset,
Len: uint32(len(blockBuf)),
}
b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo)
}
func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
	// If there are no entries yet, return false.
	if len(b.entryOffsets) == 0 {
return false
}
// Integer overflow check for statements below.
y.AssertTrue((uint32(len(b.entryOffsets))+1)*4+4+8+4 < math.MaxUint32)
// We should include current entry also in size, that's why +1 to len(b.entryOffsets).
entriesOffsetsSize := uint32((len(b.entryOffsets)+1)*4 +
4 + // size of list
8 + // Sum64 in checksum proto
4) // checksum length
	estimatedSize := b.sz - b.baseOffset + 6 /*header size for entry*/ +
		uint32(len(key)) + value.EncodedSize() + entriesOffsetsSize
if b.shouldEncrypt() {
// IV is added at the end of the block, while encrypting.
// So, size of IV is added to estimatedSize.
estimatedSize += aes.BlockSize
}
return estimatedSize > uint32(b.opt.BlockSize)
}
// Add adds a key-value pair to the block.
func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
if b.shouldFinishBlock(key, value) {
b.finishBlock()
// Start a new block. Initialize the block.
b.baseKey = []byte{}
		y.AssertTrue(b.sz < math.MaxUint32)
		b.baseOffset = b.sz
b.entryOffsets = b.entryOffsets[:0]
}
b.addHelper(key, value, uint64(valueLen))
}
// ReachedCapacity returns true if the builder's estimated final size, counting
// the pending entry offsets, checksum, and an approximate index, exceeds cap.
// TODO: Look into why there is a discrepancy between this estimate and the
// actual final size. I suspect it is because of Write(empty, empty) at the
// end; the diff can vary.
func (b *Builder) ReachedCapacity(cap int64) bool {
	blocksSize := b.sz + // length of the buffer written so far
uint32(len(b.entryOffsets)*4) + // all entry offsets size
4 + // count of all entry offsets
8 + // checksum bytes
4 // checksum length
estimateSz := blocksSize +
4 + // Index length
5*(uint32(len(b.tableIndex.Offsets))) // approximate index size
return int64(estimateSz) > cap
}
// Finish finishes the table by appending the index.
/*
The table structure looks like
+---------+------------+-----------+---------------+
| Block 1 | Block 2 | Block 3 | Block 4 |
+---------+------------+-----------+---------------+
| Block 5 | Block 6 | Block ... | Block N |
+---------+------------+-----------+---------------+
| Index | Index Size | Checksum | Checksum Size |
+---------+------------+-----------+---------------+
*/
// In case the data is encrypted, the "IV" is added to the end of the index.
func (b *Builder) Finish() []byte {
bf := z.NewBloomFilter(float64(len(b.keyHashes)), b.opt.BloomFalsePositive)
for _, h := range b.keyHashes {
bf.Add(h)
}
// Add bloom filter to the index.
b.tableIndex.BloomFilter = bf.JSONMarshal()
b.finishBlock() // This will never start a new block.
if b.blockChan != nil {
close(b.blockChan)
}
// Wait for block handler to finish.
b.wg.Wait()
dst := b.buf
// Fix block boundaries. This includes moving the blocks so that we
// don't have any interleaving space between them.
if len(b.blockList) > 0 {
dstLen := uint32(0)
for i, bl := range b.blockList {
off := b.tableIndex.Offsets[i]
// Length of the block is end minus the start.
off.Len = bl.end - bl.start
// New offset of the block is the point in the main buffer till
// which we have written data.
off.Offset = dstLen
copy(dst[dstLen:], b.buf[bl.start:bl.end])
			// The next write position is the block's new offset plus its length.
dstLen = off.Offset + off.Len
}
// Start writing to the buffer from the point until which we have valid data.
// Fix the length because append and writeChecksum also rely on it.
b.sz = dstLen
}
index, err := proto.Marshal(b.tableIndex)
y.Check(err)
if b.shouldEncrypt() {
index, err = b.encrypt(index)
y.Check(err)
}
	// Write the index to the buffer.
b.append(index)
b.append(y.U32ToBytes(uint32(len(index))))
b.writeChecksum(index)
return b.buf[:b.sz]
}
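
// A minimal sketch, illustrative only, of how a reader recovers the index from
// the tail of a finished, unencrypted table, per the layout diagrammed above:
// [blocks][index][index size (4B)][checksum][checksum size (4B)].
func readTableIndexSketch(tbl []byte) (*pb.TableIndex, error) {
	// The last 4 bytes hold the checksum length; skip over the checksum.
	pos := uint32(len(tbl)) - 4
	chkLen := y.BytesToU32(tbl[pos : pos+4])
	pos -= chkLen
	// The next 4 bytes (moving backwards) hold the index length.
	pos -= 4
	indexLen := y.BytesToU32(tbl[pos : pos+4])
	pos -= indexLen
	index := &pb.TableIndex{}
	if err := proto.Unmarshal(tbl[pos:pos+indexLen], index); err != nil {
		return nil, y.Wrapf(err, "failed to unmarshal table index")
	}
	return index, nil
}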
func (b *Builder) writeChecksum(data []byte) {
	// Build checksum for the given data.
checksum := pb.Checksum{
// TODO: The checksum type should be configurable from the
// options.
// We chose to use CRC32 as the default option because
// it performed better compared to xxHash64.
// See the BenchmarkChecksum in table_test.go file
// Size => 1024 B 2048 B
// CRC32 => 63.7 ns/op 112 ns/op
// xxHash64 => 87.5 ns/op 158 ns/op
Sum: y.CalculateChecksum(data, pb.Checksum_CRC32C),
Algo: pb.Checksum_CRC32C,
}
	// Write the checksum to the buffer.
chksum, err := proto.Marshal(&checksum)
y.Check(err)
b.append(chksum)
// Write checksum size.
b.append(y.U32ToBytes(uint32(len(chksum))))
}
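
// A minimal sketch of the read-side counterpart, illustrative only: given the
// raw bytes that were checksummed and the marshaled pb.Checksum written above,
// verify integrity with y.VerifyChecksum.
func verifyChecksumSketch(data, chksum []byte) error {
	expected := &pb.Checksum{}
	if err := proto.Unmarshal(chksum, expected); err != nil {
		return y.Wrapf(err, "failed to unmarshal checksum")
	}
	return y.VerifyChecksum(data, expected)
}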
// DataKey returns the data key of the builder.
func (b *Builder) DataKey() *pb.DataKey {
return b.opt.DataKey
}
// encrypt encrypts the given data and appends the IV to the end of the
// encrypted data. It should be called only after checking the shouldEncrypt
// method.
func (b *Builder) encrypt(data []byte) ([]byte, error) {
iv, err := y.GenerateIV()
if err != nil {
return data, y.Wrapf(err, "Error while generating IV in Builder.encrypt")
}
data, err = y.XORBlock(data, b.DataKey().Data, iv)
if err != nil {
return data, y.Wrapf(err, "Error while encrypting in Builder.encrypt")
}
data = append(data, iv...)
return data, nil
}
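
// A minimal sketch of the matching decrypt step, illustrative only (the real
// read path lives in the table reader). It assumes encrypt's layout: the
// aes.BlockSize-byte IV sits at the end of the ciphertext, and AES-CTR is
// symmetric, so y.XORBlock with the same key and IV decrypts.
func (b *Builder) decryptSketch(data []byte) ([]byte, error) {
	if len(data) < aes.BlockSize {
		return nil, errors.New("data too small to contain IV")
	}
	ivStart := len(data) - aes.BlockSize
	return y.XORBlock(data[:ivStart], b.DataKey().Data, data[ivStart:])
}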
// shouldEncrypt tells us whether to encrypt the data or not.
// We encrypt only if the data key exists.
func (b *Builder) shouldEncrypt() bool {
return b.opt.DataKey != nil
}
// compressData compresses the given data.
func (b *Builder) compressData(dst, data []byte) ([]byte, error) {
switch b.opt.Compression {
case options.None:
return data, nil
case options.Snappy:
return snappy.Encode(dst, data), nil
case options.ZSTD:
return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel)
}
return nil, errors.New("Unsupported compression type")
}
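
// A minimal sketch of the matching decompression step, illustrative only (the
// real read path lives in the table reader). dst may be nil; both codecs
// allocate a buffer if dst is too small.
func (b *Builder) decompressDataSketch(dst, data []byte) ([]byte, error) {
	switch b.opt.Compression {
	case options.None:
		return data, nil
	case options.Snappy:
		return snappy.Decode(dst, data)
	case options.ZSTD:
		return y.ZSTDDecompress(dst, data)
	}
	return nil, errors.New("Unsupported compression type")
}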