Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions async-cable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.3"

# Requires the `ActionCable::Server::Socket` abstraction introduced by
# https://github.com/rails/rails/pull/50979 (Rails 8.1+).
spec.add_dependency "actioncable", ">= 8.1.0.alpha"
# https://github.com/rails/rails/pull/50979 (currently Rails main).
spec.add_dependency "actioncable", ">= 8.2.0.alpha"
spec.add_dependency "async", "~> 2.9"
spec.add_dependency "async-websocket"
end
2 changes: 1 addition & 1 deletion guides/getting-started/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ $ bundle add async-cable
To use `async-cable`, you need to add the following to your `config/application.rb`:

~~~ ruby
require 'async/cable'
require "async/cable"
~~~

This will automatically add the {ruby Async::Cable::Middleware} to your middleware stack which will handle incoming WebSocket connections and integrates with Action Cable.
176 changes: 176 additions & 0 deletions lib/async/cable/executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "async"

module Async
module Cable
# Fiber-based replacement for `ActionCable::Server::ThreadedExecutor`.
#
# Action Cable uses an `#executor` to dispatch internal async work
# (pub/sub callback invocations, heartbeat timers, periodic channel
# timers) and broadcasts a small interface: `#post`, `#timer`,
# `#shutdown`. Stock Rails backs this with a
# {Concurrent::ThreadPoolExecutor}; under a fiber-scheduler-aware
# server like Falcon every `#post` then bounces through an OS thread
# unnecessarily.
#
# This executor instead spawns Async tasks. Tasks posted from inside
# a reactor run on the caller's reactor (no thread hop). Tasks
# posted or scheduled from outside a reactor run on a dedicated
# reactor thread owned by the executor.
class Executor
# Create a new executor. The dedicated reactor thread is started
# lazily on first use that needs it (timers, or `#post` from
# outside a reactor).
def initialize
@mutex = ::Thread::Mutex.new
@inbox = nil
@thread = nil
end

# Run the given callable asynchronously. When called from inside
# a reactor this spawns a fire-and-forget child task on the
# current reactor; when called from outside a reactor this routes
# the task to the executor's dedicated reactor thread. The return
# value is the executor (matching
# `ActionCable::Server::ThreadedExecutor#post`).
# @parameter task [#call, nil] Callable to run; if nil, the block is used.
def post(task = nil, &block)
block ||= task

if current = ::Async::Task.current?
current.async{block.call}
else
inbox.push(proc{block.call})
end

return self
end

# Schedule a recurring timer. When called from inside a reactor
# this spawns a child task on the current reactor; when called
# from outside a reactor this routes the timer to the executor's
# dedicated reactor thread.
# @parameter interval [Numeric] Seconds between invocations.
# @returns [Timer] A handle that responds to `#shutdown`.
def timer(interval, &block)
timer = Timer.new

if current = ::Async::Task.current?
timer.task = current.async do |inner|
run_timer(inner, interval, block)
end

return timer
end

inbox = timer.inbox = self.inbox
begin
operation = proc do |task|
timer.task = task.async do |inner|
run_timer(inner, interval, block)
end
end

inbox.push(operation)
rescue ::ClosedQueueError
# Executor is shutting down; match the best-effort
# behaviour of posting work during shutdown.
end

return timer
end

# Stop the dedicated reactor thread (if any). Tasks posted to
# the caller's reactor via `#post` are unaffected; their
# lifetime is owned by the calling reactor.
def shutdown
@mutex.synchronize do
return unless @thread
@inbox.close
@thread.join
@thread = nil
@inbox = nil
end
end

# Handle returned from `#timer`. Wraps the underlying
# `Async::Task` and exposes a thread-safe `#shutdown` matching
# the `Concurrent::TimerTask` interface that callers expect.
# Timers running on the dedicated reactor are cancelled through
# the executor's inbox; timers running on the caller's reactor
# are cancelled directly.
class Timer
attr_writer :inbox

# Initialize an empty timer handle.
def initialize
@inbox = nil
@mutex = ::Thread::Mutex.new
@task = nil
end

# Set the underlying task. Called by the executor thread
# once the timer has been scheduled.
def task=(task)
@mutex.synchronize{@task = task}
end

# Cancel the timer. Idempotent; safe to call from any thread
# or fiber.
def shutdown
task = nil

@mutex.synchronize do
task = @task
@task = nil
end
return unless task

if inbox = @inbox
begin
inbox.push(proc{task.stop})
rescue ::ClosedQueueError
# Executor already shut down; the timer task was
# stopped along with its parent reactor.
end
else
task.stop
end
end
end

private

def inbox
@inbox || @mutex.synchronize{@inbox ||= start_thread}
end

def run_timer(task, interval, block)
loop do
task.sleep(interval)
block.call
end
end

def start_thread
inbox = ::Thread::Queue.new

@thread = ::Thread.new do
::Thread.current.name = "async-cable executor"

