Skip to content

Commit 605fa1c

Browse files
joe4devdfangl
authored andcommitted
Adapt to new init logic (#24)
1 parent 62535dc commit 605fa1c

File tree

7 files changed

+118
-122
lines changed

7 files changed

+118
-122
lines changed

cmd/localstack/awsutil.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
// LOCALSTACK CHANGES 2022-03-10: modified/collected file from /cmd/aws-lambda-rie/* into this util
44
// LOCALSTACK CHANGES 2022-03-10: minor refactoring of PrintEndReports
5+
// LOCALSTACK CHANGES 2023-10-06: reflect getBootstrap and InitHandler API updates
56

67
package main
78

@@ -11,6 +12,7 @@ import (
1112
log "github.com/sirupsen/logrus"
1213
"go.amzn.com/lambda/interop"
1314
"go.amzn.com/lambda/rapidcore"
15+
"go.amzn.com/lambda/rapidcore/env"
1416
"golang.org/x/sys/unix"
1517
"io"
1618
"io/fs"
@@ -87,7 +89,7 @@ func getBootstrap(args []string) (*rapidcore.Bootstrap, string) {
8789
}
8890
}
8991

90-
return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir), handler
92+
return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir, ""), handler
9193
}
9294

9395
func PrintEndReports(invokeId string, initDuration string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) {
@@ -203,7 +205,7 @@ func getSubFoldersInList(prefix string, pathList []string) (oldFolders []string,
203205
return
204206
}
205207

206-
func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) {
208+
func InitHandler(sandbox Sandbox, functionVersion string, timeout int64, bs interop.Bootstrap) (time.Time, time.Time) {
207209
additionalFunctionEnvironmentVariables := map[string]string{}
208210

209211
// Add default Env Vars if they were not defined. This is a required otherwise 1p Python2.7, Python3.6, and
@@ -226,15 +228,23 @@ func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.T
226228
// pass to rapid
227229
sandbox.Init(&interop.Init{
228230
Handler: GetenvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
229-
CorrelationID: "initCorrelationID", // TODO
230231
AwsKey: os.Getenv("AWS_ACCESS_KEY_ID"),
231232
AwsSecret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
232233
AwsSession: os.Getenv("AWS_SESSION_TOKEN"),
233234
XRayDaemonAddress: GetenvWithDefault("AWS_XRAY_DAEMON_ADDRESS", "127.0.0.1:2000"),
234235
FunctionName: GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
235236
FunctionVersion: functionVersion,
236237

238+
// TODO: Implement runtime management controls
239+
// https://aws.amazon.com/blogs/compute/introducing-aws-lambda-runtime-management-controls/
240+
RuntimeInfo: interop.RuntimeInfo{
241+
ImageJSON: "{}",
242+
Arn: "",
243+
Version: ""},
237244
CustomerEnvironmentVariables: additionalFunctionEnvironmentVariables,
245+
SandboxType: interop.SandboxClassic,
246+
Bootstrap: bs,
247+
EnvironmentVariables: env.NewEnvironment(),
238248
}, timeout*1000)
239249
initEnd := time.Now()
240250
return initStart, initEnd

cmd/localstack/custom_interop.go

+41-107
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package main
22

3+
// Original implementation: lambda/rapidcore/server.go includes Server struct with state
4+
// Server interface between Runtime API and this init: lambda/interop/model.go:358
5+
36
import (
47
"bytes"
58
"encoding/json"
69
"fmt"
710
"github.com/go-chi/chi"
811
log "github.com/sirupsen/logrus"
9-
"go.amzn.com/lambda/core"
1012
"go.amzn.com/lambda/core/statejson"
1113
"go.amzn.com/lambda/interop"
1214
"go.amzn.com/lambda/rapidcore"
@@ -38,8 +40,8 @@ const (
3840
)
3941

4042
func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte) error {
41-
status_url := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status)
42-
_, err := http.Post(status_url, "application/json", bytes.NewReader(payload))
43+
statusUrl := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status)
44+
_, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload))
4345
if err != nil {
4446
return err
4547
}
@@ -62,7 +64,7 @@ type ErrorResponse struct {
6264
StackTrace []string `json:"stackTrace,omitempty"`
6365
}
6466

