Skip to content

Commit

Permalink
Add support for OpenTelemetry in worker (#1058)
Browse files Browse the repository at this point in the history
* OpenTelemetry support in worker (#1049)

* Detect if OpenTelemetry module is present and environment variable is set
* Pre-instrument invocations with a span using Invocation ID + parent span ID
* Forward all user logs to the module to be sent to OTel endpoint

(cherry picked from commit f0c7eec)
  • Loading branch information
andystaples authored Apr 12, 2024
1 parent 468617c commit 684a843
Show file tree
Hide file tree
Showing 16 changed files with 657 additions and 74 deletions.
152 changes: 122 additions & 30 deletions protobuf/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ message StreamingMessage {
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;


// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;

Expand Down Expand Up @@ -85,6 +86,13 @@ message StreamingMessage {

// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;

// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;

// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;

}
}

Expand Down Expand Up @@ -120,7 +128,7 @@ message WorkerInitRequest {

// Worker responds with the result of initializing itself
message WorkerInitResponse {
// NOT USED
// PROPERTY NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;

Expand Down Expand Up @@ -173,7 +181,7 @@ message StatusResult {
repeated RpcLog logs = 3;
}

// NOT USED
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

Expand All @@ -187,7 +195,7 @@ message WorkerTerminate {
message FileChangeEventRequest {
// Types of File change operations (See link for more info: https://msdn.microsoft.com/en-us/library/t6xf43e0(v=vs.110).aspx)
enum Type {
Unknown = 0;
Unknown = 0;
Created = 1;
Deleted = 2;
Changed = 4;
Expand Down Expand Up @@ -237,8 +245,26 @@ message FunctionEnvironmentReloadRequest {
}

message FunctionEnvironmentReloadResponse {
enum CapabilitiesUpdateStrategy {
// overwrites existing values and appends new ones
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, B: foo, C: foo} -> {A: foo, B: foo, C: foo}
merge = 0;
// existing capabilities are cleared and new capabilities are applied
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, C: foo} -> {A: foo, C: foo}
replace = 1;
}
// After specialization, worker sends capabilities & metadata.
// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 1;

// A map of worker supported features/capabilities
map<string, string> capabilities = 2;

// Status of the response
StatusResult result = 3;

// If no strategy is defined, the host will default to merge
CapabilitiesUpdateStrategy capabilities_update_strategy = 4;
}

// Tell the out-of-proc worker to close any shared memory maps it allocated for given invocation
Expand Down Expand Up @@ -322,10 +348,13 @@ message RpcFunctionMetadata {
// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;

// The optional function execution retry strategy to use on invocation failures.
RpcRetryOptions retry_options = 15;

// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
map<string,string> properties = 16;
}

// Host tells worker it is ready to receive metadata
Expand Down Expand Up @@ -369,14 +398,14 @@ message InvocationRequest {

// Host sends ActivityId, traceStateString and Tags from host
message RpcTraceContext {
// This corresponds to Activity.Current?.Id
string trace_parent = 1;
// This corresponds to Activity.Current?.Id
string trace_parent = 1;

// This corresponds to Activity.Current?.TraceStateString
string trace_state = 2;
// This corresponds to Activity.Current?.TraceStateString
string trace_state = 2;

// This corresponds to Activity.Current?.Tags
map<string, string> attributes = 3;
// This corresponds to Activity.Current?.Tags
map<string, string> attributes = 3;
}

// Host sends retry context for a function invocation
Expand All @@ -396,8 +425,8 @@ message InvocationCancel {
// Unique id for invocation
string invocation_id = 2;

// Time period before force shutdown
google.protobuf.Duration grace_period = 1; // could also use absolute time
// PROPERTY NOT USED
google.protobuf.Duration grace_period = 1;
}

// Worker responds with status of Invocation
Expand All @@ -415,6 +444,15 @@ message InvocationResponse {
StatusResult result = 3;
}

message WorkerWarmupRequest {
// Full path of worker.config.json location
string worker_directory = 1;
}

message WorkerWarmupResponse {
StatusResult result = 1;
}

// Used to encapsulate data which could be a variety of types
message TypedData {
oneof data {
Expand All @@ -429,6 +467,8 @@ message TypedData {
CollectionString collection_string = 9;
CollectionDouble collection_double = 10;
CollectionSInt64 collection_sint64 = 11;
ModelBindingData model_binding_data = 12;
CollectionModelBindingData collection_model_binding_data = 13;
}
}

Expand Down Expand Up @@ -496,20 +536,20 @@ message ParameterBinding {

// Used to describe a given binding on load
message BindingInfo {
// Indicates whether it is an input or output binding (or a fancy inout binding)
enum Direction {
in = 0;
out = 1;
inout = 2;
}

// Indicates the type of the data for the binding
enum DataType {
undefined = 0;
string = 1;
binary = 2;
stream = 3;
}
// Indicates whether it is an input or output binding (or a fancy inout binding)
enum Direction {
in = 0;
out = 1;
inout = 2;
}

// Indicates the type of the data for the binding
enum DataType {
undefined = 0;
string = 1;
binary = 2;
stream = 3;
}

// Type of binding (e.g. HttpTrigger)
string type = 2;
Expand All @@ -518,6 +558,9 @@ message BindingInfo {
Direction direction = 3;

DataType data_type = 4;

// Properties for binding metadata
map<string, string> properties = 5;
}

// Used to send logs back to the Host
Expand Down Expand Up @@ -582,13 +625,13 @@ message RpcException {
// Textual message describing the exception
string message = 2;

// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
bool is_user_exception = 4;

// Type of exception. If it's a user exception, the type is passed along to app insights.
// Otherwise, it's ignored for now.
string type = 5;
string type = 5;
}

// Http cookie type. Note that only name and value are used for Http requests
Expand Down Expand Up @@ -647,3 +690,52 @@ message RpcHttp {
map<string,NullableString> nullable_params = 21;
map<string,NullableString> nullable_query = 22;
}

// Message representing Microsoft.Azure.WebJobs.ParameterBindingData
// Used for hydrating SDK-type bindings in out-of-proc workers
message ModelBindingData
{
// The version of the binding data content
string version = 1;

// The extension source of the binding data
string source = 2;

// The content type of the binding data content
string content_type = 3;

// The binding data content
bytes content = 4;
}

// Used to encapsulate collection model_binding_data
message CollectionModelBindingData {
repeated ModelBindingData model_binding_data = 1;
}

// Retry policy which the worker sends the host when the worker indexes
// a function.
message RpcRetryOptions
{
// The retry strategy to use. Valid values are fixed delay or exponential backoff.
enum RetryStrategy
{
exponential_backoff = 0;
fixed_delay = 1;
}

// The maximum number of retries allowed per function execution.
// -1 means to retry indefinitely.
int32 max_retry_count = 2;

// The delay that's used between retries when you're using a fixed delay strategy.
google.protobuf.Duration delay_interval = 3;

// The minimum retry delay when you're using an exponential backoff strategy
google.protobuf.Duration minimum_interval = 4;

// The maximum retry delay when you're using an exponential backoff strategy
google.protobuf.Duration maximum_interval = 5;

RetryStrategy retry_strategy = 6;
}
33 changes: 1 addition & 32 deletions src/DurableSDK/PowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Collections.ObjectModel;
using System.Linq;
using System.Management.Automation;
using System.Reflection.Metadata;
using Microsoft.Azure.Functions.PowerShellWorker.PowerShell;
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Newtonsoft.Json;
using LogLevel = WebJobs.Script.Grpc.Messages.RpcLog.Types.Level;

internal class PowerShellServices : IPowerShellServices
{
Expand Down Expand Up @@ -43,36 +41,7 @@ public PowerShellServices(PowerShell pwsh, ILogger logger)

public bool isExternalDurableSdkLoaded()
{
// Search for the external DF SDK in the current session
var matchingModules = _pwsh.AddCommand(Utils.GetModuleCmdletInfo)
.AddParameter("FullyQualifiedName", Utils.ExternalDurableSdkName)
.InvokeAndClearCommands<PSModuleInfo>();

// If we get at least one result, we know the external SDK was imported
var numCandidates = matchingModules.Count();
var isModuleInCurrentSession = numCandidates > 0;

if (isModuleInCurrentSession)
{
var candidatesInfo = matchingModules.Select(module => string.Format(
PowerShellWorkerStrings.FoundExternalDurableSdkInSession, module.Name, module.Version, module.Path));
var externalSDKModuleInfo = string.Join('\n', candidatesInfo);

if (numCandidates > 1)
{
// If there's more than 1 result, there may be runtime conflicts
// warn user of potential conflicts
_logger.Log(isUserOnlyLog: false, LogLevel.Warning, String.Format(
PowerShellWorkerStrings.MultipleExternalSDKsInSession,
numCandidates, Utils.ExternalDurableSdkName, externalSDKModuleInfo));
}
else
{
// a single external SDK is in session. Report its metadata
_logger.Log(isUserOnlyLog: false, LogLevel.Trace, externalSDKModuleInfo);
}
}
return isModuleInCurrentSession;
return PowerShellModuleDetector.IsPowerShellModuleLoaded(_pwsh, _logger, Utils.ExternalDurableSdkName);
}

public void EnableExternalDurableSDK()
Expand Down
20 changes: 20 additions & 0 deletions src/Logging/LoggingEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;

namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
{
internal class LoggingEventHandler
{
private Action<string, string, Exception> _eventHandler = (a, b, c) => { };

public void Subscribe(Action<string, string, Exception> handler)
{
_eventHandler = handler;
}

public void LogToHandlers(string level, string message, Exception exception = null)
{
_eventHandler(level, message, exception);
}
}
}
3 changes: 3 additions & 0 deletions src/Logging/RpcLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal class RpcLogger : ILogger
private readonly MessagingStream _msgStream;
private string _invocationId;
private string _requestId;
public LoggingEventHandler outputLogHandler = new LoggingEventHandler();

internal RpcLogger(MessagingStream msgStream)
{
Expand Down Expand Up @@ -55,6 +56,8 @@ public void Log(bool isUserOnlyLog, LogLevel logLevel, string message, Exception
};

_msgStream.Write(logMessage);

outputLogHandler.LogToHandlers(logLevel.ToString(), message, exception);
}
else
{
Expand Down
16 changes: 16 additions & 0 deletions src/OpenTelemetry/IPowerShellServicesForOpenTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Azure.Functions.PowerShellWorker.OpenTelemetry
{
internal interface IPowerShellServicesForOpenTelemetry
{
bool? IsModuleLoaded();
void AddStartOpenTelemetryInvocationCommand(OpenTelemetryInvocationContext otelContext);
void StopOpenTelemetryInvocation(OpenTelemetryInvocationContext otelContext, bool invokeCommands);
void StartFunctionsLoggingListener(bool invokeCommands);
}
}
Loading

0 comments on commit 684a843

Please sign in to comment.