-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cache fallback layer to RPC call #333
Comments
Hi Péter, The point of extension is the IWampRealmContainer interface. We initialize the WampHost with our own container: WampHost wampHost = new WampHost(new RpcCacheWampRealmContainer()); Next we need to do some ceremony and define a bunch of implementations for all the interfaces in the hierarchy.
public class RpcCacheWampRealmContainer : IWampRealmContainer
{
private readonly IWampRealmContainer mContainer;
/// <summary>
/// Wraps the supplied realm container; realms are fetched from it on demand.
/// </summary>
/// <param name="container">The container that realm lookups are delegated to.</param>
public RpcCacheWampRealmContainer(IWampRealmContainer container) => mContainer = container;

/// <summary>
/// Wraps a freshly created <see cref="WampRealmContainer"/>.
/// </summary>
public RpcCacheWampRealmContainer()
    : this(new WampRealmContainer())
{
}
// One wrapper per realm name. Without this, every lookup would build a new
// RpcCacheWampRealm (and with it a new, empty invocation cache), so results
// cached through one wrapper would be invisible to the next caller.
private readonly ConcurrentDictionary<string, IWampRealm> mNameToWrappedRealm =
    new ConcurrentDictionary<string, IWampRealm>();

/// <summary>
/// Returns the cache-enabled wrapper around the realm with the given name.
/// </summary>
/// <param name="name">The realm name, passed through to the wrapped container.</param>
/// <returns>
/// A <see cref="RpcCacheWampRealm"/> decorating the realm returned by the wrapped
/// container. The same wrapper instance is returned for repeated lookups of the
/// same name, so its RPC result cache is shared by all callers.
/// </returns>
public IWampRealm GetRealmByName(string name)
{
    // GetOrAdd may run the factory more than once under contention, but only one
    // wrapper is ever published and returned for a given name.
    return mNameToWrappedRealm.GetOrAdd(
        name,
        realmName => new RpcCacheWampRealm(mContainer.GetRealmByName(realmName)));
}
}
public class RpcCacheWampRealm : IWampRealm
{
private readonly IWampRealm mRealm;
private readonly IWampRpcOperationCatalog mRpcCacheOperationCatalog;
/// <summary>
/// Decorates <paramref name="realm"/>, replacing its RPC catalog with a
/// <see cref="RpcCacheWampRpcOperationCatalog"/> built on top of the original one.
/// </summary>
/// <param name="realm">The realm to decorate.</param>
public RpcCacheWampRealm(IWampRealm realm)
{
    IWampRpcOperationCatalog innerCatalog = realm.RpcCatalog;

    mRealm = realm;
    mRpcCacheOperationCatalog = new RpcCacheWampRpcOperationCatalog(innerCatalog);
}
/// <summary>The name of the decorated realm.</summary>
public string Name
{
    get { return mRealm.Name; }
}

/// <summary>The caching RPC catalog created in the constructor.</summary>
public IWampRpcOperationCatalog RpcCatalog
{
    get { return mRpcCacheOperationCatalog; }
}

/// <summary>The decorated realm's topic container, exposed unchanged.</summary>
public IWampTopicContainer TopicContainer
{
    get { return mRealm.TopicContainer; }
}
}
Now the magic happens in our implementation of IWampRpcOperationCatalog. In order to make my life simpler, I will only show how to implement this when your function takes no arguments. In order to take care of the case where your function takes arguments, you will need to create some struct that stores them and write some equality comparer code. We introduce some cache interface for our purpose:
public interface IRpcInvocationCache
{
/// <summary>
/// Associates <paramref name="resultArguments"/> with <paramref name="procedure"/>.
/// </summary>
/// <param name="procedure">The procedure URI used as the cache key.</param>
/// <param name="resultArguments">The result to remember for that procedure.</param>
void MapInvocationToResult(string procedure,
ResultArguments resultArguments);
/// <summary>
/// Attempts to fetch the result associated with <paramref name="procedure"/>.
/// </summary>
/// <param name="procedure">The procedure URI used as the cache key.</param>
/// <param name="value">Receives the stored result when the lookup succeeds.</param>
/// <returns><c>true</c> when a result was found; otherwise <c>false</c>.</returns>
bool TryGetInvocationResult(string procedure,
out ResultArguments value);
}
/// <summary>
/// Holds one RPC result in formatter-independent form: the yield options plus
/// the optional positional and keyword arguments, stored as plain objects.
/// All properties are get-only and assigned in the constructor.
/// </summary>
public class ResultArguments
{
    /// <summary>
    /// Stores the given values as-is; no copies are made.
    /// </summary>
    /// <param name="yieldOptions">The options the result was yielded with.</param>
    /// <param name="arguments">Positional result arguments, or <c>null</c>.</param>
    /// <param name="argumentsKeywords">Keyword result arguments, or <c>null</c>.</param>
    public ResultArguments(YieldOptions yieldOptions,
        object[] arguments = null,
        IDictionary<string, object> argumentsKeywords = null)
    {
        ArgumentsKeywords = argumentsKeywords;
        Arguments = arguments;
        YieldOptions = yieldOptions;
    }

