| /* |
| * Copyright 2017 Dgraph Labs, Inc. and Contributors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package table |
| |
| import ( |
| "bytes" |
| "io" |
| "sort" |
| |
| "github.com/dgraph-io/badger/v2/y" |
| "github.com/pkg/errors" |
| ) |
| |
// blockIterator iterates over the entries of a single SSTable block. Keys are
// stored delta-encoded against the block's first (base) key; the iterator
// reconstructs full keys into a reusable buffer as it moves.
type blockIterator struct {
	data []byte // entry bytes of the current block, with the per-block index stripped
	idx int // Idx of the entry inside a block
	err error // io.EOF once idx moves out of range, or a sticky positioning error
	baseKey []byte // full key of the block's first entry; later keys are diffs against it
	key []byte // reusable buffer holding the fully reconstructed current key
	val []byte // current value; aliases into data, not a copy
	entryOffsets []uint32 // start offset of each entry within data
	block *block // underlying block; ref-counted via setBlock/Close

	// prevOverlap stores the overlap of the previous key with the base key.
	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
	prevOverlap uint16
}
| |
// setBlock points the iterator at block b, releasing the previously held block
// and resetting all per-block state. The key/val/baseKey buffers keep their
// capacity so they can be reused across blocks without reallocating.
func (itr *blockIterator) setBlock(b *block) {
	// Decrement the ref for the old block. If the old block was compressed, we
	// might be able to reuse it.
	itr.block.decrRef()

	itr.block = b
	itr.err = nil
	itr.idx = 0
	// baseKey is re-read lazily from the new block's first entry in setIdx.
	itr.baseKey = itr.baseKey[:0]
	itr.prevOverlap = 0
	itr.key = itr.key[:0]
	itr.val = itr.val[:0]
	// Drop the index from the block. We don't need it anymore.
	itr.data = b.data[:b.entriesIndexStart]
	itr.entryOffsets = b.entryOffsets
}
| |
| // setIdx sets the iterator to the entry at index i and set it's key and value. |
| func (itr *blockIterator) setIdx(i int) { |
| itr.idx = i |
| if i >= len(itr.entryOffsets) || i < 0 { |
| itr.err = io.EOF |
| return |
| } |
| itr.err = nil |
| startOffset := int(itr.entryOffsets[i]) |
| |
| // Set base key. |
| if len(itr.baseKey) == 0 { |
| var baseHeader header |
| baseHeader.Decode(itr.data) |
| itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff] |
| } |
| |
| var endOffset int |
| // idx points to the last entry in the block. |
| if itr.idx+1 == len(itr.entryOffsets) { |
| endOffset = len(itr.data) |
| } else { |
| // idx point to some entry other than the last one in the block. |
| // EndOffset of the current entry is the start offset of the next entry. |
| endOffset = int(itr.entryOffsets[itr.idx+1]) |
| } |
| |
| entryData := itr.data[startOffset:endOffset] |
| var h header |
| h.Decode(entryData) |
| // Header contains the length of key overlap and difference compared to the base key. If the key |
| // before this one had the same or better key overlap, we can avoid copying that part into |
| // itr.key. But, if the overlap was lesser, we could copy over just that portion. |
| if h.overlap > itr.prevOverlap { |
| itr.key = append(itr.key[:itr.prevOverlap], itr.baseKey[itr.prevOverlap:h.overlap]...) |
| } |
| itr.prevOverlap = h.overlap |
| valueOff := headerSize + h.diff |
| diffKey := entryData[headerSize:valueOff] |
| itr.key = append(itr.key[:h.overlap], diffKey...) |
| itr.val = entryData[valueOff:] |
| } |
| |
| func (itr *blockIterator) Valid() bool { |
| return itr != nil && itr.err == nil |
| } |
| |
// Error returns the error, if any, recorded while positioning the iterator.
// io.EOF indicates the iterator moved past either end of the block.
func (itr *blockIterator) Error() error {
	return itr.err
}
| |
// Close releases the iterator's reference on its current block.
func (itr *blockIterator) Close() {
	itr.block.decrRef()
}
| |
// Whence values for blockIterator.seek. These never change at runtime, so
// declare them as constants rather than mutable package-level vars.
const (
	// origin searches the whole block from its start.
	origin = 0
	// current restricts the search to entries at or after the current index.
	current = 1
)
| |
| // seek brings us to the first block element that is >= input key. |
| func (itr *blockIterator) seek(key []byte, whence int) { |
| itr.err = nil |
| startIndex := 0 // This tells from which index we should start binary search. |
| |
| switch whence { |
| case origin: |
| // We don't need to do anything. startIndex is already at 0 |
| case current: |
| startIndex = itr.idx |
| } |
| |
| foundEntryIdx := sort.Search(len(itr.entryOffsets), func(idx int) bool { |
| // If idx is less than start index then just return false. |
| if idx < startIndex { |
| return false |
| } |
| itr.setIdx(idx) |
| return y.CompareKeys(itr.key, key) >= 0 |
| }) |
| itr.setIdx(foundEntryIdx) |
| } |
| |
| // seekToFirst brings us to the first element. |
| func (itr *blockIterator) seekToFirst() { |
| itr.setIdx(0) |
| } |
| |
| // seekToLast brings us to the last element. |
| func (itr *blockIterator) seekToLast() { |
| itr.setIdx(len(itr.entryOffsets) - 1) |
| } |
| |
// next advances to the following entry; past the end, err becomes io.EOF.
func (itr *blockIterator) next() {
	itr.setIdx(itr.idx + 1)
}
| |
// prev steps back to the preceding entry; before the start, err becomes io.EOF.
func (itr *blockIterator) prev() {
	itr.setIdx(itr.idx - 1)
}
| |
| // Iterator is an iterator for a Table. |
| type Iterator struct { |
| t *Table |
| bpos int |
| bi blockIterator |
| err error |
| |
| // Internally, Iterator is bidirectional. However, we only expose the |
| // unidirectional functionality for now. |
| opt int // Valid options are REVERSED and NOCACHE. |
| } |
| |
| // NewIterator returns a new iterator of the Table |
| func (t *Table) NewIterator(opt int) *Iterator { |
| t.IncrRef() // Important. |
| ti := &Iterator{t: t, opt: opt} |
| ti.next() |
| return ti |
| } |
| |
| // Close closes the iterator (and it must be called). |
| func (itr *Iterator) Close() error { |
| itr.bi.Close() |
| return itr.t.DecrRef() |
| } |
| |
| func (itr *Iterator) reset() { |
| itr.bpos = 0 |
| itr.err = nil |
| } |
| |
| // Valid follows the y.Iterator interface |
| func (itr *Iterator) Valid() bool { |
| return itr.err == nil |
| } |
| |
| func (itr *Iterator) useCache() bool { |
| return itr.opt&NOCACHE == 0 |
| } |
| |
| func (itr *Iterator) seekToFirst() { |
| numBlocks := itr.t.noOfBlocks |
| if numBlocks == 0 { |
| itr.err = io.EOF |
| return |
| } |
| itr.bpos = 0 |
| block, err := itr.t.block(itr.bpos, itr.useCache()) |
| if err != nil { |
| itr.err = err |
| return |
| } |
| itr.bi.setBlock(block) |
| itr.bi.seekToFirst() |
| itr.err = itr.bi.Error() |
| } |
| |
| func (itr *Iterator) seekToLast() { |
| numBlocks := itr.t.noOfBlocks |
| if numBlocks == 0 { |
| itr.err = io.EOF |
| return |
| } |
| itr.bpos = numBlocks - 1 |
| block, err := itr.t.block(itr.bpos, itr.useCache()) |
| if err != nil { |
| itr.err = err |
| return |
| } |
| itr.bi.setBlock(block) |
| itr.bi.seekToLast() |
| itr.err = itr.bi.Error() |
| } |
| |
// seekHelper loads block blockIdx and seeks within it to the first entry
// whose key is >= key. If the block has no such entry, itr.err is io.EOF.
func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
	itr.bpos = blockIdx
	block, err := itr.t.block(blockIdx, itr.useCache())
	if err != nil {
		itr.err = err
		return
	}
	itr.bi.setBlock(block)
	itr.bi.seek(key, origin)
	itr.err = itr.bi.Error()
}
| |
| // seekFrom brings us to a key that is >= input key. |
| func (itr *Iterator) seekFrom(key []byte, whence int) { |
| itr.err = nil |
| switch whence { |
| case origin: |
| itr.reset() |
| case current: |
| } |
| |
| idx := sort.Search(itr.t.noOfBlocks, func(idx int) bool { |
| ko := itr.t.blockOffsets()[idx] |
| return y.CompareKeys(ko.Key, key) > 0 |
| }) |
| if idx == 0 { |
| // The smallest key in our table is already strictly > key. We can return that. |
| // This is like a SeekToFirst. |
| itr.seekHelper(0, key) |
| return |
| } |
| |
| // block[idx].smallest is > key. |
| // Since idx>0, we know block[idx-1].smallest is <= key. |
| // There are two cases. |
| // 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first |
| // element of block[idx]. |
| // 2) Some element in block[idx-1] is >= key. We should go to that element. |
| itr.seekHelper(idx-1, key) |
| if itr.err == io.EOF { |
| // Case 1. Need to visit block[idx]. |
| if idx == itr.t.noOfBlocks { |
| // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table. |
| // There's nothing we can do. Valid() should return false as we seek to end of table. |
| return |
| } |
| // Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst. |
| itr.seekHelper(idx, key) |
| } |
| // Case 2: No need to do anything. We already did the seek in block[idx-1]. |
| } |
| |
| // seek will reset iterator and seek to >= key. |
| func (itr *Iterator) seek(key []byte) { |
| itr.seekFrom(key, origin) |
| } |
| |
| // seekForPrev will reset iterator and seek to <= key. |
| func (itr *Iterator) seekForPrev(key []byte) { |
| // TODO: Optimize this. We shouldn't have to take a Prev step. |
| itr.seekFrom(key, origin) |
| if !bytes.Equal(itr.Key(), key) { |
| itr.prev() |
| } |
| } |
| |
| func (itr *Iterator) next() { |
| itr.err = nil |
| |
| if itr.bpos >= itr.t.noOfBlocks { |
| itr.err = io.EOF |
| return |
| } |
| |
| if len(itr.bi.data) == 0 { |
| block, err := itr.t.block(itr.bpos, itr.useCache()) |
| if err != nil { |
| itr.err = err |
| return |
| } |
| itr.bi.setBlock(block) |
| itr.bi.seekToFirst() |
| itr.err = itr.bi.Error() |
| return |
| } |
| |
| itr.bi.next() |
| if !itr.bi.Valid() { |
| itr.bpos++ |
| itr.bi.data = nil |
| itr.next() |
| return |
| } |
| } |
| |
| func (itr *Iterator) prev() { |
| itr.err = nil |
| if itr.bpos < 0 { |
| itr.err = io.EOF |
| return |
| } |
| |
| if len(itr.bi.data) == 0 { |
| block, err := itr.t.block(itr.bpos, itr.useCache()) |
| if err != nil { |
| itr.err = err |
| return |
| } |
| itr.bi.setBlock(block) |
| itr.bi.seekToLast() |
| itr.err = itr.bi.Error() |
| return |
| } |
| |
| itr.bi.prev() |
| if !itr.bi.Valid() { |
| itr.bpos-- |
| itr.bi.data = nil |
| itr.prev() |
| return |
| } |
| } |
| |
| // Key follows the y.Iterator interface. |
| // Returns the key with timestamp. |
| func (itr *Iterator) Key() []byte { |
| return itr.bi.key |
| } |
| |
| // Value follows the y.Iterator interface |
| func (itr *Iterator) Value() (ret y.ValueStruct) { |
| ret.Decode(itr.bi.val) |
| return |
| } |
| |
| // ValueCopy copies the current value and returns it as decoded |
| // ValueStruct. |
| func (itr *Iterator) ValueCopy() (ret y.ValueStruct) { |
| dst := y.Copy(itr.bi.val) |
| ret.Decode(dst) |
| return |
| } |
| |
| // Next follows the y.Iterator interface |
| func (itr *Iterator) Next() { |
| if itr.opt&REVERSED == 0 { |
| itr.next() |
| } else { |
| itr.prev() |
| } |
| } |
| |
| // Rewind follows the y.Iterator interface |
| func (itr *Iterator) Rewind() { |
| if itr.opt&REVERSED == 0 { |
| itr.seekToFirst() |
| } else { |
| itr.seekToLast() |
| } |
| } |
| |
| // Seek follows the y.Iterator interface |
| func (itr *Iterator) Seek(key []byte) { |
| if itr.opt&REVERSED == 0 { |
| itr.seek(key) |
| } else { |
| itr.seekForPrev(key) |
| } |
| } |
| |
// Option bit flags accepted by NewIterator and NewConcatIterator. These are
// fixed flag values, so declare them as constants instead of mutable
// package-level vars.
const (
	// REVERSED walks keys in descending order.
	REVERSED int = 2
	// NOCACHE bypasses the block cache for reads done by the iterator.
	NOCACHE int = 4
)
| |
| // ConcatIterator concatenates the sequences defined by several iterators. (It only works with |
| // TableIterators, probably just because it's faster to not be so generic.) |
| type ConcatIterator struct { |
| idx int // Which iterator is active now. |
| cur *Iterator |
| iters []*Iterator // Corresponds to tables. |
| tables []*Table // Disregarding reversed, this is in ascending order. |
| options int // Valid options are REVERSED and NOCACHE. |
| } |
| |
| // NewConcatIterator creates a new concatenated iterator |
| func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator { |
| iters := make([]*Iterator, len(tbls)) |
| for i := 0; i < len(tbls); i++ { |
| // Increment the reference count. Since, we're not creating the iterator right now. |
| // Here, We'll hold the reference of the tables, till the lifecycle of the iterator. |
| tbls[i].IncrRef() |
| |
| // Save cycles by not initializing the iterators until needed. |
| // iters[i] = tbls[i].NewIterator(reversed) |
| } |
| return &ConcatIterator{ |
| options: opt, |
| iters: iters, |
| tables: tbls, |
| idx: -1, // Not really necessary because s.it.Valid()=false, but good to have. |
| } |
| } |
| |
| func (s *ConcatIterator) setIdx(idx int) { |
| s.idx = idx |
| if idx < 0 || idx >= len(s.iters) { |
| s.cur = nil |
| return |
| } |
| if s.iters[idx] == nil { |
| s.iters[idx] = s.tables[idx].NewIterator(s.options) |
| } |
| s.cur = s.iters[s.idx] |
| } |
| |
| // Rewind implements y.Interface |
| func (s *ConcatIterator) Rewind() { |
| if len(s.iters) == 0 { |
| return |
| } |
| if s.options&REVERSED == 0 { |
| s.setIdx(0) |
| } else { |
| s.setIdx(len(s.iters) - 1) |
| } |
| s.cur.Rewind() |
| } |
| |
| // Valid implements y.Interface |
| func (s *ConcatIterator) Valid() bool { |
| return s.cur != nil && s.cur.Valid() |
| } |
| |
| // Key implements y.Interface |
| func (s *ConcatIterator) Key() []byte { |
| return s.cur.Key() |
| } |
| |
| // Value implements y.Interface |
| func (s *ConcatIterator) Value() y.ValueStruct { |
| return s.cur.Value() |
| } |
| |
| // Seek brings us to element >= key if reversed is false. Otherwise, <= key. |
| func (s *ConcatIterator) Seek(key []byte) { |
| var idx int |
| if s.options&REVERSED == 0 { |
| idx = sort.Search(len(s.tables), func(i int) bool { |
| return y.CompareKeys(s.tables[i].Biggest(), key) >= 0 |
| }) |
| } else { |
| n := len(s.tables) |
| idx = n - 1 - sort.Search(n, func(i int) bool { |
| return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0 |
| }) |
| } |
| if idx >= len(s.tables) || idx < 0 { |
| s.setIdx(-1) |
| return |
| } |
| // For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the |
| // previous table cannot possibly contain key. |
| s.setIdx(idx) |
| s.cur.Seek(key) |
| } |
| |
| // Next advances our concat iterator. |
| func (s *ConcatIterator) Next() { |
| s.cur.Next() |
| if s.cur.Valid() { |
| // Nothing to do. Just stay with the current table. |
| return |
| } |
| for { // In case there are empty tables. |
| if s.options&REVERSED == 0 { |
| s.setIdx(s.idx + 1) |
| } else { |
| s.setIdx(s.idx - 1) |
| } |
| if s.cur == nil { |
| // End of list. Valid will become false. |
| return |
| } |
| s.cur.Rewind() |
| if s.cur.Valid() { |
| break |
| } |
| } |
| } |
| |
| // Close implements y.Interface. |
| func (s *ConcatIterator) Close() error { |
| for _, t := range s.tables { |
| // DeReference the tables while closing the iterator. |
| if err := t.DecrRef(); err != nil { |
| return err |
| } |
| } |
| for _, it := range s.iters { |
| if it == nil { |
| continue |
| } |
| if err := it.Close(); err != nil { |
| return errors.Wrap(err, "ConcatIterator") |
| } |
| } |
| return nil |
| } |