Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feedback appreciated] Expose message bus to client #343

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion app/volt/tasks/live_query/live_query_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def updated_collection(collection, skip_channel, from_message_bus=false)

msg_bus = @volt_app.message_bus
if !from_message_bus && collection != 'active_volt_instances' && msg_bus
msg_bus.publish('volt_collection_update', collection)
msg_bus.publish('volt:collection_update', collection)
end
end

Expand Down
99 changes: 99 additions & 0 deletions app/volt/tasks/message_bus_tasks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Remote endpoint for publishing and subscribing to message bus
# Generally you have the power to publish or subscribe to any channel, even to volt internals, if you
# want to.
# Nevertheless, all channels are protected by an authorization layer, so publishing and subscribing
# from client is only possible if the specified user is allowed to. Per default, channel names starting
# with 'public:' are usable for everyone. If you want to restrict some channels / use the authorization
# layer, have a look at /server/message_bus/client_authorizer, where everything you need to know is
# explained very well.
# Volt uses channels starting with 'volt:' for internal stuff, so be aware of publishing / subscribing
# to these channels (although you could do!)

require 'securerandom'
require 'volt/server/message_bus/client_authorizer'

class MessageBusTasks < Volt::Task

# Publishes a message in the message bus
def publish(channel, message)
fail "[MessageBus] Publishing into channel #{channel} not allowed" unless publishing_allowed? channel

# Trigger subscriptions in remote volt app (via the message bus)
Volt.current_app.message_bus.publish(channel, message)

# Trigger local subscriptions, of local volt app
Volt.current_app.message_bus.trigger!(channel, message)

nil
end

# Subscribe to specific events. Returns a listener_id, useful for unsubscribing
def subscribe(*events)
fail "[MessageBus] Subscribing to channels #{events} not allowed" unless subscribing_allowed? *events

listener_id = generate_listener_id
@@subscriptions ||= {}
@@subscriptions[listener_id] = []

# Todo: Maybe do this in a custom thread?
events.each do |event|
@@subscriptions[listener_id] << Volt.current_app.message_bus.on(event) do |msg|
inform_subscriber(event, msg)
end
end

# Remove all registered listeners on client disconnect
connection_listener = Volt.current_app.on('client_disconnect') do
remove(listener_id)
connection_listener.remove # to avoid endless listeners
end

# Todo: If a client reconnects, automatically reattach all subscriptions?!

return listener_id
end

# Removes a subscription, needs the listener_id (see #subscribe for more info)
def remove(listener_id)
if @@subscriptions && @@subscriptions[listener_id]
@@subscriptions[listener_id].each &:remove
@@subscriptions[listener_id] = nil
end

return listener_id
end


# Checks if publishing to the given channels is allowed
def publishing_allowed?(*channels)
is_allowed? :publish, *channels
end

# Checks if subscribing to the given channels is allowed
def subscribing_allowed?(*channels)
is_allowed? :subscribe, *channels
end

private

# informs subscriber about new message in channel
def inform_subscriber(channel, msg)
return unless subscribing_allowed? channel

@channel.send_message('message_bus_event', channel, msg)
end

# Just returns a random listener_id
def generate_listener_id
SecureRandom.uuid
end

# [helper method] Checks if :subscribe or :publish is allowed in all channels
def is_allowed?(method, *channels)
channels.each do |channel|
return false if Volt::MessageBus::ClientAuthorizer.authorized?(self, method, channel) != true
end

true
end
end
52 changes: 52 additions & 0 deletions lib/volt/controllers/message_bus_helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Current status of this: concept..

# Todo: How to add this to controller lifecycle management (see method todo's)
# Adds a method 'message_bus_subscription' to your controller which makes it easier to
# subscribe to message bus
# Usage: message_bus_subscription :my_event, :my_method
# You can also pass a proc instead of a method. The instance method or proc will be called
# every time the event is fired. The listeners will be removed automatically as soon as the
# controller is not needed anymore.

module Volt
module MessageBusHelpers
module ClassMethods
def message_bus_subscription event, callback
callback = callback.to_sym unless callback.is_a?(Proc)
@message_bus_subscriptions ||= []
@message_bus_subscriptions << {event: event, callback: callback}
end
end

def self.included(base)
base.extend ClassMethods
end

# todo: Call this method automatically on controller startup, but only once!
# before_action won't fit here, and hook in initialize either:
# the block is executed many times (5x) (why?) on start up / with a test on main_controller
def register_message_bus
@message_bus_listeners = []
subscriptions = self.class.instance_variable_get :@message_bus_subscriptions
subscriptions ||= []

subscriptions.each do |subscription|
@message_bus_listeners << Volt.current_app.message_bus.on(subscription[:event]) do |*params|
case subscription[:callback]
when Symbol
send(subscription[:callback])
when Proc
instance_eval(&subscription[:callback])
end
end
end
end

# todo: call this automatically once controller is not needed anymore
# how to integrate this into controller lifecycle management?
def remove_message_bus_listeners
return if @message_bus_listeners.nil?
@message_bus_listeners.each &:remove
end
end
end
79 changes: 79 additions & 0 deletions lib/volt/page/message_bus_client_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require 'volt/reactive/eventable'
require 'volt/server/message_bus/base_message_bus'

module Volt
class MessageBusClientAdapter < MessageBus::BaseMessageBus
include Eventable

# Custom listener class, proxying Eventable#Listener since
# we have to inform remote on the removal of a listener
class ListenerProxy
def initialize(target, remote_listener_id)
@target = target
@listener_id = remote_listener_id
end

# custom remove implementation: also calls task to remove listener
# returns promise of the task
def remove
@target.remove
MessageBusTasks.remove(@listener_id)
end

# proxy all other methods
def method_missing(method, *args, &block)
@target.send(method, *args, &block)
end
end

# Use subscribe instead of on provided in Eventable
alias_method :subscribe, :on
alias_method :eventable_on, :on # this is only for obtaining the original
# method behaviour although overriding it

# Adds a reference to the client app from this proxy
def initialize(volt_app)
@volt_app = volt_app

# Called when the backend informs us about a new subscribed message bus event
@volt_app.channel.on('message') do |*args|
if args.delete_at(0) == 'message_bus_event'
trigger!(*args)
end
end
end

# Publishes a message into the message bus, returns a promise
def publish(channel_name, message)
MessageBusTasks.publish(channel_name, message)
end

# overwrites subscribe and on from Eventable to register subscription in message bus first
# this will return a promise resolving to ListenerProxy, so you can call ".remove" on it directly
def on(*events, &block)
# Promise to resolve on working subscription, giving you a listener to remove
subscription_promise = Promise.new

MessageBusTasks.subscribe(*events).then do |remote_listener_id|
# Register event locally, TODO: direclty pass block
listener = eventable_on(*events) do |*params|
block.call(*params)
end

# Resolve promise with object of ListenerProxy to enable removing of listener
subscription_promise.resolve(ListenerProxy.new(listener, remote_listener_id))
end.fail do |error|
# Tell promise about failure
subscription_promise.reject(error)
end

subscription_promise
end

# Unnecessary on clients
def disconnect!
raise "You cannot disconnect from message bus on the client. 'disconnect!' is only available on the server."
end

end
end
Loading