    /// <summary>The options the result was yielded with.</summary>
    public YieldOptions YieldOptions { get; }

    /// <summary>Positional result arguments; <c>null</c> when none were supplied.</summary>
    public object[] Arguments { get; }

    /// <summary>Keyword result arguments; <c>null</c> when none were supplied.</summary>
    public IDictionary<string, object> ArgumentsKeywords { get; }

    /// <summary>
    /// Builds a <see cref="ResultArguments"/> from typed arguments by converting
    /// every element and dictionary value to <see cref="object"/>.
    /// </summary>
    /// <typeparam name="TMessage">The element type of the incoming arguments.</typeparam>
    /// <param name="yieldOptions">The options the result was yielded with.</param>
    /// <param name="arguments">Positional arguments, or <c>null</c> (kept as <c>null</c>).</param>
    /// <param name="argumentsKeywords">Keyword arguments, or <c>null</c> (kept as <c>null</c>).</param>
    /// <returns>A new instance holding object-typed copies of the inputs.</returns>
    public static ResultArguments FromArguments<TMessage>
        (YieldOptions yieldOptions,
        TMessage[] arguments = null,
        IDictionary<string, TMessage> argumentsKeywords = null)
    {
        object[] positional = null;
        if (arguments != null)
        {
            positional = arguments.Cast<object>().ToArray();
        }

        IDictionary<string, object> keywords = null;
        if (argumentsKeywords != null)
        {
            keywords = argumentsKeywords.ToDictionary(pair => pair.Key,
                                                      pair => (object) pair.Value);
        }

        return new ResultArguments(yieldOptions, positional, keywords);
    }
}
public class LocalRpcInvocationCache : IRpcInvocationCache
{
private readonly ConcurrentDictionary<string, ResultArguments> mProcedureUriToResult =
new ConcurrentDictionary<string, ResultArguments>();
/// <summary>
/// Stores <paramref name="resultArguments"/> under <paramref name="procedure"/>,
/// replacing whatever was stored for that procedure before.
/// </summary>
public void MapInvocationToResult(string procedure, ResultArguments resultArguments)
{
    mProcedureUriToResult.AddOrUpdate(
        procedure,
        resultArguments,
        (uri, previous) => resultArguments);
}
/// <summary>
/// Looks up the result stored for <paramref name="procedure"/>.
/// </summary>
/// <param name="procedure">The procedure URI to look up.</param>
/// <param name="value">The stored result when found; otherwise <c>null</c>.</param>
/// <returns><c>true</c> when an entry exists for the procedure.</returns>
public bool TryGetInvocationResult(string procedure, out ResultArguments value)
{
    bool found = mProcedureUriToResult.TryGetValue(procedure, out ResultArguments cached);
    value = cached;
    return found;
}
}
Next we implement IWampRpcOperationCatalog:
public class RpcCacheWampRpcOperationCatalog : IWampRpcOperationCatalog
{
private readonly IWampRpcOperationCatalog mRpcOperationCatalog;
private readonly IRpcInvocationCache mInvocationCache = new LocalRpcInvocationCache();
/// <summary>
/// Creates a caching decorator over <paramref name="rpcOperationCatalog"/>.
/// </summary>
/// <param name="rpcOperationCatalog">The catalog that calls are delegated to.</param>
public RpcCacheWampRpcOperationCatalog(IWampRpcOperationCatalog rpcOperationCatalog) =>
    mRpcOperationCatalog = rpcOperationCatalog;
/// <summary>
/// Invokes an argument-less procedure, falling back to the last cached result
/// when no matching operation is currently registered.
/// </summary>
/// <remarks>
/// Three outcomes are possible:
/// a matching operation exists, so the call is forwarded with a callback wrapper
/// that records the result in the cache;
/// no operation and no cached result, so the call is forwarded unchanged to the
/// inner catalog;
/// no operation but a cached result, so the cached result is sent to
/// <paramref name="caller"/> and <c>null</c> is returned.
/// </remarks>
/// <returns>
/// The inner catalog's invocation handle, or <c>null</c> when the reply came from the cache.
/// </returns>
public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
    InvocationDetails details, string procedure)
{
    bool operationRegistered = mRpcOperationCatalog.GetMatchingOperation(procedure) != null;

    if (operationRegistered)
    {
        // Route the reply through a recording wrapper so it ends up in the cache.
        IWampRawRpcOperationRouterCallback recordingCaller =
            new RpcCacheWampRawOperationRouterCallback(caller, procedure, mInvocationCache);

        return mRpcOperationCatalog.Invoke(recordingCaller, formatter, details, procedure);
    }

    bool hasCachedResult =
        mInvocationCache.TryGetInvocationResult(procedure, out ResultArguments cached);

    if (!hasCachedResult)
    {
        // Nothing to fall back on: let the inner catalog handle the call
        // (the original author's note: call "base" to throw an exception).
        return mRpcOperationCatalog.Invoke(caller, formatter, details, procedure);
    }

    // Replay the cached result, choosing the overload by which parts were stored.
    YieldOptions cachedOptions = cached.YieldOptions;

    if (cached.ArgumentsKeywords != null)
    {
        caller.Result(WampObjectFormatter.Value, cachedOptions,
                      cached.Arguments, cached.ArgumentsKeywords);
    }
    else if (cached.Arguments != null)
    {
        caller.Result(WampObjectFormatter.Value, cachedOptions, cached.Arguments);
    }
    else
    {
        caller.Result(WampObjectFormatter.Value, cachedOptions);
    }

    // No live invocation exists for a cached reply.
    return null;
}
/// <summary>
/// Invokes a procedure with positional arguments.
/// </summary>
/// <remarks>
/// A call whose argument array is <c>null</c> or empty is treated as an
/// argument-less call and goes through the caching overload. Calls that carry
/// actual arguments are forwarded unchanged to the inner catalog.
/// </remarks>
/// <returns>Whatever the overload that handled the call returns.</returns>
public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
    InvocationDetails details, string procedure, TMessage[] arguments)
{
    // Guard against null before reading Length; a missing array carries no
    // arguments, exactly like an empty one.
    if (arguments == null || arguments.Length == 0)
    {
        return Invoke(caller, formatter, details, procedure);
    }

    return mRpcOperationCatalog.Invoke(caller, formatter, details, procedure, arguments);
}
/// <summary>
/// Invokes a procedure with positional and keyword arguments by forwarding the
/// call unchanged to the inner catalog; the cache is not consulted here.
/// </summary>
public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter,
    InvocationDetails details, string procedure, TMessage[] arguments,
    IDictionary<string, TMessage> argumentsKeywords) =>
    mRpcOperationCatalog.Invoke(caller, formatter, details, procedure, arguments, argumentsKeywords);
