snakia.core.rx

Functions

async_chain(func1, /, *funcs)

async_combine(*sources, combiner[, ...])

async_merge(*sources)

chain(func1, /, *funcs)

combine(*sources, combiner[, default_value])

concat(*funcs)

cond(condition, if_true, if_false)

const(value)

filter(f)

map(func, /)

merge(*sources)

Classes

AsyncBindable([default_value])

An asynchronous bindable.

BaseBindable([default_value])

Bindable([default_value])

A bindable value.

BindableSubscriber(*args, **kwargs)

ValueChanged(old_value, new_value)

class snakia.core.rx.AsyncBindable(default_value=None)[source]

Bases: BaseBindable[T], Generic[T]

An asynchronous bindable.

__init__(default_value=None)[source]
property default_value: T
property has_default_value: bool
property has_value: bool
on(run_now=False)[source]

Decorator to subscribe to an value.

Return type:

Callable[[BindableSubscriber[TypeVar(T), Awaitable[Any]]], Optional[Awaitable[None]]]

async set(value)[source]

Set the value.

Return type:

None

set_silent(value)
Return type:

None

subscribe(subscriber, /, run_now=False)[source]

Subscribe to an value.

Return type:

Optional[Awaitable[None]]

property subscribers: tuple[BindableSubscriber[T, Awaitable[Any]], ...]

Get the subscribers.

unsubscribe(subscriber)[source]

Unsubscribe from an value.

Return type:

None

property value: T
class snakia.core.rx.BaseBindable(default_value=None)[source]

Bases: Generic[T]

__init__(default_value=None)[source]
property default_value: T
property has_default_value: bool
property has_value: bool
set_silent(value)[source]
Return type:

None

property value: T
class snakia.core.rx.Bindable(default_value=None)[source]

Bases: BaseBindable[T], Generic[T]

A bindable value.

__init__(default_value=None)[source]
property default_value: T
property has_default_value: bool
property has_value: bool
on(run_now=False)[source]

Decorator to subscribe to an value.

Return type:

Callable[[BindableSubscriber[TypeVar(T), Any]], None]

set(value)[source]

Set the value.

Return type:

None

set_silent(value)
Return type:

None

subscribe(subscriber, /, run_now=False)[source]

Subscribe to an value.

Return type:

None

property subscribers: tuple[BindableSubscriber[T, Any], ...]

Get the subscribers.

unsubscribe(subscriber)[source]

Unsubscribe from an value.

Return type:

None

property value: T
class snakia.core.rx.BindableSubscriber(*args, **kwargs)[source]

Bases: Protocol, Generic[T, R_co]

__init__(*args, **kwargs)
class snakia.core.rx.ValueChanged(old_value, new_value)[source]

Bases: NamedTuple, Generic[T]

count(value, /)

Return number of occurrences of value.

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

new_value: TypeVar(T)

Alias for field number 1

old_value: TypeVar(T)

Alias for field number 0

snakia.core.rx.async_chain(func1, /, *funcs)[source]
Return type:

Callable[[ParamSpec(P)], Awaitable[Any]]

snakia.core.rx.async_combine(*sources, combiner, default_value=<class 'snakia.types.unique.Unset'>)[source]
Return type:

AsyncBindable[TypeVar(R)]

async snakia.core.rx.async_merge(*sources)[source]
Return type:

AsyncBindable[TypeVar(T)]

snakia.core.rx.chain(func1, /, *funcs)[source]
Return type:

Callable[[ParamSpec(P)], Any]

snakia.core.rx.combine(*sources, combiner, default_value=<class 'snakia.types.unique.Unset'>)[source]
Return type:

Bindable[TypeVar(R)]

snakia.core.rx.concat(*funcs)[source]
Return type:

Callable[[ParamSpec(P)], None]

snakia.core.rx.cond(condition, if_true, if_false)[source]
Return type:

Callable[[ParamSpec(P)], Union[TypeVar(T), TypeVar(F)]]

snakia.core.rx.const(value)[source]
Return type:

Callable[[], TypeVar(T)]

snakia.core.rx.filter(f)[source]
Return type:

Callable[[Iterable[TypeVar(S)]], Iterable[TypeVar(T)]]

snakia.core.rx.map(func, /)[source]
Return type:

Callable[[Iterable[TypeVar(T)]], Iterable[TypeVar(U)]]

snakia.core.rx.merge(*sources)[source]
Return type:

Bindable[TypeVar(T)]

Modules

async_bindable

base_bindable

bindable

chains

combines

concats

conds

consts

filters

maps

merges