Skip to content

Commit

Permalink
Fix http_client input connect bug
Browse files Browse the repository at this point in the history
This issue was uncovered because the `http_client` components
would not try to reconnect if the internal codec couldn't be
initialised. The fix addresses the issue at the scanner level.

`h.codecCtor.Create` will return a `*service.OwnedScanner`, but
`h.codec` has type `codec.DeprecatedFallbackStream`, so if we get
an error from `h.codecCtor.Create`, the `h.codec != nil` check in
`Connect` will be true, so Benthos will think the input is
connected. This can happen if the codec specification isn't
compatible with the received data.

Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Mar 2, 2025
1 parent 4d6fce6 commit ba69e0f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
8 changes: 7 additions & 1 deletion public/service/codec/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ type codecRPublic struct {
func (r *codecRPublic) Create(rdr io.ReadCloser, aFn service.AckFunc, details *service.ScannerSourceDetails) (DeprecatedFallbackStream, error) {
sDetails := service.NewScannerSourceDetails()
sDetails.SetName(details.Name())
return r.newCtor.Create(rdr, aFn, sDetails)

os, err := r.newCtor.Create(rdr, aFn, sDetails)
if err != nil {
return nil, err
}

return os, err
}

func (r *codecRPublic) Close(ctx context.Context) error {
Expand Down
28 changes: 28 additions & 0 deletions public/service/codec/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,31 @@ func TestInteropCodecDefault(t *testing.T) {
require.NoError(t, strm.Close(context.Background()))
assert.True(t, acked)
}

func TestInteropCodecError(t *testing.T) {
// This test asserts that `DeprecatedFallbackCodec.Create` returns a nil `DeprecatedFallbackStream` if the scanner
// `Create` method returns an error.

confSpec := service.NewConfigSpec().Fields(codec.DeprecatedCodecFields("lines")...)
pConf, err := confSpec.ParseYAML(`
scanner:
decompress:
algorithm: gzip
into:
lines: {}
`, nil)
require.NoError(t, err)

rdr, err := codec.DeprecatedCodecFromParsed(pConf)
require.NoError(t, err)

// The `decompress` scanner will error out when trying to parse the `gzip` header from the nil buffer.
buf := bytes.NewReader(nil)
strm, err := rdr.Create(io.NopCloser(buf), func(ctx context.Context, err error) error {
return nil
}, service.NewScannerSourceDetails())
require.ErrorIs(t, err, io.EOF)
if strm != nil {
assert.Fail(t, "expected nil stream")
}
}

0 comments on commit ba69e0f

Please sign in to comment.