/// <summary>
/// Registers <paramref name="operation"/> with the inner catalog and returns its token.
/// </summary>
public IWampRegistrationSubscriptionToken Register(IWampRpcOperation operation, RegisterOptions options) =>
    mRpcOperationCatalog.Register(operation, options);
/// <summary>
/// Forwards subscriptions to the inner catalog's <c>RegistrationAdded</c> event.
/// </summary>
public event EventHandler<WampProcedureRegisterEventArgs> RegistrationAdded
{
    add
    {
        mRpcOperationCatalog.RegistrationAdded += value;
    }
    remove
    {
        mRpcOperationCatalog.RegistrationAdded -= value;
    }
}
/// <summary>
/// Forwards subscriptions to the inner catalog's <c>RegistrationRemoved</c> event.
/// </summary>
public event EventHandler<WampProcedureRegisterEventArgs> RegistrationRemoved
{
    add
    {
        mRpcOperationCatalog.RegistrationRemoved += value;
    }
    remove
    {
        mRpcOperationCatalog.RegistrationRemoved -= value;
    }
}
/// <summary>
/// Returns the inner catalog's match for <paramref name="criteria"/>.
/// </summary>
public IWampRpcOperation GetMatchingOperation(string criteria) =>
    mRpcOperationCatalog.GetMatchingOperation(criteria);
