blob: c29622439125a3938aed39fbb554f0a61f65f61e [file] [log] [blame]
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"context"
"fmt"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/dgraph-io/badger/v3/pb"
)
// This test will result in deadlock for commits before this.
// Exiting this test gracefully will be the proof that the
// publisher is no longer stuck in deadlock.
func TestPublisherDeadlock(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
var subWg sync.WaitGroup
subWg.Add(1)
var firstUpdate sync.WaitGroup
firstUpdate.Add(1)
var subDone sync.WaitGroup
subDone.Add(1)
go func() {
subWg.Done()
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
firstUpdate.Done()
time.Sleep(time.Second * 20)
return errors.New("error returned")
}, []pb.Match{match})
require.Error(t, err, errors.New("error returned"))
subDone.Done()
}()
subWg.Wait()
go func() {
err := db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", 0)), []byte(fmt.Sprintf("value%d", 0)))
return txn.SetEntry(e)
})
require.NoError(t, err)
} ()
firstUpdate.Wait()
req := int64(0)
for i := 1; i < 1110; i++ {
time.Sleep(time.Millisecond * 10)
go func(i int) {
err := db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
return txn.SetEntry(e)
})
require.NoError(t, err)
atomic.AddInt64(&req, 1)
}(i)
}
for {
if atomic.LoadInt64(&req) == 1109 {
break
}
time.Sleep(time.Second)
}
subDone.Wait()
})
}
func TestPublisherOrdering(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
order := []string{}
var wg sync.WaitGroup
wg.Add(1)
var subWg sync.WaitGroup
subWg.Add(1)
go func() {
subWg.Done()
updates := 0
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
updates += len(kvs.GetKv())
for _, kv := range kvs.GetKv() {
order = append(order, string(kv.Value))
}
if updates == 5 {
wg.Done()
}
return nil
}, []pb.Match{match})
if err != nil {
require.Equal(t, err.Error(), context.Canceled.Error())
}
}()
subWg.Wait()
for i := 0; i < 5; i++ {
db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
return txn.SetEntry(e)
})
}
wg.Wait()
for i := 0; i < 5; i++ {
require.Equal(t, fmt.Sprintf("value%d", i), order[i])
}
})
}
func TestMultiplePrefix(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
var wg sync.WaitGroup
wg.Add(1)
var subWg sync.WaitGroup
subWg.Add(1)
go func() {
subWg.Done()
updates := 0
match1 := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
match2 := pb.Match{Prefix: []byte("hel"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
updates += len(kvs.GetKv())
for _, kv := range kvs.GetKv() {
if string(kv.Key) == "key" {
require.Equal(t, string(kv.Value), "value")
} else {
require.Equal(t, string(kv.Value), "badger")
}
}
if updates == 2 {
wg.Done()
}
return nil
}, []pb.Match{match1, match2})
if err != nil {
require.Equal(t, err.Error(), context.Canceled.Error())
}
}()
subWg.Wait()
db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("key"), []byte("value")))
})
db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("hello"), []byte("badger")))
})
wg.Wait()
})
}