class Rooibos::Command::Outlet
Messaging gateway for custom commands.
Custom commands run in background threads. They produce results that the main loop consumes.
Managing queues and message formats manually is tedious. It scatters queue logic across your codebase and makes mistakes easy.
This class wraps the queue with a clean API. Call put to send tagged messages. Debug mode validates Ractor-shareability.
Use it to send results from HTTP requests, WebSocket streams, or database polls.
Example (One-Shot)
Commands run in their own thread. Blocking calls work fine:
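  require "net/http"
  require "json"

  class FetchUserCommand
    include Rooibos::Command::Custom

    def initialize(user_id)
      @user_id = user_id
    end

    def call(out, _token)
      response = Net::HTTP.get(URI("https://api.example.com/users/#{@user_id}"))
      user = JSON.parse(response)
      # Explicit braces pass the hash positionally rather than as keywords.
      out.put(:user_fetched, Ractor.make_shareable({ user: user }))
    rescue => e
      # The payload hash must itself be shareable, not just the string inside it.
      out.put(:user_fetch_failed, Ractor.make_shareable({ error: e.message }))
    end
  end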
Example (Long-Running)
Commands that loop check the cancellation token:
  class PollerCommand
    include Rooibos::Command::Custom

    def call(out, token)
      until token.canceled?
        data = fetch_batch
        out.put(:batch, Ractor.make_shareable(data))
        sleep 5
      end
      out.put(:poller_stopped)
    end
  end
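One way to exercise a looping command without a running Rooibos app is to drive call with stand-ins. The sketch below assumes only what the examples show: out responds to put, and token responds to canceled?. FakeOutlet, CountdownToken, and TickCommand are hypothetical names introduced for illustration; none of them is part of Rooibos.

```ruby
# Hypothetical stand-in: records whatever the command sends.
class FakeOutlet
  attr_reader :messages

  def initialize
    @messages = []
  end

  def put(*args)
    @messages << (args.size == 1 ? args.first : args)
  end
end

# Hypothetical stand-in: reports cancellation after a fixed number of checks.
class CountdownToken
  def initialize(checks)
    @remaining = checks
  end

  def canceled?
    @remaining -= 1
    @remaining < 0
  end
end

# Same shape as the poller above, with the fetch and the sleep stubbed out.
# It does not include Rooibos::Command::Custom so the sketch runs standalone.
class TickCommand
  def call(out, token)
    count = 0
    until token.canceled?
      count += 1
      out.put(:batch, count)
    end
    out.put(:poller_stopped)
  end
end

out = FakeOutlet.new
TickCommand.new.call(out, CountdownToken.new(2))
p out.messages # prints [[:batch, 1], [:batch, 2], :poller_stopped]
```

The final message is sent after the loop exits, so the update function can tell a clean stop from a crash.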
Public Class Methods
Source
  # File lib/rooibos/command/outlet.rb, line 85
  def initialize(message_queue, lifecycle:)
    @message_queue = message_queue
    @live = lifecycle
    @pending_async = [] #: Array[AsyncHandle]
  end
Public Instance Methods
Source
  # File lib/rooibos/command/outlet.rb, line 125
  def put(*args)
    message = (args.size == 1) ? args.first : args.freeze

    if RatatuiRuby::Debug.enabled? && !Ractor.shareable?(message)
      raise Rooibos::Error::Invariant,
        "Message is not Ractor-shareable: #{message.inspect}\n" \
        "Use Ractor.make_shareable or Object#freeze."
    end

    @message_queue.push(message)
  end
Sends a message to the runtime.
Custom commands produce results. Messages about those results feed back into your update function. This method handles the wiring.
Use it for complex data flows or transports Rooibos doesn’t ship with.
For structured data and to avoid NoMethodError, define a custom Message class with envelope and domain-specific fields, and mix in Rooibos::Message::Predicates. This follows the same pattern as built-in Message types and RatatuiRuby events.
Structured Messages
  class UserFetched < Data.define(:envelope, :user)
    include Rooibos::Message::Predicates
  end

  out.put(UserFetched.new(envelope: :profile, user: alice))

  # Update can pattern match:
  #   in { type: :user_fetched, envelope: :profile, user: }
  # Update can also use predicates:
  #   message.user if message.user_fetched? and message.profile?
In debug mode, put raises Rooibos::Error::Invariant when the message is not Ractor-shareable.
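The shaping and the shareability check can be tried outside Rooibos. The sketch below copies the one-line shaping rule from the source listing into a hypothetical helper named shape_message (not part of the Rooibos API) and shows which payloads pass Ractor.shareable?.

```ruby
# Hypothetical helper mirroring the message shaping in Outlet#put.
# Illustration only; it is not part of Rooibos.
def shape_message(*args)
  # One argument passes through unchanged; several become a frozen array.
  (args.size == 1) ? args.first : args.freeze
end

single = shape_message(:poller_stopped)
tagged = shape_message(:batch, Ractor.make_shareable([1, 2, 3]))
loose  = shape_message(:user_loaded, { name: "alice" })

puts Ractor.shareable?(single) # prints true
puts Ractor.shareable?(tagged) # prints true
puts Ractor.shareable?(loose)  # prints false: the inner hash is not frozen
```

Freezing the outer array is shallow, so each payload still has to be frozen or passed through Ractor.make_shareable before it is handed to put.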
Source
  # File lib/rooibos/command/outlet.rb, line 167
  def source(command, token, timeout: 30.0)
    @live.run_sync(command, token, timeout:)
  end
Runs a child command synchronously within a custom command.
Use this to orchestrate multi-step workflows: fetch one result, then use it to compose the next command.
The child runs asynchronously in a future. This method blocks until the child calls put, cancellation occurs, or the timeout expires.
- command: A callable (lambda or Custom command) with call(out, token).
- token: The parent’s cancellation token, passed through to the child.
- timeout: Max seconds to wait for the child’s result (default: 30.0).
Returns the message from the child, or nil if canceled/timed out. Raises if the child command raised an exception.
Example
  def call(out, token)
    user_result = out.source(fetch_user_cmd, token)
    return if user_result.nil?

    # Make the payload shareable so it passes the debug-mode check in put.
    out.put(:user_loaded, Ractor.make_shareable({ user: user_result }))
  end
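The blocking contract (return the child's message, or nil once the timeout expires) can be pictured with plain threads. This is a sketch under stated assumptions, not Rooibos's implementation: run_sync is not shown in this document, cancellation is left out, and OneShotOutlet and first_message are hypothetical names.

```ruby
require "timeout"

# Hypothetical stand-in outlet that forwards messages to a private queue.
class OneShotOutlet
  def initialize(queue)
    @queue = queue
  end

  def put(*args)
    @queue.push(args.size == 1 ? args.first : args.freeze)
  end
end

# Hypothetical helper: run the child in a thread and wait for its first
# message. Returns nil if nothing arrives within the timeout.
def first_message(command, token, timeout: 30.0)
  queue = Queue.new
  worker = Thread.new { command.call(OneShotOutlet.new(queue), token) }
  Timeout.timeout(timeout) { queue.pop }
rescue Timeout::Error
  nil
ensure
  worker&.kill # stop a child that is still running
end

fast = ->(out, _token) { out.put(:user, "alice") }
slow = ->(out, _token) { sleep 1; out.put(:late) }

p first_message(fast, nil)                # prints [:user, "alice"]
p first_message(slow, nil, timeout: 0.05) # prints nil
```

A nil result is why the example above returns early before composing the next step.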
Source
  # File lib/rooibos/command/outlet.rb, line 212
  def standing(command, token)
    child_outlet = Outlet.new(@message_queue, lifecycle: @live)

    future = Concurrent::Promises.future do
      command.call(child_outlet, token)
    rescue => e
      @message_queue.push Message::Error.new(command:, exception: e)
    end

    handle = AsyncHandle.new(future:)
    @pending_async << handle
    handle
  end
Spawns an async streaming command.
Multiple data sources often need to stream in parallel. Dashboards, real-time feeds, and multi-provider aggregations all face this pattern. Waiting for one source before starting the next creates latency.
This method spawns a child command that runs asynchronously. Messages from the child stream directly to your update function as they arrive. The child gets a full Outlet, so it can nest source or standing calls.
Use wait to block until the child completes, or fire-and-forget for long-running streams.
- command: A callable with call(out, token).
- token: The parent’s cancellation token.
Returns a handle for use with wait.
Example
A dashboard that opens two SSE streams for live updates. Each stream emits chunks as they arrive — no waiting for the other.
  def call(out, token)
    # Authenticate first (sync)
    auth = out.source(Authenticate.new, token)
    return if auth.nil?

    # Open two SSE streams in parallel; chunks arrive live.
    # Streams remain outstanding until token is canceled.
    out.standing(StreamNotifications.new(auth), token)
    out.standing(StreamPrices.new(auth), token)
  end
Source
  # File lib/rooibos/command/outlet.rb, line 253
  def wait(*handles, token: nil)
    handles = @pending_async || [] if handles.empty?
    return if handles.empty?

    futures = handles.map(&:future)
    all_done = Concurrent::Promises.zip_futures(*futures)

    if token
      # Race completion against cancellation
      Concurrent::Promises.any_event(all_done, token.origin).wait
    else
      all_done.wait
    end
  end
Blocks until async commands complete.
After spawning children with standing, the parent command normally returns immediately. Use wait to block until children finish, then emit a completion signal.
This is how custom commands achieve the same end-of-streams dispatch that Command.batch gets automatically with Message::Batch.
- handles: Zero or more handles from standing. If empty, waits for all.
Example
A custom command that streams from two sources and signals when done.
  def call(out, token)
    h1 = out.standing(StreamPrices.new, token)
    h2 = out.standing(StreamNews.new, token)

    out.wait(h1, h2)
    out.put(:streams_closed) # Your custom completion signal
  end
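The spawn-then-join shape of standing and wait can be tried without Rooibos or concurrent-ruby. The sketch below uses plain threads in place of futures and a hypothetical MiniOutlet class; it illustrates the pattern only, and it omits the cancellation race and error messages that the source listings show.

```ruby
# Hypothetical stand-in that mimics standing/wait with plain threads.
# Rooibos itself uses Concurrent::Promises futures, per the source listings.
class MiniOutlet
  def initialize(queue)
    @queue = queue
    @pending = []
  end

  def put(*args)
    @queue.push(args.size == 1 ? args.first : args.freeze)
  end

  # Start the child in its own thread and hand back the thread as a handle.
  def standing(command, token)
    child_outlet = MiniOutlet.new(@queue)
    handle = Thread.new { command.call(child_outlet, token) }
    @pending << handle
    handle
  end

  # Join the given handles, or every pending one when none are given.
  def wait(*handles)
    handles = @pending if handles.empty?
    handles.each(&:join)
    nil
  end
end

prices = ->(out, _token) { out.put(:price, 101) }
news   = ->(out, _token) { out.put(:headline, "markets open") }

queue = Queue.new
out = MiniOutlet.new(queue)
h1 = out.standing(prices, nil)
h2 = out.standing(news, nil)
out.wait(h1, h2)
out.put(:streams_closed)

messages = []
messages << queue.pop until queue.empty?
p messages.last # prints :streams_closed
```

The two child messages may arrive in either order, but the completion signal always comes last because it is sent only after both handles have been joined.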