Sync do |task|
while operation = inbox.pop
operation.call(task)
end
end
end

return inbox
end
end
end
end
8 changes: 7 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

This is a proof-of-concept adapter for Action Cable.

The `next` branch tracks Rails `main` and relies on the `ActionCable::Server::Socket` abstraction introduced by [rails/rails#50979](https://github.com/rails/rails/pull/50979) (Rails 8.1+). For stable Rails (≤ 8.0), use the `main` branch, which depends on [`actioncable-next`](https://github.com/anycable/actioncable-next).
The `next` branch tracks Rails `main` and relies on the `ActionCable::Server::Socket` abstraction introduced by [rails/rails#50979](https://github.com/rails/rails/pull/50979). For stable Rails (≤ 8.1), use the `main` branch, which depends on [`actioncable-next`](https://github.com/anycable/actioncable-next).

## Rails Compatibility

This branch requires unreleased Action Cable changes from Rails `main`, currently versioned as `8.2.0.alpha`. Released Rails 8.1.x does not include `ActionCable::Server::Socket`; in Rails 8.1, `ActionCable::Connection::Base` still accepts `(server, env, coder: ...)` rather than `(server, socket)`.

The gemspec therefore pins `actioncable >= 8.2.0.alpha` to prevent accidentally resolving against Rails 8.1.x. Once Rails ships a stable release containing rails/rails#50979, this constraint should be changed to that released version.

[![Development Status](https://github.com/socketry/async-cable/workflows/Test/badge.svg)](https://github.com/socketry/async-cable/actions?workflow=Test)

Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add {ruby Async::Cable::Socket#raw_transmit} for pushing pre-encoded payloads to the client without re-encoding. Enables "fastlane" broadcasts that encode the message once and share it across many connections.
- Add {ruby Async::Cable::Executor}, a fiber-based replacement for `ActionCable::Server::ThreadedExecutor`. Tasks posted from inside a reactor run on the caller's reactor (no thread hop); tasks posted from outside, and all recurring timers, run on a dedicated reactor thread owned by the executor.

## v0.3.0

Expand Down
130 changes: 130 additions & 0 deletions test/async/cable/executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "async/cable/executor"
require "sus/fixtures/async"

describe Async::Cable::Executor do
let(:executor) {subject.new}

with "#post" do
it "runs the block when called from outside a reactor" do
completed = ::Thread::Queue.new
executor.post{completed.push(true)}
expect(completed.pop).to be == true
ensure
executor.shutdown
end

it "returns immediately when called from outside a reactor" do
completed = ::Thread::Queue.new

executor.post do
sleep 0.05
completed.push(true)
end

expect(completed).to be(:empty?)
expect(completed.pop).to be == true
ensure
executor.shutdown
end

it "accepts a positional callable as well as a block" do
completed = ::Thread::Queue.new
executor.post(->{completed.push(:ok)})
expect(completed.pop).to be == :ok
ensure
executor.shutdown
end

with "called from inside a reactor" do
include Sus::Fixtures::Async::ReactorContext

it "runs the block on the caller's reactor with no thread hop" do
caller_thread = ::Thread.current
completed = ::Thread::Queue.new

executor.post do
completed.push(::Thread.current)
end

expect(completed.pop).to be == caller_thread
ensure
executor.shutdown
end
end
end

with "#timer" do
it "runs the block at the configured interval" do
ticks = ::Thread::Queue.new
timer = executor.timer(0.01){ticks.push(true)}

3.times{ticks.pop}

timer.shutdown
ensure
executor.shutdown
end

it "stops invoking the block after #shutdown" do
ticks = ::Thread::Queue.new
timer = executor.timer(0.01){ticks.push(true)}

# Drain a couple of ticks so we know it started:
2.times{ticks.pop}

timer.shutdown

# After shutdown, allow any in-flight tick to land then sample:
sleep 0.05
ticks.clear
sleep 0.05
expect(ticks).to be(:empty?)
ensure
executor.shutdown
end

with "called from inside a reactor" do
include Sus::Fixtures::Async::ReactorContext

it "runs on the caller's reactor with no thread hop" do
caller_thread = ::Thread.current
ticks = ::Thread::Queue.new
timer = executor.timer(0.01){ticks.push(::Thread.current)}

expect(ticks.pop).to be == caller_thread
expect(executor.instance_variable_get(:@thread)).to be_nil

timer.shutdown
ensure
executor.shutdown
end
end
end

with "#shutdown" do
it "is idempotent" do
executor.shutdown
executor.shutdown
end

it "stops the dedicated thread used by timers" do
# A scheduled timer is what spins up the dedicated reactor thread:
ticks = ::Thread::Queue.new
timer = executor.timer(0.01){ticks.push(true)}
ticks.pop

thread = executor.instance_variable_get(:@thread)
expect(thread).to be(:alive?)

timer.shutdown
executor.shutdown

expect(thread).not.to be(:alive?)
end
end
end
Loading