1. Introduction
Actors and agents provide thread-safe concurrent state management through message passing. Instead of protecting shared state with locks, you send messages to an actor or agent, which processes them one at a time on a dedicated thread. Because only that thread ever touches the state, data races on it are eliminated by design.
Groovy provides two levels of abstraction:
- Agent: a thread-safe mutable value updated via functions
- Actor: a message-processing entity with flexible dispatch
Both use virtual threads on JDK 21+ for efficient scaling.
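To make the message-passing idea concrete, here is a minimal sketch that uses only JDK classes. It is not how Agent or Actor is implemented: a single-threaded executor stands in for the dedicated thread, and each queued task plays the role of a message. The names worker and state are illustrative.

```groovy
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Stand-in for an agent: one worker thread owns the state, and every
// update is queued as a task instead of being guarded by a lock.
def worker = Executors.newSingleThreadExecutor()
def state = 0

// Ten sender threads each queue 100 increments
def senders = (1..10).collect {
    Thread.start {
        100.times { worker.execute { state = state + 1 } }
    }
}
senders*.join()

// Let the worker drain its queue, then read the result
worker.shutdown()
assert worker.awaitTermination(5, TimeUnit.SECONDS)
assert state == 1000
```

No update is lost even though state is an ordinary variable, because only the worker thread ever modifies it.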
2. Agent
An Agent wraps a value that can be read by any thread but modified
only through serialized update functions:
import groovy.concurrent.Agent
def counter = Agent.create(0)
counter.send { it + 1 }
counter.send { it + 1 }
counter.send { it + 1 }
assert await(counter.getAsync()) == 3
2.1. Reading
// Snapshot read — non-blocking, returns current value
def current = counter.get()
// Consistent read — waits for pending updates to complete
def consistent = await(counter.getAsync())
2.2. Updating
// Fire-and-forget update
counter.send { it + 1 }
// Update and get new value
def newValue = await(counter.sendAndGet { it * 2 })
2.3. Complex state
Agents work with any value type:
def inventory = Agent.create([:])
inventory.send { state -> state + [apples: 10] }
inventory.send { state -> state + [bananas: 5] }
inventory.send { state ->
    state.collectEntries { k, v -> [k, v * 2] }
}
assert await(inventory.getAsync()) == [apples: 20, bananas: 10]
2.4. Observing state changes
An agent exposes a Flow.Publisher<T> of state transitions via
changes(). Each successful update emits the new value to every
subscriber that is already subscribed at the time of the update. The
stream is hot (no replay of prior state), per-subscriber buffered,
and closes with onComplete when shutdown() is called:
def agent = Agent.create(0)
try {
    async {
        3.times { agent.send { it + 1 } }
        Thread.sleep(50)
        agent.shutdown()
    }
    def seen = []
    for await (v in agent.changes()) {
        seen << v
    }
    assert seen == [1, 2, 3]
} finally {
    agent.shutdown()
}
Slow subscribers drop newly-offered values rather than backpressuring
the agent’s update loop — buffered values are still delivered in order,
but the most recent update may be skipped if a subscriber’s buffer is
full (default size 256). If changes() is first called after
shutdown(), the returned publisher is already closed and subscribers
receive onComplete immediately.
3. Actor
An Actor processes messages from a queue on a dedicated thread.
Two factory methods cover the common patterns:
3.1. Reactor (stateless)
A reactor applies a function to each message. The return value
becomes the reply for sendAndGet callers:
import groovy.concurrent.Actor
def doubler = Actor.reactor { it * 2 }
assert await(doubler.sendAndGet(5)) == 10
assert await(doubler.sendAndGet(21)) == 42
doubler.stop()
Reactors are ideal for pure-function message processing — validators, transformers, calculators:
def validator = Actor.reactor { msg ->
    if (msg instanceof String && msg.length() > 0) 'valid'
    else 'invalid'
}
3.2. Stateful
A stateful actor maintains state across messages. The handler
receives (state, message) and returns the new state:
def counter = Actor.stateful(0) { state, msg ->
    switch (msg) {
        case 'increment': return state + 1
        case 'decrement': return state - 1
        case 'reset': return 0
        default: return state
    }
}
counter.send('increment')
counter.send('increment')
counter.send('decrement')
assert await(counter.sendAndGet('increment')) == 2
counter.stop()
For sendAndGet, the new state is the reply. This makes it easy
to query the current state:
// Send a no-op message to read the state
def currentState = await(counter.sendAndGet('query'))
3.3. Typed message dispatch
Use pattern matching for rich message protocols:
def account = Actor.stateful(0.0) { balance, msg ->
    switch (msg) {
        case { it instanceof Map && it.deposit }:
            return balance + msg.deposit
        case { it instanceof Map && it.withdraw }:
            if (msg.withdraw > balance)
                throw new RuntimeException('Insufficient funds')
            return balance - msg.withdraw
        default:
            return balance
    }
}
account.send([deposit: 100])
account.send([withdraw: 30])
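// deposit: 0 is falsy in Groovy, so this message matches neither case and
// falls through to default, which returns the balance unchanged
def balance = await(account.sendAndGet([deposit: 0]))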
assert balance == 70.0
account.stop()
4. Construction options
Actors take an optional ActorOptions configuring the mailbox, the
executor, and a handful of opt-in behaviours. The builder is
value-based; each with* method returns a new configuration.
import groovy.concurrent.Actor
import groovy.concurrent.ActorOptions
def options = ActorOptions.DEFAULTS
    .withBoundedMailbox(1000, ActorOptions.Overflow.BLOCK)
def actor = Actor.reactor(handler, options)
4.1. Bounded mailbox
By default the mailbox is unbounded. For backpressure — or to cap memory when producers can outrun the actor — configure a capacity and an overflow policy:
import groovy.concurrent.ActorOptions.Overflow
// BLOCK — sender blocks until capacity is free
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.BLOCK)
// FAIL — send throws IllegalStateException when the mailbox is full
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.FAIL)
// DROP_NEWEST — the incoming message is silently dropped; for
// sendAndGet, the returned Awaitable binds to IllegalStateException
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.DROP_NEWEST)
A handler that calls ctx.self().send(…) on a full BLOCK mailbox
would deadlock (the handler is the actor’s only consumer). The actor
detects this and fails fast with IllegalStateException rather than
parking the worker thread.
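The three overflow policies have close analogues in the JDK's ArrayBlockingQueue, which may help when reasoning about them. The sketch below is only an analogy under that assumption. It does not use ActorOptions, the name mailbox is illustrative, and it says nothing about how the actor's mailbox is actually implemented.

```groovy
import java.util.concurrent.ArrayBlockingQueue

// Stand-in for a bounded mailbox with capacity 2
def mailbox = new ArrayBlockingQueue<String>(2)
mailbox.put('m1')
mailbox.put('m2')                      // the queue is now full

// FAIL-like: add() throws IllegalStateException on a full queue
def failed = false
try {
    mailbox.add('m3')
} catch (IllegalStateException expected) {
    failed = true
}

// DROP_NEWEST-like: offer() returns false and the message is discarded
def accepted = mailbox.offer('m3')

// BLOCK-like: put() waits until a consumer frees a slot
def consumer = Thread.start {
    Thread.sleep(50)
    mailbox.take()                     // removes 'm1'
}
mailbox.put('m4')                      // returns once 'm1' has been taken
consumer.join()

assert failed
assert !accepted
assert mailbox.toList() == ['m2', 'm4']
```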
4.2. Per-actor executor
Actors default to a shared async executor (virtual threads on JDK 21+). For workload isolation, hand the actor its own executor:
import java.util.concurrent.Executors
def pool = Executors.newSingleThreadExecutor()
def actor = Actor.reactor(handler,
        ActorOptions.DEFAULTS.withExecutor(pool))
Other options — withStashBound and withCurrentSelf — are covered
in the FSM section where they’re directly used.
5. Lifecycle
Both actors and agents support lifecycle management. An actor has a three-state lifecycle:
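| isActive() | isTerminated() | Meaning |
|---|---|---|
| true | false | Accepting new sends and processing them. |
| false | false | Draining: stop() has been called and new sends fail, but the worker may still be processing queued messages. |
| false | true | Terminated: the worker thread has exited; all queued messages are processed and any stashed … |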
def actor = Actor.reactor { it }
assert actor.isActive()
assert !actor.isTerminated()
actor.stop() // flips isActive() to false immediately
assert !actor.isActive() // new sends fail from here (a send racing with stop() may still land)
// The worker may still be draining queued messages. Poll for terminated
// when you actually need to be sure the actor has finished shutting down.
while (!actor.isTerminated()) Thread.sleep(10)
assert actor.isTerminated()
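An unbounded polling loop hangs forever if the actor never finishes draining. A bounded variant could look like the sketch below. The helper awaitTerminated is hypothetical and not part of the library; it relies only on the isTerminated() method shown above. An Expando stands in for a real actor so the snippet runs on its own.

```groovy
// Hypothetical helper: poll isTerminated() until it returns true or the
// timeout elapses. Returns true if the actor terminated in time.
boolean awaitTerminated(actor, long timeoutMillis, long pollMillis = 10) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
    while (!actor.isTerminated()) {
        if (System.nanoTime() - deadline >= 0) return false
        Thread.sleep(pollMillis)
    }
    true
}

// Stand-ins for actors, used only to exercise the helper
def finished = new Expando(isTerminated: { true })
def stuck = new Expando(isTerminated: { false })

assert awaitTerminated(finished, 100)
assert !awaitTerminated(stuck, 50)
```

With a real actor the call would be awaitTerminated(actor, 1000) after actor.stop().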
Actors implement AutoCloseable, so they work with
try-with-resources (Groovy or Java):
Actor.reactor { it * 2 }.withCloseable { actor ->
    assert await(actor.sendAndGet(5)) == 10
}
// actor is stopped
6. Error Handling
Exceptions thrown by a handler are delivered to sendAndGet callers
through the awaited reply:
def risky = Actor.reactor { throw new RuntimeException('oops') }
try {
    await(risky.sendAndGet('anything'))
} catch (RuntimeException e) {
    assert e.message == 'oops'
}
risky.stop()
Fire-and-forget send calls have no reply to carry the exception.
To observe handler failures from those messages — for logging,
metrics, or shutting the actor down — register an onError callback:
def actor = Actor.reactor { msg -> process(msg) }
    .onError { Throwable t, msg ->
        log.warn("handler failed on {}: {}", msg, t.message)
    }
The context-aware overload receives an ActorContext first, useful
when the callback needs to react beyond logging — for example,
stopping the actor:
actor.onError { ctx, t, msg -> ctx.self().stop() }
Exceptions thrown from inside the onError callback itself are
caught and discarded so they cannot destabilise the dispatch loop.
7. FSM-style actors
A stateful actor models a single behaviour. For multi-phase protocols
— where the actor responds differently depending on which "state" it
is in — three additional capabilities on ActorContext cover most of
what you’d reach for from a dedicated FSM library:
- ctx.become(newHandler): swap the active handler at runtime; state is preserved across the swap.
- ctx.stash() / ctx.unstashAll(): defer the current message for later replay (typically because it arrived in the wrong phase).
- ctx.scheduleOnce(msg, delay) / ctx.scheduleAtFixedRate(msg, initialDelay, interval): self-send a message after a delay, with a cancellable handle.
These capabilities are only available via the context-aware factory
overloads, which give the handler an ActorContext parameter.
7.1. Swapping behaviour with become
import groovy.concurrent.Actor
import groovy.concurrent.ActorContext
import groovy.concurrent.StatefulHandler
// Phase A increments by 1; phase B increments by 10.
StatefulHandler phaseB = { ctx, s, m -> s + 10 } as StatefulHandler
def actor = Actor.stateful(0, { ActorContext ctx, int s, m ->
    if (m == 'swap') { ctx.become(phaseB); return s }
    s + 1
} as StatefulHandler)
assert await(actor.sendAndGet('inc')) == 1
assert await(actor.sendAndGet('swap')) == 1 // state preserved
assert await(actor.sendAndGet('inc')) == 11 // now phase B
The swap takes effect on the next message: the current handler invocation completes normally (including binding any reply). State is preserved verbatim; the new handler receives the same state value the current one would have returned.
7.2. Deferring messages with stash
When a message arrives while the actor is in the wrong phase, defer
it with ctx.stash(). When the phase transitions to one that can
handle it, call ctx.unstashAll() to replay all deferred messages in
FIFO order at the head of the mailbox:
StatefulHandler connected = { ctx, s, m -> "got($m)" } as StatefulHandler
def actor = Actor.stateful('init', { ActorContext ctx, s, m ->
    if (m == 'ready') {
        ctx.become(connected)
        ctx.unstashAll() // replay anything stashed while not ready
        return s
    }
    ctx.stash() // any other message is deferred for now
    s
} as StatefulHandler)
def r1 = actor.sendAndGet('A') // stashed
def r2 = actor.sendAndGet('B') // stashed
actor.send('ready') // → connected, then replays A and B
assert await(r1) == 'got(A)'
assert await(r2) == 'got(B)'
The stash is unbounded by default.
An actor that stashes messages from a source whose volume you
do not control (network input, external clients, untrusted callers)
can grow the stash without limit and exhaust the JVM heap if the phase
transition that would call unstashAll() never arrives. For any such
actor, set a bound via withStashBound at construction time.
def options = ActorOptions.DEFAULTS
    .withStashBound(64, ActorOptions.StashOverflow.REJECT)
Policies are FAIL (throw from stash()), DROP_OLDEST (evict the
oldest stashed message, binding its sendAndGet reply to error), and
REJECT (bind the current message’s reply to error without stashing
it).
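The difference between the three policies is easiest to see in a small model. The sketch below is a hypothetical, library-free model of a bounded stash, not the actor's real implementation. The names StashPolicy, BoundedStash, and errored are invented for illustration, and the exception type used for FAIL is an assumption.

```groovy
// Hypothetical model only; names and the exception type are assumptions.
enum StashPolicy { FAIL, DROP_OLDEST, REJECT }

class BoundedStash {
    final int bound
    final StashPolicy policy
    final Deque<Object> items = new ArrayDeque<>()
    // Messages whose sendAndGet reply would be bound to an error
    final List<Object> errored = []

    BoundedStash(int bound, StashPolicy policy) {
        this.bound = bound
        this.policy = policy
    }

    void stash(Object msg) {
        if (items.size() < bound) {
            items.addLast(msg)
            return
        }
        switch (policy) {
            case StashPolicy.FAIL:
                throw new IllegalStateException('stash full')
            case StashPolicy.DROP_OLDEST:
                errored << items.removeFirst()   // evict the oldest entry
                items.addLast(msg)
                break
            case StashPolicy.REJECT:
                errored << msg                   // current message is not stashed
                break
        }
    }
}

def dropOldest = new BoundedStash(2, StashPolicy.DROP_OLDEST)
['A', 'B', 'C'].each { dropOldest.stash(it) }
assert dropOldest.items.toList() == ['B', 'C']
assert dropOldest.errored == ['A']

def reject = new BoundedStash(2, StashPolicy.REJECT)
['A', 'B', 'C'].each { reject.stash(it) }
assert reject.items.toList() == ['A', 'B']
assert reject.errored == ['C']
```

DROP_OLDEST favours recent messages, while REJECT favours the ones that arrived first; which is appropriate depends on whether stale commands are still worth replaying.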
7.3. Scheduled self-sends
For state timeouts, heartbeats, retries, and periodic work, the context exposes one-shot and recurring schedules over the shared async scheduler:
import java.time.Duration
def actor = Actor.stateful(0, { ActorContext ctx, s, m ->
    if (m == 'start') {
        ctx.scheduleAtFixedRate('tick',
                Duration.ofSeconds(1), Duration.ofSeconds(5))
        return s
    }
    if (m == 'tick') return s + 1
    s
} as StatefulHandler)
scheduleOnce returns a Cancellable so a phase transition can call
off a pending timer:
def timer = ctx.scheduleOnce('auth-timeout', Duration.ofSeconds(30))
// ... later, when AuthResult arrives:
timer.cancel()
Actor.stop() cancels every outstanding timer the actor scheduled —
no zombies after shutdown.
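The cancel-on-transition pattern can be tried in isolation with the JDK's ScheduledExecutorService. The sketch below is an analogy only: it does not use ActorContext or the shared async scheduler, and the names scheduler, delivered, and timer are illustrative.

```groovy
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

def scheduler = Executors.newSingleThreadScheduledExecutor()
def delivered = new CopyOnWriteArrayList<String>()

// Arm a one-shot timeout, as an authenticating phase might
def timer = scheduler.schedule({ delivered << 'auth-timeout' } as Runnable,
        200, TimeUnit.MILLISECONDS)

// The awaited result arrives first, so the pending timer is called off
def cancelled = timer.cancel(false)

Thread.sleep(300)          // wait past the original deadline
scheduler.shutdownNow()

assert cancelled
assert delivered.isEmpty() // the timeout message was never delivered
```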
7.4. End-to-end: a connection handshake
A common FSM shape — disconnected → authenticating → connected —
combining all three capabilities:
import groovy.concurrent.Actor
import groovy.concurrent.ActorContext
import groovy.concurrent.StatefulHandler
StatefulHandler disconnected, authenticating, connected
disconnected = { ctx, s, m ->
    if (m == 'connect') { ctx.become(authenticating); return s }
    s
} as StatefulHandler
authenticating = { ctx, s, m ->
    if (m == 'auth-ok') {
        ctx.become(connected)
        ctx.unstashAll()
        return s
    }
    ctx.stash() // commands arriving during auth are deferred
    s
} as StatefulHandler
connected = { ctx, s, m ->
    // Real connected handlers use s; here we bump an "ops processed"
    // counter to demonstrate that state survives every transition.
    [ops: s.ops + 1]
} as StatefulHandler
def actor = Actor.stateful([ops: 0], disconnected)
actor.send('connect')
def r1 = actor.sendAndGet([cmd: 'read']) // stashed in authenticating
actor.send('auth-ok') // → connected; r1 now processes
assert await(r1) == [ops: 1] // state preserved across phases
actor.stop()
8. Choosing Between Agent and Actor
| Aspect | Agent | Actor |
|---|---|---|
| State | Single value, updated via function | Arbitrary, managed by handler |
| Messages | Update functions only | Any message type with dispatch |
| Reply | sendAndGet returns the updated value | sendAndGet returns the handler's result |
| Multi-phase behaviour | None | become, stash / unstashAll, scheduled self-sends |
| Use case | Thread-safe counters, caches, accumulators | State machines, services, typed protocols |
Both guarantee sequential message processing — no locks needed.
9. @ActiveObject / @ActiveMethod
For a more OOP approach, annotate a class with @ActiveObject and
its methods with @ActiveMethod. The compiler automatically routes
annotated method calls through an internal actor — callers just see
a normal class:
import groovy.transform.ActiveObject
import groovy.transform.ActiveMethod
@ActiveObject
class Account {
    private double balance = 0
    @ActiveMethod
    void deposit(double amount) { balance += amount }
    @ActiveMethod
    void withdraw(double amount) {
        if (amount > balance) throw new RuntimeException('Insufficient funds')
        balance -= amount
    }
    @ActiveMethod
    double getBalance() { balance }
}
def account = new Account()
account.deposit(100) // runs on the internal actor, serialized; the call blocks until done
account.deposit(50)
account.withdraw(30)
assert account.getBalance() == 120.0
Methods without @ActiveMethod run on the caller’s thread as normal.
9.1. Blocking vs non-blocking
By default, @ActiveMethod calls block until the actor processes
them. For non-blocking calls, set blocking = false:
@ActiveObject
class Service {
    @ActiveMethod(blocking = false)
    def compute(int x) { x * x }
}
def svc = new Service()
def result = svc.compute(7) // returns Awaitable immediately
assert await(result) == 49
9.2. Thread safety by annotation
The key benefit: you write normal-looking classes with normal methods.
Thread safety is guaranteed by the annotation — all @ActiveMethod
calls are serialized through the actor. No locks, no concurrent
collections, no race conditions.
This also makes the code highly readable for AI tools — the
@ActiveObject annotation explicitly declares the concurrency model,
and each @ActiveMethod contains pure business logic without
messaging boilerplate.