class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 27
# Prepares the subscriber bookkeeping and then defers to the superclass
# initializer.
def initialize
  # Subscribers that are not keyed by an exact event name.
  @other_subscribers = []
  # Subscribers keyed by event name; reading an unknown key stores and
  # returns a fresh empty list.
  @string_subscribers = Hash.new { |registry, event_name| registry[event_name] = [] }
  # Cache of the resolved listener list per event name (see listeners_for).
  @listeners_for = Concurrent::Map.new
  super
end
Public Instance Methods
finish(name, id, payload, listeners = listeners_for(name))
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 75
# Calls +finish(name, id, payload)+ on each entry of +listeners+, going
# through iterate_guarding_exceptions. When +listeners+ is not supplied it
# defaults to listeners_for(name).
def finish(name, id, payload, listeners = listeners_for(name))
  iterate_guarding_exceptions(listeners) do |listener|
    listener.finish(name, id, payload)
  end
end
iterate_guarding_exceptions(listeners) { |s| ... }
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 87
# Yields every element of +listeners+ to the block, holding on to anything a
# yield raises until the whole collection has been visited.
#
# Returns +listeners+ when nothing was raised. When exactly one exception was
# collected it is re-raised as is; when several were collected an
# InstrumentationSubscriberError wrapping all of them is raised, with the
# first one as its +cause+.
def iterate_guarding_exceptions(listeners)
  collected = nil

  listeners.each do |listener|
    begin
      yield listener
    rescue Exception => error
      # Exception (not only StandardError) is rescued so that a failure in one
      # listener never prevents the remaining ones from being visited.
      # The array is only allocated once something has actually failed.
      (collected ||= []) << error
    end
  end

  return listeners unless collected
  raise collected.first unless collected.size > 1

  raise InstrumentationSubscriberError.new(collected), cause: collected.first
end
listeners_for(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 108
# Returns the list of subscribers interested in +name+, computing and caching
# it in @listeners_for on first use.
def listeners_for(name)
  # Lock-free fast path. Per the original authors this is correctly done
  # double-checked locking, because Concurrent::Map's lookups have volatile
  # semantics.
  cached = @listeners_for[name]
  return cached if cached

  # The subscriber collections are only touched while synchronised.
  synchronize do
    @listeners_for[name] ||= begin
      exact_matches = @string_subscribers[name]
      other_matches = @other_subscribers.select { |subscriber| subscriber.subscribed_to?(name) }
      exact_matches + other_matches
    end
  end
end
listening?(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 117
# Reports whether listeners_for(name) yields any listener.
def listening?(name)
  interested = listeners_for(name)
  interested.any?
end
publish(name, *args)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 79
# Calls +publish(name, *args)+ on every listener returned by
# listeners_for(name), going through iterate_guarding_exceptions.
def publish(name, *args)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |recipient|
    recipient.publish(name, *args)
  end
end
publish_event(event)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 83
# Calls +publish_event(event)+ on every listener returned by
# listeners_for(event.name), going through iterate_guarding_exceptions.
def publish_event(event)
  recipients = listeners_for(event.name)
  iterate_guarding_exceptions(recipients) do |recipient|
    recipient.publish_event(event)
  end
end
start(name, id, payload)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 71
# Calls +start(name, id, payload)+ on every listener returned by
# listeners_for(name), going through iterate_guarding_exceptions.
def start(name, id, payload)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |recipient|
    recipient.start(name, id, payload)
  end
end
subscribe(pattern = nil, callable = nil, monotonic: false, &block)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 34
# Registers a new subscriber and returns it.
#
# The subscriber is built by Subscribers.new from +pattern+, the handler
# (+callable+, or the given block when +callable+ is nil) and +monotonic+.
# A String +pattern+ files it under that exact name and drops the cached
# listener list for that name only. A nil or Regexp +pattern+ files it in the
# general list and drops every cached listener list.
#
# Raises ArgumentError for any other kind of +pattern+.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  handler = callable || block
  new_subscriber = Subscribers.new(pattern, handler, monotonic)

  synchronize do
    case pattern
    when nil, Regexp
      @other_subscribers.push(new_subscriber)
      @listeners_for.clear
    when String
      @string_subscribers[pattern].push(new_subscriber)
      @listeners_for.delete(pattern)
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  new_subscriber
end
unsubscribe(subscriber_or_name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 51
# Removes subscriptions, given either an event name or a subscriber object.
#
# With a String, every exact-name subscriber for that name is dropped, the
# cached listener list for the name is discarded, and each remaining
# subscriber is sent +unsubscribe!+ with the name.
#
# With anything else, the argument is treated as a subscriber. If its
# +pattern+ (read through +try+) is a String, it is removed from that name's
# list and only that name's cache entry is discarded. Otherwise it is removed
# from the general list and the whole cache is cleared.
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      @listeners_for.delete(event_name)
      @other_subscribers.each { |other| other.unsubscribe!(event_name) }
    else
      subscriber = subscriber_or_name
      pattern = subscriber.try(:pattern)

      case pattern
      when String
        @string_subscribers[pattern].delete(subscriber)
        @listeners_for.delete(pattern)
      else
        @other_subscribers.delete(subscriber)
        @listeners_for.clear
      end
    end
  end
end
wait()
click to toggle source
This is a sync queue, so there is no waiting.
# File lib/active_support/notifications/fanout.rb, line 122
# Deliberately empty: takes no arguments, does nothing and returns nil.
def wait; end