65-
func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, logCollector *LogCollector) (server *CustomInteropServer) {
67+
func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
6668
server = &CustomInteropServer{
6769
delegate: delegate.(*rapidcore.Server),
6870
port: lsOpts.InteropPort,
@@ -99,9 +101,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo
99101
InvokedFunctionArn: invokeR.InvokedFunctionArn,
100102
Payload: strings.NewReader(invokeR.Payload), // r.Body,
101103
NeedDebugLogs: true,
102-
CorrelationID: "invokeCorrelationID",
103-
104-
TraceID: invokeR.TraceId,
104+
TraceID: invokeR.TraceId,
105105
// TODO: set correct segment ID from request
106106
//LambdaSegmentID: "LambdaSegmentID", // r.Header.Get("X-Amzn-Segment-Id"),
107107
//CognitoIdentityID: "",
@@ -194,147 +194,81 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo
194194
return server
195195
}
196196

197-
func (c *CustomInteropServer) StartAcceptingDirectInvokes() error {
198-
log.Traceln("Function called")
199-
err := c.localStackAdapter.SendStatus(Ready, []byte{})
200-
if err != nil {
201-
return err
202-
}
203-
return c.delegate.StartAcceptingDirectInvokes()
197+
func (c *CustomInteropServer) SendResponse(invokeID string, headers map[string]string, reader io.Reader, trailers http.Header, request *interop.CancellableRequest) error {
198+
log.Traceln("SendResponse called")
199+
return c.delegate.SendResponse(invokeID, headers, reader, trailers, request)
204200
}
205201

206-
func (c *CustomInteropServer) SendResponse(invokeID string, contentType string, response io.Reader) error {
207-
log.Traceln("Function called")
208-
return c.delegate.SendResponse(invokeID, contentType, response)
202+
func (c *CustomInteropServer) SendErrorResponse(invokeID string, response *interop.ErrorResponse) error {
203+
log.Traceln("SendErrorResponse called")
204+
return c.delegate.SendErrorResponse(invokeID, response)
209205
}
210206

211-
func (c *CustomInteropServer) SendErrorResponse(invokeID string, response *interop.ErrorResponse) error {
212-
is, err := c.InternalState()
213-
if err != nil {
214-
return err
215-
}
216-
rs := is.Runtime.State
217-
if rs.Name == core.RuntimeInitErrorStateName {
218-
err = c.localStackAdapter.SendStatus(Error, response.Payload)
219-
if err != nil {
220-
return err
221-
}
207+
// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
208+
func (c *CustomInteropServer) SendInitErrorResponse(invokeID string, response *interop.ErrorResponse) error {
209+
log.Traceln("SendInitErrorResponse called")
210+
if err := c.localStackAdapter.SendStatus(Error, response.Payload); err != nil {
211+
log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.")
222212
}
223-
224-
return c.delegate.SendErrorResponse(invokeID, response)
213+
return c.delegate.SendInitErrorResponse(invokeID, response)
225214
}
226215

227216
func (c *CustomInteropServer) GetCurrentInvokeID() string {
228-
log.Traceln("Function called")
217+
log.Traceln("GetCurrentInvokeID called")
229218
return c.delegate.GetCurrentInvokeID()
230219
}
231220

232-
func (c *CustomInteropServer) CommitResponse() error {
233-
log.Traceln("Function called")
234-
return c.delegate.CommitResponse()
235-
}
236-
237-
func (c *CustomInteropServer) SendRunning(running *interop.Running) error {
238-
log.Traceln("Function called")
239-
return c.delegate.SendRunning(running)
240-
}
241-
242221
func (c *CustomInteropServer) SendRuntimeReady() error {
243-
log.Traceln("Function called")
222+
log.Traceln("SendRuntimeReady called")
244223
return c.delegate.SendRuntimeReady()
245224
}
246225

247-
func (c *CustomInteropServer) SendDone(done *interop.Done) error {
248-
log.Traceln("Function called")
249-
return c.delegate.SendDone(done)
250-
}
251-
252-
func (c *CustomInteropServer) SendDoneFail(fail *interop.DoneFail) error {
253-
log.Traceln("Function called")
254-
return c.delegate.SendDoneFail(fail)
255-
}
256-
257-
func (c *CustomInteropServer) StartChan() <-chan *interop.Start {
258-
log.Traceln("Function called")
259-
return c.delegate.StartChan()
260-
}
261-
262-
func (c *CustomInteropServer) InvokeChan() <-chan *interop.Invoke {
263-
log.Traceln("Function called")
264-
return c.delegate.InvokeChan()
265-
}
266-
267-
func (c *CustomInteropServer) ResetChan() <-chan *interop.Reset {
268-
log.Traceln("Function called")
269-
return c.delegate.ResetChan()
270-
}
271-
272-
func (c *CustomInteropServer) ShutdownChan() <-chan *interop.Shutdown {
273-
log.Traceln("Function called")
274-
return c.delegate.ShutdownChan()
275-
}
276-
277-
func (c *CustomInteropServer) TransportErrorChan() <-chan error {
278-
log.Traceln("Function called")
279-
return c.delegate.TransportErrorChan()
280-
}
281-
282-
func (c *CustomInteropServer) Clear() {
283-
log.Traceln("Function called")
284-
c.delegate.Clear()
285-
}
286-
287-
func (c *CustomInteropServer) IsResponseSent() bool {
288-
log.Traceln("Function called")
289-
return c.delegate.IsResponseSent()
290-
}
291-
292-
func (c *CustomInteropServer) SetInternalStateGetter(cb interop.InternalStateGetter) {
293-
log.Traceln("Function called")
294-
c.delegate.SetInternalStateGetter(cb)
295-
}
296-
297-
func (c *CustomInteropServer) Init(i *interop.Start, invokeTimeoutMs int64) {
298-
log.Traceln("Function called")
299-
c.delegate.Init(i, invokeTimeoutMs)
226+
func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error {
227+
log.Traceln("Init called")
228+
return c.delegate.Init(i, invokeTimeoutMs)
300229
}
301230

302231
func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
303-
log.Traceln("Function called")
232+
log.Traceln("Invoke called")
304233
return c.delegate.Invoke(responseWriter, invoke)
305234
}
306235

307236
func (c *CustomInteropServer) FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct bool) error {
308-
log.Traceln("Function called")
237+
log.Traceln("FastInvoke called")
309238
return c.delegate.FastInvoke(w, i, direct)
310239
}
311240

312241
func (c *CustomInteropServer) Reserve(id string, traceID, lambdaSegmentID string) (*rapidcore.ReserveResponse, error) {
313-
log.Traceln("Function called")
242+
log.Traceln("Reserve called")
314243
return c.delegate.Reserve(id, traceID, lambdaSegmentID)
315244
}
316245

317246
func (c *CustomInteropServer) Reset(reason string, timeoutMs int64) (*statejson.ResetDescription, error) {
318-
log.Traceln("Function called")
247+
log.Traceln("Reset called")
319248
return c.delegate.Reset(reason, timeoutMs)
320249
}
321250

322251
func (c *CustomInteropServer) AwaitRelease() (*statejson.InternalStateDescription, error) {
323-
log.Traceln("Function called")
252+
log.Traceln("AwaitRelease called")
324253
return c.delegate.AwaitRelease()
325254
}
326255

327-
func (c *CustomInteropServer) Shutdown(shutdown *interop.Shutdown) *statejson.InternalStateDescription {
328-
log.Traceln("Function called")
329-
return c.delegate.Shutdown(shutdown)
330-
}
331-
332256
func (c *CustomInteropServer) InternalState() (*statejson.InternalStateDescription, error) {
333-
log.Traceln("Function called")
257+
log.Traceln("InternalState called")
334258
return c.delegate.InternalState()
335259
}
336260

337261
func (c *CustomInteropServer) CurrentToken() *interop.Token {
338-
log.Traceln("Function called")
262+
log.Traceln("CurrentToken called")
339263
return c.delegate.CurrentToken()
340264
}
265+
266+
func (c *CustomInteropServer) SetSandboxContext(sbCtx interop.SandboxContext) {
267+
log.Traceln("SetSandboxContext called")
268+
c.delegate.SetSandboxContext(sbCtx)
269+
}
270+
271+
func (c *CustomInteropServer) SetInternalStateGetter(cb interop.InternalStateGetter) {
272+
log.Traceln("SetInternalStateGetter called")
273+
c.delegate.InternalStateGetter = cb
274+
}

cmd/localstack/logs_egress_api.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package main
2+
3+
import (
4+
"io"
5+
"os"
6+
)
7+
8+
// This LocalStack LogsEgressAPI builder allows to customize log capturing, in our case using the logCollector.
9+
10+
type LocalStackLogsEgressAPI struct {
11+
logCollector *LogCollector
12+
}
13+
14+
func NewLocalStackLogsEgressAPI(logCollector *LogCollector) *LocalStackLogsEgressAPI {
15+
return &LocalStackLogsEgressAPI{
16+
logCollector: logCollector,
17+
}
18+
}
19+
20+
// The interface StdLogsEgressAPI for the functions below is defined in the under cmd/localstack/logs_egress_api.go
21+
// The default implementation is a NoOpLogsEgressAPI
22+
23+
func (s *LocalStackLogsEgressAPI) GetExtensionSockets() (io.Writer, io.Writer, error) {
24+
// os.Stderr can not be used for the stderrWriter because stderr is for internal logging (not customer visible).
25+
return io.MultiWriter(s.logCollector, os.Stdout), io.MultiWriter(s.logCollector, os.Stdout), nil
26+
}
27+
28+
func (s *LocalStackLogsEgressAPI) GetRuntimeSockets() (io.Writer, io.Writer, error) {
29+
// os.Stderr can not be used for the stderrWriter because stderr is for internal logging (not customer visible).
30+
return io.MultiWriter(s.logCollector, os.Stdout), io.MultiWriter(s.logCollector, os.Stdout), nil
31+
}

cmd/localstack/main.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,15 @@ func main() {
161161
}
162162
}
163163

164-
logCollector := NewLogCollector()
165-
166164
// file watcher for hot-reloading
167165
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
168166

167+
logCollector := NewLogCollector()
168+
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
169+
169170
// build sandbox
170171
sandbox := rapidcore.
171-
NewSandboxBuilder(bootstrap).
172+
NewSandboxBuilder().
172173
//SetTracer(tracer).
173174
AddShutdownFunc(func() {
174175
log.Debugln("Stopping file watcher")
@@ -178,7 +179,7 @@ func main() {
178179
}).
179180
SetExtensionsFlag(true).
180181
SetInitCachingFlag(true).
181-
SetTailLogOutput(logCollector)
182+
SetLogsEgressAPI(localStackLogsEgressApi)
182183

183184
// xray daemon
184185
endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort
@@ -192,7 +193,7 @@ func main() {
192193
})
193194
runDaemon(d) // async
194195

195-
defaultInterop := sandbox.InteropServer()
196+
defaultInterop := sandbox.DefaultInteropServer()
196197
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
197198
sandbox.SetInteropServer(interopServer)
198199
if len(handler) > 0 {
@@ -204,7 +205,10 @@ func main() {
204205
})
205206

206207
// initialize all flows and start runtime API
207-
go sandbox.Create()
208+
sandboxContext, internalStateFn := sandbox.Create()
209+
// Populate our custom interop server
210+
interopServer.SetSandboxContext(sandboxContext)
211+
interopServer.SetInternalStateGetter(internalStateFn)
208212

209213
// get timeout
210214
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
@@ -214,8 +218,24 @@ func main() {
214218
}
215219
go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext)
216220

217-
// start runtime init
218-
go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init
221+
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
222+
// notification channels and status fields are properly initialized before `AwaitInitialized`
223+
log.Debugln("Starting runtime init.")
224+
InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap) // TODO: replace this with a custom init
225+
226+
log.Debugln("Awaiting initialization of runtime init.")
227+
if err := interopServer.delegate.AwaitInitialized(); err != nil {
228+
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
229+
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
230+
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
231+
// callback SendInitErrorResponse because it contains the correct error response payload.
232+
return
233+
}
234+
235+
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
236+
if err := interopServer.localStackAdapter.SendStatus(Ready, []byte{}); err != nil {
237+
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
238+
}
219239

220240
<-exitChan
221241
}

0 commit comments

Comments
 (0)