diff --git a/src/Eventing/EventSources.cs b/src/Eventing/EventSources.cs new file mode 100644 index 00000000..a392d8ea --- /dev/null +++ b/src/Eventing/EventSources.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal static class EventSources + { + public const string ScriptFiles = "ScriptFiles"; + public const string Rpc = "Rpc"; + public const string Worker = "Worker"; + public const string WorkerProcess = "WorkerProcess"; + } +} diff --git a/src/Eventing/IScriptEventManager.cs b/src/Eventing/IScriptEventManager.cs new file mode 100644 index 00000000..8490a756 --- /dev/null +++ b/src/Eventing/IScriptEventManager.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal interface IScriptEventManager : IObservable + { + void Publish(ScriptEvent scriptEvent); + } +} diff --git a/src/Eventing/Rpc/InboundEvent.cs b/src/Eventing/Rpc/InboundEvent.cs new file mode 100644 index 00000000..35630798 --- /dev/null +++ b/src/Eventing/Rpc/InboundEvent.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Grpc.Core; +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class InboundEvent : RpcEvent + { + public IAsyncStreamReader requestStream; + + public InboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Worker) + { + } + } +} diff --git a/src/Eventing/Rpc/OutboundEvent.cs b/src/Eventing/Rpc/OutboundEvent.cs new file mode 100644 index 00000000..ded58db6 --- /dev/null +++ b/src/Eventing/Rpc/OutboundEvent.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class OutboundEvent : RpcEvent + { + public OutboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Host) + { + } + } +} diff --git a/src/Eventing/Rpc/RpcEvent.cs b/src/Eventing/Rpc/RpcEvent.cs new file mode 100644 index 00000000..d4f4f863 --- /dev/null +++ b/src/Eventing/Rpc/RpcEvent.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class RpcEvent : ScriptEvent + { + internal RpcEvent(string workerId, StreamingMessage message, MessageOrigin origin = MessageOrigin.Host) + : base(message.ContentCase.ToString(), EventSources.Rpc) + { + Message = message; + Origin = origin; + WorkerId = workerId; + } + + public enum MessageOrigin + { + Worker, + Host + } + + public MessageOrigin Origin { get; } + + public StreamingMessage.ContentOneofCase MessageType => Message.ContentCase; + + public string WorkerId { get; } + + public StreamingMessage Message { get; } + } +} diff --git a/src/Eventing/ScriptEvent.cs b/src/Eventing/ScriptEvent.cs new file mode 100644 index 00000000..14b683c9 --- /dev/null +++ b/src/Eventing/ScriptEvent.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class ScriptEvent + { + public ScriptEvent(string name, string source) + { + Name = name; + Source = source; + } + + public string Name { get; } + + public string Source { get; } + } +} diff --git a/src/Eventing/ScriptEventManager.cs b/src/Eventing/ScriptEventManager.cs new file mode 100644 index 00000000..0a36bc8a --- /dev/null +++ b/src/Eventing/ScriptEventManager.cs @@ -0,0 +1,51 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Reactive.Subjects; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal sealed class ScriptEventManager : IScriptEventManager, IDisposable + { + private readonly Subject _subject = new Subject(); + private bool _disposed = false; + + public void Publish(ScriptEvent scriptEvent) + { + ThrowIfDisposed(); + + _subject.OnNext(scriptEvent); + } + + public IDisposable Subscribe(IObserver observer) + { + ThrowIfDisposed(); + + return _subject.Subscribe(observer); + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ScriptEventManager)); + } + } + + private void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _subject.Dispose(); + } + + _disposed = true; + } + } + + public void Dispose() => Dispose(true); + } +} diff --git a/src/Logging/RpcLogger.cs b/src/Logging/RpcLogger.cs index 97e5eeef..422e55f2 100644 --- a/src/Logging/RpcLogger.cs +++ b/src/Logging/RpcLogger.cs @@ -55,7 +55,7 @@ public void Log(LogLevel logLevel, string message, Exception exception = null, b } }; - _msgStream.Write(logMessage); + _msgStream.AddToBlockingQueue(logMessage); } else { diff --git a/src/Messaging/MessagingStream.cs b/src/Messaging/MessagingStream.cs index 750afde9..71a9afdc 100644 --- a/src/Messaging/MessagingStream.cs +++ b/src/Messaging/MessagingStream.cs @@ -4,6 +4,7 @@ // using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -15,7 +16,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging internal class MessagingStream { private readonly AsyncDuplexStreamingCall _call; - private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private BlockingCollection _blockingCollectionQueue = new BlockingCollection(); internal MessagingStream(string host, int port) { @@ -28,30 +29,34 @@ internal MessagingStream(string host, int port) /// internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current; + internal void AddToBlockingQueue(StreamingMessage streamingMessage) + { + _blockingCollectionQueue.Add(streamingMessage); + } + /// /// Move to the next message. /// internal async Task MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None); - /// - /// Write the outgoing message. - /// - internal void Write(StreamingMessage message) => WriteImplAsync(message).ConfigureAwait(false); - /// /// Take a message from the buffer and write to the gRPC channel. /// - private async Task WriteImplAsync(StreamingMessage message) + internal Task Write() { - try - { - await _semaphoreSlim.WaitAsync(); - await _call.RequestStream.WriteAsync(message); - } - finally + var consumer = Task.Run(async () => { - _semaphoreSlim.Release(); - } + foreach (var rpcWriteMsg in _blockingCollectionQueue.GetConsumingEnumerable()) + { + await _call.RequestStream.WriteAsync(rpcWriteMsg); + } + }); + return consumer; + } + + internal Task Write(StreamingMessage msg) + { + return _call.RequestStream.WriteAsync(msg); } } } diff --git a/src/Microsoft.Azure.Functions.PowerShellWorker.csproj b/src/Microsoft.Azure.Functions.PowerShellWorker.csproj index 683b0e8a..749d003c 100644 --- a/src/Microsoft.Azure.Functions.PowerShellWorker.csproj +++ b/src/Microsoft.Azure.Functions.PowerShellWorker.csproj @@ -25,6 +25,7 @@ Licensed under the MIT license. See LICENSE file in the project root for full li + diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index cd9760f1..0b3773c6 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -15,6 +15,9 @@ using Microsoft.Azure.Functions.PowerShellWorker.DependencyManagement; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; using LogLevel = Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types.Level; +using System.Reactive.Linq; +using System.Reactive.Concurrency; +using System.Collections.Concurrent; namespace Microsoft.Azure.Functions.PowerShellWorker { @@ -32,15 +35,24 @@ internal class RequestProcessor // Indicate whether the FunctionApp has been initialized. private bool _isFunctionAppInitialized; - private Dictionary> _requestHandlers = - new Dictionary>(); + private IScriptEventManager _eventManager; + private string _workerId; - internal RequestProcessor(MessagingStream msgStream) + private Dictionary>> _requestHandlers = + new Dictionary>>(); + + internal RequestProcessor(MessagingStream msgStream, string workerId) { _msgStream = msgStream; _powershellPool = new PowerShellManagerPool(msgStream); _functionLoader = new FunctionLoader(); _dependencyManager = new DependencyManager(); + _eventManager = new ScriptEventManager(); + _workerId = workerId; + _eventManager.OfType() + .ObserveOn(NewThreadScheduler.Default) + .Where(msg => msg.WorkerId == _workerId) + .Subscribe((msg) => InboundEventHandler(msg)); // Host sends capabilities/init data to worker _requestHandlers.Add(StreamingMessage.ContentOneofCase.WorkerInitRequest, ProcessWorkerInitRequest); @@ -68,31 +80,36 @@ internal RequestProcessor(MessagingStream msgStream) _requestHandlers.Add(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest, ProcessFunctionEnvironmentReloadRequest); } + internal async void InboundEventHandler(InboundEvent serverMessage) + { + StreamingMessage response = null; + StreamingMessage request = serverMessage.Message; + if (_requestHandlers.TryGetValue(request.ContentCase, out Func> requestFunc)) + { + response = await requestFunc(request); + } + else + { + RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.UnsupportedMessage, request.ContentCase)); + } + + if (response != null) + { + _msgStream.AddToBlockingQueue(response); + } + } + internal async Task ProcessRequestLoop() { - StreamingMessage request, response; + StreamingMessage request; while (await _msgStream.MoveNext()) { request = _msgStream.GetCurrentMessage(); - - if (_requestHandlers.TryGetValue(request.ContentCase, out Func requestFunc)) - { - response = requestFunc(request); - } - else - { - RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.UnsupportedMessage, request.ContentCase)); - continue; - } - - if (response != null) - { - _msgStream.Write(response); - } + _eventManager.Publish(new InboundEvent(_workerId, request)); } } - internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request) + internal Task ProcessWorkerInitRequest(StreamingMessage request) { StreamingMessage response = NewStreamingMessageTemplate( request.RequestId, @@ -109,15 +126,15 @@ internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request) RemoteSessionNamedPipeServer.CreateCustomNamedPipeServer(pipeName); } - return response; + return Task.FromResult(response); } - internal StreamingMessage ProcessWorkerTerminateRequest(StreamingMessage request) + internal Task ProcessWorkerTerminateRequest(StreamingMessage request) { return null; } - internal StreamingMessage ProcessWorkerStatusRequest(StreamingMessage request) + internal Task ProcessWorkerStatusRequest(StreamingMessage request) { // WorkerStatusResponse type says that it is not used but this will create an empty one anyway to return to the host StreamingMessage response = NewStreamingMessageTemplate( @@ -125,10 +142,10 @@ internal StreamingMessage ProcessWorkerStatusRequest(StreamingMessage request) StreamingMessage.ContentOneofCase.WorkerStatusResponse, out StatusResult status); - return response; + return Task.FromResult(response); } - internal StreamingMessage ProcessFileChangeEventRequest(StreamingMessage request) + internal Task ProcessFileChangeEventRequest(StreamingMessage request) { return null; } @@ -139,7 +156,7 @@ internal StreamingMessage ProcessFileChangeEventRequest(StreamingMessage request /// concurrently as a FunctionApp doesn't include a lot functions in general. Having this step sequential /// will make the Runspace-level initialization easier and more predictable. /// - internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) + internal Task ProcessFunctionLoadRequest(StreamingMessage request) { FunctionLoadRequest functionLoadRequest = request.FunctionLoadRequest; @@ -156,7 +173,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) { status.Status = StatusResult.Types.Status.Failure; status.Exception = _initTerminatingError.ToRpcException(); - return response; + return Task.FromResult(response); } // Ideally, the initialization should happen when processing 'WorkerInitRequest', however, the 'WorkerInitRequest' @@ -177,7 +194,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) status.Status = StatusResult.Types.Status.Failure; status.Exception = e.ToRpcException(); - return response; + return Task.FromResult(response); } } @@ -192,14 +209,14 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) status.Exception = e.ToRpcException(); } - return response; + return Task.FromResult(response); } /// /// Method to process a InvocationRequest. /// This method checks out a worker from the pool, and then starts the actual invocation in a threadpool thread. /// - internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) + internal Task ProcessInvocationRequest(StreamingMessage request) { AzFunctionInfo functionInfo = null; PowerShellManager psManager = null; @@ -208,19 +225,7 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) { functionInfo = _functionLoader.GetFunctionInfo(request.InvocationRequest.FunctionId); psManager = _powershellPool.CheckoutIdleWorker(request, functionInfo); - - if (_powershellPool.UpperBound == 1) - { - // When the concurrency upper bound is 1, we can handle only one invocation at a time anyways, - // so it's better to just do it on the current thread to reduce the required synchronization. - ProcessInvocationRequestImpl(request, functionInfo, psManager); - } - else - { - // When the concurrency upper bound is more than 1, we have to handle the invocation in a worker - // thread, so multiple invocations can make progress at the same time, even though by time-sharing. - Task.Run(() => ProcessInvocationRequestImpl(request, functionInfo, psManager)); - } + return ProcessInvocationRequestImpl(request, functionInfo, psManager); } catch (Exception e) { @@ -235,17 +240,15 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) status.Status = StatusResult.Types.Status.Failure; status.Exception = e.ToRpcException(); - return response; + return Task.FromResult(response); } - - return null; } /// /// Implementation method to actual invoke the corresponding function. /// InvocationRequest messages are processed in parallel when there are multiple PowerShellManager instances in the pool. /// - private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionInfo functionInfo, PowerShellManager psManager) + private async Task ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionInfo functionInfo, PowerShellManager psManager) { InvocationRequest invocationRequest = request.InvocationRequest; StreamingMessage response = NewStreamingMessageTemplate( @@ -258,8 +261,8 @@ private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionIn { // Invoke the function and return a hashtable of out binding data Hashtable results = functionInfo.Type == AzFunctionType.OrchestrationFunction - ? InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest) - : InvokeSingleActivityFunction(psManager, functionInfo, invocationRequest); + ? await InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest) + : await InvokeSingleActivityFunction(psManager, functionInfo, invocationRequest); BindOutputFromResult(response.InvocationResponse, functionInfo, results); } @@ -273,22 +276,22 @@ private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionIn _powershellPool.ReclaimUsedWorker(psManager); } - _msgStream.Write(response); + return response; } - internal StreamingMessage ProcessInvocationCancelRequest(StreamingMessage request) + internal Task ProcessInvocationCancelRequest(StreamingMessage request) { return null; } - internal StreamingMessage ProcessFunctionEnvironmentReloadRequest(StreamingMessage request) + internal Task ProcessFunctionEnvironmentReloadRequest(StreamingMessage request) { StreamingMessage response = NewStreamingMessageTemplate( request.RequestId, StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadResponse, out StatusResult status); - return response; + return Task.FromResult(response); } #region Helper_Methods @@ -359,7 +362,7 @@ private StreamingMessage NewStreamingMessageTemplate(string requestId, Streaming /// /// Invoke an orchestration function. /// - private Hashtable InvokeOrchestrationFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) + private Task InvokeOrchestrationFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) { throw new NotImplementedException(PowerShellWorkerStrings.DurableFunctionNotSupported); } @@ -367,7 +370,7 @@ private Hashtable InvokeOrchestrationFunction(PowerShellManager psManager, AzFun /// /// Invoke a regular function or an activity function. /// - private Hashtable InvokeSingleActivityFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) + private Task InvokeSingleActivityFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) { const string InvocationId = "InvocationId"; const string FunctionDirectory = "FunctionDirectory"; @@ -403,7 +406,7 @@ private Hashtable InvokeSingleActivityFunction(PowerShellManager psManager, AzFu } } - return psManager.InvokeFunction(functionInfo, triggerMetadata, invocationRequest.InputData); + return Task.FromResult(psManager.InvokeFunction(functionInfo, triggerMetadata, invocationRequest.InputData)); } /// diff --git a/src/Worker.cs b/src/Worker.cs index 75cd8051..e68071f0 100644 --- a/src/Worker.cs +++ b/src/Worker.cs @@ -11,6 +11,7 @@ using Microsoft.Azure.Functions.PowerShellWorker.Messaging; using Microsoft.Azure.Functions.PowerShellWorker.Utility; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; +using System.Threading; namespace Microsoft.Azure.Functions.PowerShellWorker { @@ -28,18 +29,18 @@ public async static Task Main(string[] args) Parser.Default.ParseArguments(args) .WithParsed(ops => arguments = ops) .WithNotParsed(err => Environment.Exit(1)); - var msgStream = new MessagingStream(arguments.Host, arguments.Port); - var requestProcessor = new RequestProcessor(msgStream); + var requestProcessor = new RequestProcessor(msgStream, arguments.WorkerId); // Send StartStream message var startedMessage = new StreamingMessage() { RequestId = arguments.RequestId, StartStream = new StartStream() { WorkerId = arguments.WorkerId } }; - - msgStream.Write(startedMessage); - await requestProcessor.ProcessRequestLoop(); + await msgStream.Write(startedMessage); + var writerTask = msgStream.Write(); + var readerTask = requestProcessor.ProcessRequestLoop(); + await Task.WhenAll(writerTask, readerTask); } }