Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support http->http in gateway #4605

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions gateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,37 @@ type (
Upstreams []Upstream
}

// HttpClientConf is the configuration for an HTTP client.
HttpClientConf struct {
Target string
Prefix string `json:",optional"`
Timeout int64 `json:",default=3000"`
}

// RouteMapping is a mapping between a gateway route and an upstream rpc method.
RouteMapping struct {
// Method is the HTTP method, like GET, POST, PUT, DELETE.
Method string
// Path is the HTTP path.
Path string
// RpcPath is the gRPC rpc method, with format of package.service/method
RpcPath string
// RpcPath is the gRPC rpc method, with format of package.service/method, optional.
// If the mapping is for HTTP, it's not necessary.
RpcPath string `json:",optional"`
}

// Upstream is the configuration for an upstream.
Upstream struct {
// Name is the name of the upstream.
Name string `json:",optional"`
// Grpc is the target of the upstream.
Grpc zrpc.RpcClientConf
Grpc *zrpc.RpcClientConf `json:",optional"`
// Http is the target of the upstream.
Http *HttpClientConf `json:",optional=!grpc"`
// ProtoSets is the file list of proto set, like [hello.pb].
// if your proto file import another proto file, you need to write multi-file slice,
// like [hello.pb, common.pb].
ProtoSets []string `json:",optional"`
// Mappings is the mapping between gateway routes and Upstream rpc methods.
// Mappings is the mapping between gateway routes and Upstream methods.
// Keep it blank if annotations are added in rpc methods.
Mappings []RouteMapping `json:",optional"`
}
Expand Down
220 changes: 159 additions & 61 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/fullstorydev/grpcurl"
"github.com/golang/protobuf/jsonpb"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/zeromicro/go-zero/core/logc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mr"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/gateway/internal"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/rest/httpc"
"github.com/zeromicro/go-zero/rest/httpx"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc/codes"
)

const defaultHttpScheme = "http"

type (
// Server is a gateway server.
Server struct {
Expand Down Expand Up @@ -83,52 +90,11 @@
source <- up
}
}, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) {
var cli zrpc.Client
if s.dialer != nil {
cli = s.dialer(up.Grpc)
} else {
cli = zrpc.MustNewClient(up.Grpc)
}
s.conns = append(s.conns, cli)

source, err := s.createDescriptorSource(cli, up)
if err != nil {
cancel(fmt.Errorf("%s: %w", up.Name, err))
return
}

methods, err := internal.GetMethods(source)
if err != nil {
cancel(fmt.Errorf("%s: %w", up.Name, err))
return
}

resolver := grpcurl.AnyResolverFromDescriptorSource(source)
for _, m := range methods {
if len(m.HttpMethod) > 0 && len(m.HttpPath) > 0 {
writer.Write(rest.Route{
Method: m.HttpMethod,
Path: m.HttpPath,
Handler: s.buildHandler(source, resolver, cli, m.RpcPath),
})
}
}

methodSet := make(map[string]struct{})
for _, m := range methods {
methodSet[m.RpcPath] = struct{}{}
}
for _, m := range up.Mappings {
if _, ok := methodSet[m.RpcPath]; !ok {
cancel(fmt.Errorf("%s: rpc method %s not found", up.Name, m.RpcPath))
return
}

writer.Write(rest.Route{
Method: strings.ToUpper(m.Method),
Path: m.Path,
Handler: s.buildHandler(source, resolver, cli, m.RpcPath),
})
// up.Grpc and up.Http are exclusive
if up.Grpc != nil {
s.buildGrpcRoute(up, writer, cancel)
} else if up.Http != nil {
s.buildHttpRoute(up, writer)
}
}, func(pipe <-chan rest.Route, cancel func(error)) {
for route := range pipe {
Expand All @@ -137,7 +103,7 @@
})
}

func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,
func (s *Server) buildGrpcHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,
cli zrpc.Client, rpcPath string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
parser, err := internal.NewRequestParser(r, resolver)
Expand All @@ -160,31 +126,119 @@
}
}

