| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package test |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "testing" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/channelz" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/internal/testutils" |
| testgrpc "google.golang.org/grpc/interop/grpc_testing" |
| testpb "google.golang.org/grpc/interop/grpc_testing" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| "google.golang.org/grpc/stats" |
| "google.golang.org/grpc/status" |
| ) |
| |
| // TestClientConnClose_WithPendingRPC tests the scenario where the channel has |
| // not yet received any update from the name resolver and hence RPCs are |
| // blocking. The test verifies that closing the ClientConn unblocks the RPC with |
| // the expected error code. |
| func (s) TestClientConnClose_WithPendingRPC(t *testing.T) { |
| r := manual.NewBuilderWithScheme("whatever") |
| cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) |
| if err != nil { |
| t.Fatalf("grpc.NewClient() failed: %v", err) |
| } |
| client := testgrpc.NewTestServiceClient(cc) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| doneErrCh := make(chan error, 1) |
| go func() { |
| // This RPC would block until the ClientConn is closed, because the |
| // resolver has not provided its first update yet. |
| _, err := client.EmptyCall(ctx, &testpb.Empty{}) |
| if status.Code(err) != codes.Canceled || !strings.Contains(err.Error(), "client connection is closing") { |
| doneErrCh <- fmt.Errorf("EmptyCall() = %v, want %s", err, codes.Canceled) |
| } |
| doneErrCh <- nil |
| }() |
| |
| // Make sure that there is one pending RPC on the ClientConn before attempting |
| // to close it. If we don't do this, cc.Close() can happen before the above |
| // goroutine gets to make the RPC. |
| for { |
| if err := ctx.Err(); err != nil { |
| t.Fatal(err) |
| } |
| tcs, _ := channelz.GetTopChannels(0, 0) |
| if len(tcs) != 1 { |
| t.Fatalf("there should only be one top channel, not %d", len(tcs)) |
| } |
| started := tcs[0].ChannelMetrics.CallsStarted.Load() |
| completed := tcs[0].ChannelMetrics.CallsSucceeded.Load() + tcs[0].ChannelMetrics.CallsFailed.Load() |
| if (started - completed) == 1 { |
| break |
| } |
| time.Sleep(defaultTestShortTimeout) |
| } |
| cc.Close() |
| if err := <-doneErrCh; err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| type testStatsHandler struct { |
| nameResolutionDelayed bool |
| } |
| |
| // TagRPC is called when an RPC is initiated and allows adding metadata to the |
| // context. It checks if the RPC experienced a name resolution delay and |
| // updates the handler's state. |
| func (h *testStatsHandler) TagRPC(ctx context.Context, rpcInfo *stats.RPCTagInfo) context.Context { |
| h.nameResolutionDelayed = rpcInfo.NameResolutionDelay |
| return ctx |
| } |
| |
| // This method is required to satisfy the stats.Handler interface. |
| func (h *testStatsHandler) HandleRPC(_ context.Context, _ stats.RPCStats) {} |
| |
| // TagConn exists to satisfy stats.Handler. |
| func (h *testStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { |
| return ctx |
| } |
| |
| // HandleConn exists to satisfy stats.Handler. |
| func (h *testStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {} |
| |
| // TestClientConnRPC_WithoutNameResolutionDelay verify that if the resolution |
| // has already happened once before at the time of making RPC, the name |
| // resolution flag is not set indicating there was no delay in name resolution. |
| func (s) TestClientConnRPC_WithoutNameResolutionDelay(t *testing.T) { |
| statsHandler := &testStatsHandler{} |
| ss := &stubserver.StubServer{ |
| EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { |
| return &testpb.Empty{}, nil |
| }, |
| } |
| if err := ss.Start(nil, grpc.WithStatsHandler(statsHandler)); err != nil { |
| t.Fatalf("Failed to start StubServer: %v", err) |
| } |
| defer ss.Stop() |
| |
| rb := manual.NewBuilderWithScheme("instant") |
| rb.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) |
| cc := ss.CC |
| defer cc.Close() |
| |
| cc.Connect() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| testutils.AwaitState(ctx, t, cc, connectivity.Ready) |
| client := testgrpc.NewTestServiceClient(cc) |
| // Verify that the RPC succeeds. |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("First RPC failed unexpectedly: %v", err) |
| } |
| // verifying that RPC was not blocked on resolver indicating there was no |
| // delay in name resolution. |
| if statsHandler.nameResolutionDelayed { |
| t.Fatalf("statsHandler.nameResolutionDelayed = %v; want false", statsHandler.nameResolutionDelayed) |
| } |
| } |
| |
| // TestStatsHandlerDetectsResolutionDelay verifies that if this is the |
| // first time resolution is happening at the time of making RPC, |
| // nameResolutionDelayed flag is set indicating there was a delay in name |
| // resolution waiting for resolver to return addresses. |
| func (s) TestClientConnRPC_WithNameResolutionDelay(t *testing.T) { |
| resolutionWait := grpcsync.NewEvent() |
| prevHook := internal.NewStreamWaitingForResolver |
| internal.NewStreamWaitingForResolver = func() { resolutionWait.Fire() } |
| defer func() { internal.NewStreamWaitingForResolver = prevHook }() |
| |
| ss := &stubserver.StubServer{ |
| EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { |
| return &testpb.Empty{}, nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Failed to start StubServer: %v", err) |
| } |
| defer ss.Stop() |
| |
| statsHandler := &testStatsHandler{} |
| rb := manual.NewBuilderWithScheme("delayed") |
| cc, err := grpc.NewClient(rb.Scheme()+":///test.server", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithResolvers(rb), |
| grpc.WithStatsHandler(statsHandler), |
| ) |
| if err != nil { |
| t.Fatalf("grpc.NewClient() failed: %v", err) |
| } |
| defer cc.Close() |
| go func() { |
| <-resolutionWait.Done() |
| rb.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) |
| }() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| client := testgrpc.NewTestServiceClient(cc) |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall RPC failed: %v", err) |
| } |
| if !statsHandler.nameResolutionDelayed { |
| t.Fatalf("statsHandler.nameResolutionDelayed = %v; want true", statsHandler.nameResolutionDelayed) |
| } |
| } |