/// <summary>
/// Callback decorator that records every result passing through it in the
/// invocation cache, keyed by procedure name, and then forwards it to the
/// wrapped callback. Errors are forwarded without being cached.
/// </summary>
private class RpcCacheWampRawOperationRouterCallback : IWampRawRpcOperationRouterCallback
{
    private readonly IWampRawRpcOperationRouterCallback mCallback;
    private readonly string mProcedureName;
    private readonly IRpcInvocationCache mInvocationCache;

    /// <param name="callback">The callback that receives every forwarded result and error.</param>
    /// <param name="procedureName">The cache key under which results are stored.</param>
    /// <param name="invocationCache">The cache that results are written to.</param>
    public RpcCacheWampRawOperationRouterCallback(
        IWampRawRpcOperationRouterCallback callback, string procedureName,
        IRpcInvocationCache invocationCache)
    {
        mInvocationCache = invocationCache;
        mProcedureName = procedureName;
        mCallback = callback;
    }

    /// <summary>Caches an argument-less result, then forwards it.</summary>
    public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options)
    {
        Remember(new ResultArguments(options));
        mCallback.Result(formatter, options);
    }

    /// <summary>Caches a result with positional arguments, then forwards it.</summary>
    public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options, TMessage[] arguments)
    {
        Remember(ResultArguments.FromArguments(options, arguments));
        mCallback.Result(formatter, options, arguments);
    }

    /// <summary>Caches a result with positional and keyword arguments, then forwards it.</summary>
    public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions options, TMessage[] arguments,
        IDictionary<string, TMessage> argumentsKeywords)
    {
        Remember(ResultArguments.FromArguments(options, arguments, argumentsKeywords));
        mCallback.Result(formatter, options, arguments, argumentsKeywords);
    }

    /// <summary>Forwards the error; nothing is cached.</summary>
    public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error) =>
        mCallback.Error(formatter, details, error);

    /// <summary>Forwards the error; nothing is cached.</summary>
    public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error, TMessage[] arguments) =>
        mCallback.Error(formatter, details, error, arguments);

    /// <summary>Forwards the error; nothing is cached.</summary>
    public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details, string error, TMessage[] arguments,
        TMessage argumentsKeywords) =>
        mCallback.Error(formatter, details, error, arguments, argumentsKeywords);

    // Writes the result into the cache under this callback's procedure name.
    private void Remember(ResultArguments result) =>
        mInvocationCache.MapInvocationToResult(mProcedureName, result);
}
}
Here's a gist with a full working example. In case you decide to complete this implementation, please upload it here so others can make use of it. Elad |
I'd like some help implementing a caching layer into the Router as a fallback if the callee is not registered at the moment of an RPC call.
The problem is as follows:
Client -> Router -> Callee (RPC)
Callee gives data to client.
Callee goes offline.
Client tries to call RPC, gets „procedure not registered”.
Basically if the procedure is not found, it would try to get the previous calls’ data from Redis/some other cache instead of throwing a „procedure not registered” right away. I’ve tried following the inner workings of the router by debugging it, however I do not think it is conventionally possible to implement it this way.
So what I would basically want to achieve is:
Client -> Router -> Callee
Callee -> Router -> Cache data here -> Caller
Callee goes offline
Client -> Router -> procedure not found -> go to cache -> cache data found -> return cache data instead.
I know it would be possible to implement this at the callers end, however it would be a lot simpler and a lot more efficient if it could be implemented on the router's side.
The text was updated successfully, but these errors were encountered: