| /* |
| * |
| * Copyright 2024 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| // Package delegatingresolver implements a resolver capable of resolving both |
| // target URIs and proxy addresses. |
| package delegatingresolver |
| |
| import ( |
| "fmt" |
| "net" |
| "net/http" |
| "net/url" |
| "sync" |
| |
| "google.golang.org/grpc/grpclog" |
| "google.golang.org/grpc/internal/envconfig" |
| "google.golang.org/grpc/internal/proxyattributes" |
| "google.golang.org/grpc/internal/transport" |
| "google.golang.org/grpc/internal/transport/networktype" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| ) |
| |
| var ( |
| logger = grpclog.Component("delegating-resolver") |
| // HTTPSProxyFromEnvironment will be overwritten in the tests |
| HTTPSProxyFromEnvironment = http.ProxyFromEnvironment |
| ) |
| |
| const defaultPort = "443" |
| |
| // delegatingResolver manages both target URI and proxy address resolution by |
| // delegating these tasks to separate child resolvers. Essentially, it acts as |
| // an intermediary between the gRPC ClientConn and the child resolvers. |
| // |
| // It implements the [resolver.Resolver] interface. |
| type delegatingResolver struct { |
| target resolver.Target // parsed target URI to be resolved |
| cc resolver.ClientConn // gRPC ClientConn |
| proxyURL *url.URL // proxy URL, derived from proxy environment and target |
| |
| // We do not hold both mu and childMu in the same goroutine. Avoid holding |
| // both locks when calling into the child, as the child resolver may |
| // synchronously callback into the channel. |
| mu sync.Mutex // protects all the fields below |
| targetResolverState *resolver.State // state of the target resolver |
| proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured |
| |
| // childMu serializes calls into child resolvers. It also protects access to |
| // the following fields. |
| childMu sync.Mutex |
| targetResolver resolver.Resolver // resolver for the target URI, based on its scheme |
| proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured |
| } |
| |
| // nopResolver is a resolver that does nothing. |
| type nopResolver struct{} |
| |
| func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {} |
| |
| func (nopResolver) Close() {} |
| |
| // proxyURLForTarget determines the proxy URL for the given address based on the |
| // environment. It can return the following: |
| // - nil URL, nil error: No proxy is configured or the address is excluded |
| // using the `NO_PROXY` environment variable or if req.URL.Host is |
| // "localhost" (with or without // a port number) |
| // - nil URL, non-nil error: An error occurred while retrieving the proxy URL. |
| // - non-nil URL, nil error: A proxy is configured, and the proxy URL was |
| // retrieved successfully without any errors. |
| func proxyURLForTarget(address string) (*url.URL, error) { |
| req := &http.Request{URL: &url.URL{ |
| Scheme: "https", |
| Host: address, |
| }} |
| return HTTPSProxyFromEnvironment(req) |
| } |
| |
| // New creates a new delegating resolver that can create up to two child |
| // resolvers: |
| // - one to resolve the proxy address specified using the supported |
| // environment variables. This uses the registered resolver for the "dns" |
| // scheme. It is lazily built when a target resolver update contains at least |
| // one TCP address. |
| // - one to resolve the target URI using the resolver specified by the scheme |
| // in the target URI or specified by the user using the WithResolvers dial |
| // option. As a special case, if the target URI's scheme is "dns" and a |
| // proxy is specified using the supported environment variables, the target |
| // URI's path portion is used as the resolved address unless target |
| // resolution is enabled using the dial option. |
| func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) { |
| r := &delegatingResolver{ |
| target: target, |
| cc: cc, |
| proxyResolver: nopResolver{}, |
| targetResolver: nopResolver{}, |
| } |
| |
| addr := target.Endpoint() |
| var err error |
| if target.URL.Scheme == "dns" && !targetResolutionEnabled && envconfig.EnableDefaultPortForProxyTarget { |
| addr, err = parseTarget(addr) |
| if err != nil { |
| return nil, fmt.Errorf("delegating_resolver: invalid target address %q: %v", target.Endpoint(), err) |
| } |
| } |
| |
| r.proxyURL, err = proxyURLForTarget(addr) |
| if err != nil { |
| return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %q: %v", target, err) |
| } |
| |
| // proxy is not configured or proxy address excluded using `NO_PROXY` env |
| // var, so only target resolver is used. |
| if r.proxyURL == nil { |
| return targetResolverBuilder.Build(target, cc, opts) |
| } |
| |
| if logger.V(2) { |
| logger.Infof("Proxy URL detected : %s", r.proxyURL) |
| } |
| |
| // Resolver updates from one child may trigger calls into the other. Block |
| // updates until the children are initialized. |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| // When the scheme is 'dns' and target resolution on client is not enabled, |
| // resolution should be handled by the proxy, not the client. Therefore, we |
| // bypass the target resolver and store the unresolved target address. |
| if target.URL.Scheme == "dns" && !targetResolutionEnabled { |
| r.targetResolverState = &resolver.State{ |
| Addresses: []resolver.Address{{Addr: addr}}, |
| Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: addr}}}}, |
| } |
| r.updateTargetResolverState(*r.targetResolverState) |
| return r, nil |
| } |
| wcc := &wrappingClientConn{ |
| stateListener: r.updateTargetResolverState, |
| parent: r, |
| } |
| if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil { |
| return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err) |
| } |
| return r, nil |
| } |
| |
| // proxyURIResolver creates a resolver for resolving proxy URIs using the "dns" |
| // scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds |
| // a resolver with a wrappingClientConn to capture resolved addresses. |
| func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) { |
| proxyBuilder := resolver.Get("dns") |
| if proxyBuilder == nil { |
| panic("delegating_resolver: resolver for proxy not found for scheme dns") |
| } |
| url := *r.proxyURL |
| url.Scheme = "dns" |
| url.Path = "/" + r.proxyURL.Host |
| url.Host = "" // Clear the Host field to conform to the "dns:///" format |
| |
| proxyTarget := resolver.Target{URL: url} |
| wcc := &wrappingClientConn{ |
| stateListener: r.updateProxyResolverState, |
| parent: r, |
| } |
| return proxyBuilder.Build(proxyTarget, wcc, opts) |
| } |
| |
| func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) { |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| r.targetResolver.ResolveNow(o) |
| r.proxyResolver.ResolveNow(o) |
| } |
| |
| func (r *delegatingResolver) Close() { |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| r.targetResolver.Close() |
| r.targetResolver = nil |
| |
| r.proxyResolver.Close() |
| r.proxyResolver = nil |
| } |
| |
| func needsProxyResolver(state *resolver.State) bool { |
| for _, addr := range state.Addresses { |
| if !skipProxy(addr) { |
| return true |
| } |
| } |
| for _, endpoint := range state.Endpoints { |
| for _, addr := range endpoint.Addresses { |
| if !skipProxy(addr) { |
| return true |
| } |
| } |
| } |
| return false |
| } |
| |
| // parseTarget takes a target string and ensures it is a valid "host:port" target. |
| // |
| // It does the following: |
| // 1. If the target already has a port (e.g., "host:port", "[ipv6]:port"), |
| // it is returned as is. |
| // 2. If the host part is empty (e.g., ":80"), it defaults to "localhost", |
| // returning "localhost:80". |
| // 3. If the target is missing a port (e.g., "host", "ipv6"), the defaultPort |
| // is added. |
| // |
| // An error is returned for empty targets or targets with a trailing colon |
| // but no port (e.g., "host:"). |
| func parseTarget(target string) (string, error) { |
| if target == "" { |
| return "", fmt.Errorf("missing address") |
| } |
| |
| host, port, err := net.SplitHostPort(target) |
| if err != nil { |
| // If SplitHostPort fails, it's likely because the port is missing. |
| // We append the default port and return the result. |
| return net.JoinHostPort(target, defaultPort), nil |
| } |
| |
| // If SplitHostPort succeeds, we check for edge cases. |
| if port == "" { |
| // A success with an empty port means the target had a trailing colon, |
| // e.g., "host:", which is an error. |
| return "", fmt.Errorf("missing port after port-separator colon") |
| } |
| if host == "" { |
| // A success with an empty host means the target was like ":80". |
| // We default the host to "localhost". |
| host = "localhost" |
| } |
| return net.JoinHostPort(host, port), nil |
| } |
| |
| func skipProxy(address resolver.Address) bool { |
| // Avoid proxy when network is not tcp. |
| networkType, ok := networktype.Get(address) |
| if !ok { |
| networkType, _ = transport.ParseDialTarget(address.Addr) |
| } |
| if networkType != "tcp" { |
| return true |
| } |
| |
| req := &http.Request{URL: &url.URL{ |
| Scheme: "https", |
| Host: address.Addr, |
| }} |
| // Avoid proxy when address included in `NO_PROXY` environment variable or |
| // fails to get the proxy address. |
| url, err := HTTPSProxyFromEnvironment(req) |
| if err != nil || url == nil { |
| return true |
| } |
| return false |
| } |
| |
| // updateClientConnStateLocked constructs a combined list of addresses by |
| // pairing each proxy address with every target address of type TCP. For each |
| // pair, it creates a new [resolver.Address] using the proxy address and |
| // attaches the corresponding target address and user info as attributes. Target |
| // addresses that are not of type TCP are appended to the list as-is. The |
| // function returns nil if either resolver has not yet provided an update, and |
| // returns the result of ClientConn.UpdateState once both resolvers have |
| // provided at least one update. |
| func (r *delegatingResolver) updateClientConnStateLocked() error { |
| if r.targetResolverState == nil || r.proxyAddrs == nil { |
| return nil |
| } |
| |
| // If multiple resolved proxy addresses are present, we send only the |
| // unresolved proxy host and let net.Dial handle the proxy host name |
| // resolution when creating the transport. Sending all resolved addresses |
| // would increase the number of addresses passed to the ClientConn and |
| // subsequently to load balancing (LB) policies like Round Robin, leading |
| // to additional TCP connections. However, if there's only one resolved |
| // proxy address, we send it directly, as it doesn't affect the address |
| // count returned by the target resolver and the address count sent to the |
| // ClientConn. |
| var proxyAddr resolver.Address |
| if len(r.proxyAddrs) == 1 { |
| proxyAddr = r.proxyAddrs[0] |
| } else { |
| proxyAddr = resolver.Address{Addr: r.proxyURL.Host} |
| } |
| var addresses []resolver.Address |
| for _, targetAddr := range (*r.targetResolverState).Addresses { |
| if skipProxy(targetAddr) { |
| addresses = append(addresses, targetAddr) |
| continue |
| } |
| addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{ |
| User: r.proxyURL.User, |
| ConnectAddr: targetAddr.Addr, |
| })) |
| } |
| |
| // For each target endpoint, construct a new [resolver.Endpoint] that |
| // includes all addresses from all proxy endpoints and the addresses from |
| // that target endpoint, preserving the number of target endpoints. |
| var endpoints []resolver.Endpoint |
| for _, endpt := range (*r.targetResolverState).Endpoints { |
| var addrs []resolver.Address |
| for _, targetAddr := range endpt.Addresses { |
| // Avoid proxy when network is not tcp. |
| if skipProxy(targetAddr) { |
| addrs = append(addrs, targetAddr) |
| continue |
| } |
| for _, proxyAddr := range r.proxyAddrs { |
| addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{ |
| User: r.proxyURL.User, |
| ConnectAddr: targetAddr.Addr, |
| })) |
| } |
| } |
| endpoints = append(endpoints, resolver.Endpoint{Addresses: addrs}) |
| } |
| // Use the targetResolverState for its service config and attributes |
| // contents. The state update is only sent after both the target and proxy |
| // resolvers have sent their updates, and curState has been updated with the |
| // combined addresses. |
| curState := *r.targetResolverState |
| curState.Addresses = addresses |
| curState.Endpoints = endpoints |
| return r.cc.UpdateState(curState) |
| } |
| |
| // updateProxyResolverState updates the proxy resolver state by storing proxy |
| // addresses and endpoints, marking the resolver as ready, and triggering a |
| // state update if both proxy and target resolvers are ready. If the ClientConn |
| // returns a non-nil error, it calls `ResolveNow()` on the target resolver. It |
| // is a StateListener function of wrappingClientConn passed to the proxy |
| // resolver. |
| func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| if logger.V(2) { |
| logger.Infof("Addresses received from proxy resolver: %s", state.Addresses) |
| } |
| if len(state.Endpoints) > 0 { |
| // We expect exactly one address per endpoint because the proxy resolver |
| // uses "dns" resolution. |
| r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints)) |
| for _, endpoint := range state.Endpoints { |
| r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...) |
| } |
| } else if state.Addresses != nil { |
| r.proxyAddrs = state.Addresses |
| } else { |
| r.proxyAddrs = []resolver.Address{} // ensure proxyAddrs is non-nil to indicate an update has been received |
| } |
| err := r.updateClientConnStateLocked() |
| // Another possible approach was to block until updates are received from |
| // both resolvers. But this is not used because calling `New()` triggers |
| // `Build()` for the first resolver, which calls `UpdateState()`. And the |
| // second resolver hasn't sent an update yet, so it would cause `New()` to |
| // block indefinitely. |
| if err != nil { |
| go func() { |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| if r.targetResolver != nil { |
| r.targetResolver.ResolveNow(resolver.ResolveNowOptions{}) |
| } |
| }() |
| } |
| return err |
| } |
| |
| // updateTargetResolverState is the StateListener function provided to the |
| // target resolver via wrappingClientConn. It updates the resolver state and |
| // marks the target resolver as ready. If the update includes at least one TCP |
| // address and the proxy resolver has not yet been constructed, it initializes |
| // the proxy resolver. A combined state update is triggered once both resolvers |
| // are ready. If all addresses are non-TCP, it proceeds without waiting for the |
| // proxy resolver. If ClientConn.UpdateState returns a non-nil error, |
| // ResolveNow() is called on the proxy resolver. |
| func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| |
| if logger.V(2) { |
| logger.Infof("Addresses received from target resolver: %v", state.Addresses) |
| } |
| r.targetResolverState = &state |
| // If all addresses returned by the target resolver have a non-TCP network |
| // type, or are listed in the `NO_PROXY` environment variable, do not wait |
| // for proxy update. |
| if !needsProxyResolver(r.targetResolverState) { |
| return r.cc.UpdateState(*r.targetResolverState) |
| } |
| |
| // The proxy resolver may be rebuilt multiple times, specifically each time |
| // the target resolver sends an update, even if the target resolver is built |
| // successfully but building the proxy resolver fails. |
| if len(r.proxyAddrs) == 0 { |
| go func() { |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| if _, ok := r.proxyResolver.(nopResolver); !ok { |
| return |
| } |
| proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{}) |
| if err != nil { |
| r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err)) |
| return |
| } |
| r.proxyResolver = proxyResolver |
| }() |
| } |
| |
| err := r.updateClientConnStateLocked() |
| if err != nil { |
| go func() { |
| r.childMu.Lock() |
| defer r.childMu.Unlock() |
| if r.proxyResolver != nil { |
| r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{}) |
| } |
| }() |
| } |
| return nil |
| } |
| |
| // wrappingClientConn serves as an intermediary between the parent ClientConn |
| // and the child resolvers created here. It implements the resolver.ClientConn |
| // interface and is passed in that capacity to the child resolvers. |
| type wrappingClientConn struct { |
| // Callback to deliver resolver state updates |
| stateListener func(state resolver.State) error |
| parent *delegatingResolver |
| } |
| |
| // UpdateState receives resolver state updates and forwards them to the |
| // appropriate listener function (either for the proxy or target resolver). |
| func (wcc *wrappingClientConn) UpdateState(state resolver.State) error { |
| return wcc.stateListener(state) |
| } |
| |
| // ReportError intercepts errors from the child resolvers and passes them to |
| // ClientConn. |
| func (wcc *wrappingClientConn) ReportError(err error) { |
| wcc.parent.cc.ReportError(err) |
| } |
| |
| // NewAddress intercepts the new resolved address from the child resolvers and |
| // passes them to ClientConn. |
| func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) { |
| wcc.UpdateState(resolver.State{Addresses: addrs}) |
| } |
| |
| // ParseServiceConfig parses the provided service config and returns an object |
| // that provides the parsed config. |
| func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult { |
| return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON) |
| } |