func (s *Server) createDescriptorSource(cli zrpc.Client, up Upstream) (grpcurl.DescriptorSource, error) {
var source grpcurl.DescriptorSource
var err error
func (s *Server) buildGrpcRoute(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) {
var cli zrpc.Client
if s.dialer != nil {
cli = s.dialer(*up.Grpc)
} else {
cli = zrpc.MustNewClient(*up.Grpc)
}

Check warning on line 135 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L134-L135

Added lines #L134 - L135 were not covered by tests
s.conns = append(s.conns, cli)

if len(up.ProtoSets) > 0 {
source, err = grpcurl.DescriptorSourceFromProtoSets(up.ProtoSets...)
source, err := createDescriptorSource(cli, up)
if err != nil {
cancel(fmt.Errorf("%s: %w", up.Name, err))
return
}

Check warning on line 142 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L140-L142

Added lines #L140 - L142 were not covered by tests

methods, err := internal.GetMethods(source)
if err != nil {
cancel(fmt.Errorf("%s: %w", up.Name, err))
return
}

Check warning on line 148 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L146-L148

Added lines #L146 - L148 were not covered by tests

resolver := grpcurl.AnyResolverFromDescriptorSource(source)
for _, m := range methods {
if len(m.HttpMethod) > 0 && len(m.HttpPath) > 0 {
writer.Write(rest.Route{
Method: m.HttpMethod,
Path: m.HttpPath,
Handler: s.buildGrpcHandler(source, resolver, cli, m.RpcPath),
})
}

Check warning on line 158 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L153-L158

Added lines #L153 - L158 were not covered by tests
}

methodSet := make(map[string]struct{})
for _, m := range methods {
methodSet[m.RpcPath] = struct{}{}
}
for _, m := range up.Mappings {
if _, ok := methodSet[m.RpcPath]; !ok {
cancel(fmt.Errorf("%s: rpc method %s not found", up.Name, m.RpcPath))
return
}

Check warning on line 169 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L167-L169

Added lines #L167 - L169 were not covered by tests

writer.Write(rest.Route{
Method: strings.ToUpper(m.Method),
Path: m.Path,
Handler: s.buildGrpcHandler(source, resolver, cli, m.RpcPath),
})
}
}

func (s *Server) buildHttpHandler(target *HttpClientConf) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set(httpx.ContentType, httpx.JsonContentType)
req, err := buildRequestWithNewTarget(r, target)
if err != nil {
return nil, err
httpx.ErrorCtx(r.Context(), w, err)
return
}

if target.Timeout > 0 {
timeout := time.Duration(target.Timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(r.Context(), timeout)
defer cancel()
req = req.WithContext(ctx)
}

resp, err := httpc.DoRequest(req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
defer resp.Body.Close()

for key, values := range resp.Header {
for _, value := range values {
w.Header().Add(key, value)
}
}

w.WriteHeader(resp.StatusCode)
if _, err = io.Copy(w, resp.Body); err != nil {
// log the error with original request info
logc.Error(r.Context(), err)

Check warning on line 211 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L210-L211

Added lines #L210 - L211 were not covered by tests
}
} else {
client := grpcreflect.NewClientAuto(context.Background(), cli.Conn())
source = grpcurl.DescriptorSourceFromServer(context.Background(), client)
}
}

return source, nil
func (s *Server) buildHttpRoute(up Upstream, writer mr.Writer[rest.Route]) {
for _, m := range up.Mappings {
writer.Write(rest.Route{
Method: strings.ToUpper(m.Method),
Path: m.Path,
Handler: s.buildHttpHandler(up.Http),
})
}
}

func (s *Server) ensureUpstreamNames() error {
for i := 0; i < len(s.upstreams); i++ {
target, err := s.upstreams[i].Grpc.BuildTarget()
if err != nil {
return err
if len(s.upstreams[i].Name) > 0 {
continue
}

s.upstreams[i].Name = target
if s.upstreams[i].Grpc != nil {
target, err := s.upstreams[i].Grpc.BuildTarget()
if err != nil {
return err
}

s.upstreams[i].Name = target
} else if s.upstreams[i].Http != nil {
s.upstreams[i].Name = s.upstreams[i].Http.Target
}
}

return nil
Expand All @@ -207,6 +261,50 @@
}
}

func buildRequestWithNewTarget(r *http.Request, target *HttpClientConf) (*http.Request, error) {
u := *r.URL
u.Host = target.Target
if len(u.Scheme) == 0 {
u.Scheme = defaultHttpScheme
}

if len(target.Prefix) > 0 {
var err error
u.Path, err = url.JoinPath(target.Prefix, u.Path)
if err != nil {
return nil, err
}
}

return &http.Request{
Method: r.Method,
URL: &u,
Header: r.Header.Clone(),
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
ContentLength: r.ContentLength,
Body: io.NopCloser(r.Body),
}, nil
}

func createDescriptorSource(cli zrpc.Client, up Upstream) (grpcurl.DescriptorSource, error) {
var source grpcurl.DescriptorSource
var err error

if len(up.ProtoSets) > 0 {
source, err = grpcurl.DescriptorSourceFromProtoSets(up.ProtoSets...)
if err != nil {
return nil, err
}

Check warning on line 299 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L296-L299

Added lines #L296 - L299 were not covered by tests
} else {
client := grpcreflect.NewClientAuto(context.Background(), cli.Conn())
source = grpcurl.DescriptorSourceFromServer(context.Background(), client)
}

return source, nil
}

// withDialer sets a dialer to create a gRPC client.
func withDialer(dialer func(conf zrpc.RpcClientConf) zrpc.Client) func(*Server) {
return func(s *Server) {
Expand Down
Loading
Loading