snakia.core.es.dispatcher

Functions

nolock()

Classes

Dispatcher()

Event dispatcher.

Event(**data)

Filter(*args, **kwargs)

Filter for an event.

Handler(*args, **kwargs)

Handler for an event.

Subscriber(handler, filters, priority)

Subscriber for an event.

TypeVar(name, *constraints[, bound, ...])

Type variable.

defaultdict

defaultdict(default_factory=None, /, [...]) --> dict with default factory

class snakia.core.es.dispatcher.Dispatcher

Bases: object

Event dispatcher.

__init__()

property is_running: bool

Returns True if the dispatcher is running.

on(event, filter=None, priority=-1)

Decorator to subscribe to an event.

Return type:

Callable[[Handler[TypeVar(T, bound=Event)]], Handler[TypeVar(T, bound=Event)]]
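
The return type above suggests that on() builds a decorator which registers the handler and hands it back unchanged. The following is a minimal sketch of that pattern, not snakia's implementation: `MiniDispatcher`, its `subscribers` attribute, and the handler signature are hypothetical stand-ins, and only the parameter names and defaults of `on` are taken from this page.

```python
from collections import defaultdict
from typing import Callable, Optional


class Event:
    """Stand-in event type (hypothetical, not snakia's Event)."""


class MiniDispatcher:
    """Hypothetical stand-in illustrating a decorator-based `on`."""

    def __init__(self) -> None:
        # Maps an event type to a list of (priority, filter, handler) tuples.
        self.subscribers = defaultdict(list)

    def on(
        self,
        event: type,
        filter: Optional[Callable[[Event], bool]] = None,
        priority: int = -1,
    ) -> Callable[[Callable], Callable]:
        def decorator(handler: Callable) -> Callable:
            # Register the handler, then return it unchanged so the
            # decorated name still refers to the original function.
            self.subscribers[event].append((priority, filter, handler))
            return handler

        return decorator


dispatcher = MiniDispatcher()


@dispatcher.on(Event)
def greet(event: Event) -> str:
    return "handled"
```

Because the decorator returns the handler itself, `greet` can still be called directly (for example in unit tests) after it has been registered.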

publish(event)

Add an event to the queue.

Return type:

None

start()

Start the update loop.

Return type:

None

stop()

Stop the update loop.

Return type:

None
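
To illustrate how start(), stop(), and is_running can fit together, here is a minimal sketch of an update loop driven by a running flag. It is an assumption-laden stand-in, not snakia's code: `LoopDispatcher`, its `seen` list, and the `"quit"` sentinel are hypothetical, and this page does not state whether the real start() blocks the calling thread.

```python
import queue


class LoopDispatcher:
    """Hypothetical stand-in showing a start/stop lifecycle."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        self._running = False
        self.seen = []  # events processed so far, kept for inspection

    @property
    def is_running(self) -> bool:
        return self._running

    def publish(self, event) -> None:
        self._queue.put(event)

    def stop(self) -> None:
        # Clearing the flag lets the loop in start() exit after the
        # current event has been processed.
        self._running = False

    def start(self) -> None:
        # Assumption: the loop runs in the calling thread until stopped.
        self._running = True
        while self._running:
            event = self._queue.get()  # waits until an event is available
            self.seen.append(event)
            if event == "quit":  # hypothetical sentinel for this sketch
                self.stop()


loop = LoopDispatcher()
loop.publish("tick")
loop.publish("quit")
loop.start()  # returns once the "quit" event has been processed
```

In this sketch the loop is stopped from inside event processing, which is one common way to end a blocking loop without a second thread.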

subscribe(event_type, subscriber)

Subscribe to an event type.

Return type:

None

unsubscribe(event_type, subscriber)

Unsubscribe from an event type.

Return type:

None
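
The subscribe() and unsubscribe() pair can be pictured as a registry keyed by event type. The sketch below is a hypothetical stand-in, not snakia's implementation: `Registry`, `Ping`, and `handlers_for` are invented for illustration, the `Subscriber` tuple only mirrors the documented field names (handler, filters, priority), and the ordering rule (higher priority first) is an assumption.

```python
from collections import defaultdict
from typing import Callable, List, NamedTuple, Optional


class Ping:
    """Hypothetical event type used only in this sketch."""


class Subscriber(NamedTuple):
    # Field names follow the documented Subscriber(handler, filters, priority).
    handler: Callable[[object], None]
    filters: Optional[Callable[[object], bool]]
    priority: int


class Registry:
    """Hypothetical stand-in for the dispatcher's subscription table."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type: type, subscriber: Subscriber) -> None:
        subs = self._subscribers[event_type]
        subs.append(subscriber)
        # Assumption: subscribers with a higher priority are called first.
        subs.sort(key=lambda s: s.priority, reverse=True)

    def unsubscribe(self, event_type: type, subscriber: Subscriber) -> None:
        self._subscribers[event_type].remove(subscriber)

    def handlers_for(self, event_type: type) -> List[Callable]:
        return [s.handler for s in self._subscribers[event_type]]


def low(event: object) -> None:
    pass


def high(event: object) -> None:
    pass


registry = Registry()
low_sub = Subscriber(low, None, 0)
high_sub = Subscriber(high, None, 10)
registry.subscribe(Ping, low_sub)
registry.subscribe(Ping, high_sub)
order_before = registry.handlers_for(Ping)

registry.unsubscribe(Ping, low_sub)
order_after = registry.handlers_for(Ping)
```

Keeping a reference to the `Subscriber` object matters in this design, since the same object is needed later to unsubscribe.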

update(block=False)

Update the dispatcher.

Return type:

None
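
One plausible reading of publish() and update(block=False) is a queue that is filled by publish() and drained by update(). The sketch below illustrates that reading with the standard library's `queue.Queue`; it is not snakia's implementation. `QueueDispatcher` and `handled` are hypothetical, and both the meaning of `block` (wait for an event when the queue is empty) and the one-event-per-call behavior are assumptions.

```python
import queue


class QueueDispatcher:
    """Hypothetical stand-in showing a queue-backed publish/update pair."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        self.handled = []  # events processed so far, kept for inspection

    def publish(self, event) -> None:
        # Publishing only enqueues; nothing is handled yet.
        self._queue.put(event)

    def update(self, block: bool = False) -> None:
        # Assumption: each call processes at most one queued event.
        try:
            event = self._queue.get(block=block)
        except queue.Empty:
            return  # non-blocking call on an empty queue does nothing
        self.handled.append(event)


d = QueueDispatcher()
d.update()  # empty queue, returns immediately
d.publish("a")
d.publish("b")
handled_before_update = list(d.handled)

d.update()  # processes "a"
handled_after_first = list(d.handled)

d.update(block=True)  # "b" is already queued, so this does not wait
handled_after_second = list(d.handled)
```

Under these assumptions, calling `update(block=True)` on an empty queue would wait until another thread publishes an event, so the sketch only calls it when an event is known to be queued.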