| /* |
| * |
| * Copyright 2023 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpcsync |
| |
| import ( |
| "context" |
| "sync" |
| "testing" |
| "time" |
| ) |
| |
| type testSubscriber struct { |
| onMsgCh chan int |
| } |
| |
| func newTestSubscriber(chSize int) *testSubscriber { |
| return &testSubscriber{onMsgCh: make(chan int, chSize)} |
| } |
| |
| func (ts *testSubscriber) OnMessage(msg any) { |
| select { |
| case ts.onMsgCh <- msg.(int): |
| default: |
| } |
| } |
| |
| func (s) TestPubSub_PublishNoMsg(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| pubsub := NewPubSub(ctx) |
| |
| ts := newTestSubscriber(1) |
| pubsub.Subscribe(ts) |
| |
| select { |
| case <-ts.onMsgCh: |
| t.Fatal("Subscriber callback invoked when no message was published") |
| case <-time.After(defaultTestShortTimeout): |
| } |
| } |
| |
| func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| pubsub := NewPubSub(ctx) |
| |
| const numPublished = 10 |
| |
| ts1 := newTestSubscriber(numPublished) |
| pubsub.Subscribe(ts1) |
| |
| var wg sync.WaitGroup |
| wg.Add(2) |
| // Publish ten messages on the pubsub and ensure that they are received in order by the subscriber. |
| go func() { |
| for i := 0; i < numPublished; i++ { |
| pubsub.Publish(i) |
| } |
| wg.Done() |
| }() |
| |
| go func() { |
| defer wg.Done() |
| for i := 0; i < numPublished; i++ { |
| select { |
| case m := <-ts1.onMsgCh: |
| if m != i { |
| t.Errorf("Received unexpected message: %q; want: %q", m, i) |
| return |
| } |
| case <-time.After(defaultTestTimeout): |
| t.Error("Timeout when expecting the onMessage() callback to be invoked") |
| return |
| } |
| } |
| }() |
| wg.Wait() |
| if t.Failed() { |
| t.FailNow() |
| } |
| |
| // Register another subscriber and ensure that it receives the last published message. |
| ts2 := newTestSubscriber(numPublished) |
| pubsub.Subscribe(ts2) |
| |
| select { |
| case m := <-ts2.onMsgCh: |
| if m != numPublished-1 { |
| t.Fatalf("Received unexpected message: %q; want: %q", m, numPublished-1) |
| } |
| case <-time.After(defaultTestShortTimeout): |
| t.Fatal("Timeout when expecting the onMessage() callback to be invoked") |
| } |
| |
| wg.Add(3) |
| // Publish ten messages on the pubsub and ensure that they are received in order by the subscribers. |
| go func() { |
| for i := 0; i < numPublished; i++ { |
| pubsub.Publish(i) |
| } |
| wg.Done() |
| }() |
| go func() { |
| defer wg.Done() |
| for i := 0; i < numPublished; i++ { |
| select { |
| case m := <-ts1.onMsgCh: |
| if m != i { |
| t.Errorf("Received unexpected message: %q; want: %q", m, i) |
| return |
| } |
| case <-time.After(defaultTestTimeout): |
| t.Error("Timeout when expecting the onMessage() callback to be invoked") |
| return |
| } |
| } |
| |
| }() |
| go func() { |
| defer wg.Done() |
| for i := 0; i < numPublished; i++ { |
| select { |
| case m := <-ts2.onMsgCh: |
| if m != i { |
| t.Errorf("Received unexpected message: %q; want: %q", m, i) |
| return |
| } |
| case <-time.After(defaultTestTimeout): |
| t.Error("Timeout when expecting the onMessage() callback to be invoked") |
| return |
| } |
| } |
| }() |
| wg.Wait() |
| if t.Failed() { |
| t.FailNow() |
| } |
| |
| cancel() |
| <-pubsub.Done() |
| |
| go func() { |
| pubsub.Publish(99) |
| }() |
| // Ensure that the subscriber callback is not invoked as instantiated |
| // pubsub has already closed. |
| select { |
| case <-ts1.onMsgCh: |
| t.Fatal("The callback was invoked after pubsub being stopped") |
| case <-ts2.onMsgCh: |
| t.Fatal("The callback was invoked after pubsub being stopped") |
| case <-time.After(defaultTestShortTimeout): |
| } |
| } |
| |
| func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| pubsub := NewPubSub(ctx) |
| |
| const numPublished = 3 |
| for i := 0; i < numPublished; i++ { |
| pubsub.Publish(i) |
| } |
| |
| ts := newTestSubscriber(numPublished) |
| pubsub.Subscribe(ts) |
| |
| // Ensure that the subscriber callback is invoked with a previously |
| // published message. |
| select { |
| case d := <-ts.onMsgCh: |
| if d != numPublished-1 { |
| t.Fatalf("Unexpected message received: %q; %q", d, numPublished-1) |
| } |
| |
| case <-time.After(defaultTestShortTimeout): |
| t.Fatal("Timeout when expecting the onMessage() callback to be invoked") |
| } |
| } |