Skip to content

Commit

Permalink
Update language worker to support parsing command-line arguments pref…
Browse files Browse the repository at this point in the history
…ix with functions-<argumentname> (#997)

* Update language worker to support parsing command-line arguments prefix with functions-<argumentname> (#993)

* Populate language worker metadata in init response (#884) 

* Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: v1.5.8-protofile. Commit: 14b2ba5ccb188c160c0f6c519ec1d4521ee36440 (#876)
  • Loading branch information
Francisco-Gamino authored Sep 11, 2023
1 parent 5c90d7e commit c956de4
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 35 deletions.
9 changes: 0 additions & 9 deletions protobuf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ From within the Azure Functions language worker repo:
- `git commit -m "Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: <tag-name>. Commit: <commit hash>"`
- `git push`

## Releasing a Language Worker Protobuf version

1. Draft a release in the GitHub UI
- Be sure to include details of the release
2. Create a release version, following semantic versioning guidelines ([semver.org](https://semver.org/))
3. Tag the version with the pattern: `v<M>.<m>.<p>-protofile` (example: `v1.1.0-protofile`)
4. Merge `dev` to `main`
5. Run the release you'd created

## Consuming FunctionRPC.proto
*Note: Update versionNumber before running following commands*

Expand Down
41 changes: 38 additions & 3 deletions protobuf/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ message StreamingMessage {
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;

// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;

// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
Expand Down Expand Up @@ -117,13 +120,35 @@ message WorkerInitRequest {

// Worker responds with the result of initializing itself
message WorkerInitResponse {
// Version of worker
// NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;

// A map of worker supported features/capabilities
map<string, string> capabilities = 2;

// Status of the response
StatusResult result = 3;

// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 4;
}

message WorkerMetadata {
// The runtime/stack name
string runtime_name = 1;

// The version of the runtime/stack
string runtime_version = 2;

// The version of the worker
string worker_version = 3;

// The worker bitness/architecture
string worker_bitness = 4;

// Optional additional custom properties
map<string, string> custom_properties = 5;
}

// Used by the host to determine success/failure/cancellation
Expand All @@ -134,6 +159,7 @@ message StatusResult {
Success = 1;
Cancelled = 2;
}

// Status for the given result
Status status = 4;

Expand All @@ -147,6 +173,10 @@ message StatusResult {
repeated RpcLog logs = 3;
}

// NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
Expand Down Expand Up @@ -291,6 +321,11 @@ message RpcFunctionMetadata {

// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;

// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
}

// Host tells worker it is ready to receive metadata
Expand Down Expand Up @@ -549,11 +584,11 @@ message RpcException {

// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
optional bool is_user_exception = 4;
bool is_user_exception = 4;

// Type of exception. If it's a user exception, the type is passed along to app insights.
// Otherwise, it's ignored for now.
optional string type = 5;
string type = 5;
}

// Http cookie type. Note that only name and value are used for Http requests
Expand Down
44 changes: 38 additions & 6 deletions src/RequestProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ namespace Microsoft.Azure.Functions.PowerShellWorker
{
using System.Diagnostics;
using LogLevel = Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types.Level;
using System.Runtime.InteropServices;

internal class RequestProcessor
{
private readonly MessagingStream _msgStream;
private readonly System.Management.Automation.PowerShell _firstPwshInstance;
private readonly PowerShellManagerPool _powershellPool;
private DependencyManager _dependencyManager;
private string _pwshVersion;

// Holds the exception if an issue is encountered while processing the function app dependencies.
private Exception _initTerminatingError;
Expand All @@ -37,11 +39,12 @@ internal class RequestProcessor
private Dictionary<StreamingMessage.ContentOneofCase, Func<StreamingMessage, StreamingMessage>> _requestHandlers =
new Dictionary<StreamingMessage.ContentOneofCase, Func<StreamingMessage, StreamingMessage>>();

internal RequestProcessor(MessagingStream msgStream, System.Management.Automation.PowerShell firstPwshInstance)
internal RequestProcessor(MessagingStream msgStream, System.Management.Automation.PowerShell firstPwshInstance, string pwshVersion)
{
_msgStream = msgStream;
_firstPwshInstance = firstPwshInstance;
_powershellPool = new PowerShellManagerPool(() => new RpcLogger(msgStream));
_pwshVersion = pwshVersion;

// Host sends capabilities/init data to worker
_requestHandlers.Add(StreamingMessage.ContentOneofCase.WorkerInitRequest, ProcessWorkerInitRequest);
Expand Down Expand Up @@ -95,6 +98,9 @@ internal async Task ProcessRequestLoop()

internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request)
{
var stopwatch = new Stopwatch();
stopwatch.Start();

var workerInitRequest = request.WorkerInitRequest;
Environment.SetEnvironmentVariable("AZUREPS_HOST_ENVIRONMENT", $"AzureFunctions/{workerInitRequest.HostVersion}");
Environment.SetEnvironmentVariable("POWERSHELL_DISTRIBUTION_CHANNEL", $"Azure-Functions:{workerInitRequest.HostVersion}");
Expand All @@ -117,6 +123,22 @@ internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request)
RemoteSessionNamedPipeServer.CreateCustomNamedPipeServer(pipeName);
}

try
{
var rpcLogger = new RpcLogger(_msgStream);
rpcLogger.SetContext(request.RequestId, null);

response.WorkerInitResponse.WorkerMetadata = GetWorkerMetadata(_pwshVersion);

rpcLogger.Log(isUserOnlyLog: false, LogLevel.Trace, string.Format(PowerShellWorkerStrings.WorkerInitCompleted, stopwatch.ElapsedMilliseconds));
}
catch (Exception e)
{
status.Status = StatusResult.Types.Status.Failure;
status.Exception = e.ToRpcException();
return response;
}

return response;
}

Expand Down Expand Up @@ -180,11 +202,10 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request)
return response;
}

// Ideally, the initialization should happen when processing 'WorkerInitRequest', however, the 'WorkerInitRequest'
// message doesn't provide information about the FunctionApp. That information is not available until the first
// 'FunctionLoadRequest' comes in. Therefore, we run initialization here.
// Also, we receive a FunctionLoadRequest when a proxy is configured. Proxies don't have the Metadata.Directory set
// which would cause initialization issues with the PSModulePath. Since they don't have that set, we skip over them.
// Ideally, the initialization should happen when processing 'WorkerInitRequest'. However, we defer the initialization
// until the first 'FunctionLoadRequest' which contains the information about whether Managed Dependencies is enabled for the function app,
// and if it is, we add the Managed Dependencies path to the PSModulePath.
// Also, we receive a FunctionLoadRequest when a proxy is configured. This is just a no-op on the worker size, so we skip over them.
if (!_isFunctionAppInitialized && !functionLoadRequest.Metadata.IsProxy)
{
try
Expand Down Expand Up @@ -519,6 +540,17 @@ private void SetupAppRootPathAndModulePath(FunctionLoadRequest functionLoadReque
.InvokeAndClearCommands();
}

private WorkerMetadata GetWorkerMetadata(string pwshVersion)
{
var data = new WorkerMetadata();
data.WorkerBitness = RuntimeInformation.OSArchitecture.ToString();
data.WorkerVersion = typeof(Worker).Assembly.GetName().Version.ToString();
data.RuntimeVersion = pwshVersion;
data.RuntimeName = "powershell";

return data;
}

#endregion
}
}
103 changes: 86 additions & 17 deletions src/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,70 @@ public async static Task Main(string[] args)
LogLevel.Information,
string.Format(PowerShellWorkerStrings.PowerShellWorkerVersion, typeof(Worker).Assembly.GetName().Version));

WorkerArguments arguments = null;
Parser.Default.ParseArguments<WorkerArguments>(args)
.WithParsed(ops => arguments = ops)
.WithNotParsed(err => Environment.Exit(1));
var workerOptions = new WorkerOptions();

var parser = new Parser(settings =>
{
settings.EnableDashDash = true;
settings.IgnoreUnknownArguments = true;
});
parser.ParseArguments<WorkerArguments>(args)
.WithParsed(workerArgs =>
{
// TODO: Remove parsing old command-line arguments that are not prefixed with functions-<argumentname>
// for more information, see https://github.com/Azure/azure-functions-powershell-worker/issues/995
workerOptions.WorkerId = workerArgs.FunctionsWorkerId ?? workerArgs.WorkerId;
workerOptions.RequestId = workerArgs.FunctionsRequestId ?? workerArgs.RequestId;

if (!string.IsNullOrWhiteSpace(workerArgs.FunctionsUri))
{
try
{
// TODO: Update WorkerOptions to have a URI property instead of host name and port number
// for more information, see https://github.com/Azure/azure-functions-powershell-worker/issues/994
var uri = new Uri(workerArgs.FunctionsUri);
workerOptions.Host = uri.Host;
workerOptions.Port = uri.Port;
}
catch (UriFormatException formatEx)
{
var message = $"Invalid URI format: {workerArgs.FunctionsUri}. Error message: {formatEx.Message}";
throw new ArgumentException(message, nameof(workerArgs.FunctionsUri));
}
}
else
{
workerOptions.Host = workerArgs.Host;
workerOptions.Port = workerArgs.Port;
}

// Validate workerOptions
ValidateProperty("WorkerId", workerOptions.WorkerId);
ValidateProperty("RequestId", workerOptions.RequestId);
ValidateProperty("Host", workerOptions.Host);

if (workerOptions.Port <= 0)
{
throw new ArgumentException("Port number has not been initialized", nameof(workerOptions.Port));
}
});

// Create the very first Runspace so the debugger has the target to attach to.
// This PowerShell instance is shared by the first PowerShellManager instance created in the pool,
// and the dependency manager (used to download dependent modules if needed).
var firstPowerShellInstance = Utils.NewPwshInstance();
LogPowerShellVersion(firstPowerShellInstance);
var pwshVersion = Utils.GetPowerShellVersion(firstPowerShellInstance);
LogPowerShellVersion(pwshVersion);
WarmUpPowerShell(firstPowerShellInstance);

var msgStream = new MessagingStream(arguments.Host, arguments.Port);
var requestProcessor = new RequestProcessor(msgStream, firstPowerShellInstance);
var msgStream = new MessagingStream(workerOptions.Host, workerOptions.Port);
var requestProcessor = new RequestProcessor(msgStream, firstPowerShellInstance, pwshVersion);

// Send StartStream message
var startedMessage = new StreamingMessage()
{
RequestId = arguments.RequestId,
StartStream = new StartStream() { WorkerId = arguments.WorkerId }
RequestId = workerOptions.RequestId,
StartStream = new StartStream() { WorkerId = workerOptions.WorkerId }
};

msgStream.Write(startedMessage);
Expand All @@ -75,28 +119,53 @@ private static void WarmUpPowerShell(System.Management.Automation.PowerShell fir
.InvokeAndClearCommands();
}

private static void LogPowerShellVersion(System.Management.Automation.PowerShell pwsh)
private static void LogPowerShellVersion(string pwshVersion)
{
var message = string.Format(PowerShellWorkerStrings.PowerShellVersion, Utils.GetPowerShellVersion(pwsh));
var message = string.Format(PowerShellWorkerStrings.PowerShellVersion, pwshVersion);
RpcLogger.WriteSystemLog(LogLevel.Information, message);
}

private static void ValidateProperty(string name, string value)
{
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"{name} is null or empty", name);
}
}
}

internal class WorkerArguments
{
[Option("host", Required = true, HelpText = "IP Address used to connect to the Host via gRPC.")]
[Option("host", Required = false, HelpText = "IP Address used to connect to the Host via gRPC.")]
public string Host { get; set; }

[Option("port", Required = true, HelpText = "Port used to connect to the Host via gRPC.")]
[Option("port", Required = false, HelpText = "Port used to connect to the Host via gRPC.")]
public int Port { get; set; }

[Option("workerId", Required = true, HelpText = "Worker ID assigned to this language worker.")]
[Option("workerId", Required = false, HelpText = "Worker ID assigned to this language worker.")]
public string WorkerId { get; set; }

[Option("requestId", Required = true, HelpText = "Request ID used for gRPC communication with the Host.")]
[Option("requestId", Required = false, HelpText = "Request ID used for gRPC communication with the Host.")]
public string RequestId { get; set; }

[Option("grpcMaxMessageLength", Required = false, HelpText = "[Deprecated and ignored] gRPC Maximum message size.")]
public int MaxMessageLength { get; set; }
[Option("functions-uri", Required = false, HelpText = "URI with IP Address and Port used to connect to the Host via gRPC.")]
public string FunctionsUri { get; set; }

[Option("functions-workerid", Required = false, HelpText = "Worker ID assigned to this language worker.")]
public string FunctionsWorkerId { get; set; }

[Option("functions-requestid", Required = false, HelpText = "Request ID used for gRPC communication with the Host.")]
public string FunctionsRequestId { get; set; }
}

internal class WorkerOptions
{
public string Host { get; set; }

public int Port { get; set; }

public string WorkerId { get; set; }

public string RequestId { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/resources/PowerShellWorkerStrings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,7 @@
<data name="AutomaticUpgradesAreDisabled" xml:space="preserve">
<value>Automatic upgrades are disabled in PowerShell 7.0 function apps. To enable this functionality back, please migrate your function app to PowerShell 7.2. For more details, see https://aka.ms/functions-powershell-7.0-to-7.2.</value>
</data>
<data name="WorkerInitCompleted" xml:space="preserve">
<value>Worker init request completed in {0} ms.</value>
</data>
</root>

0 comments on commit c956de4

Please sign in to comment.