snakia.core.rx
Functions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Classes
|
An asynchronous bindable. |
|
|
|
A bindable value. |
|
|
|
- class snakia.core.rx.AsyncBindable(default_value=None)[source]
Bases:
BaseBindable[T],Generic[T]An asynchronous bindable.
- property default_value: T
- property has_default_value: bool
- property has_value: bool
- on(run_now=False)[source]
Decorator to subscribe to an value.
- Return type:
Callable[[BindableSubscriber[TypeVar(T),Awaitable[Any]]],Optional[Awaitable[None]]]
- set_silent(value)
- Return type:
None
- subscribe(subscriber, /, run_now=False)[source]
Subscribe to an value.
- Return type:
Optional[Awaitable[None]]
- property subscribers: tuple[BindableSubscriber[T, Awaitable[Any]], ...]
Get the subscribers.
- property value: T
- class snakia.core.rx.BaseBindable(default_value=None)[source]
Bases:
Generic[T]- property default_value: T
- property has_default_value: bool
- property has_value: bool
- property value: T
- class snakia.core.rx.Bindable(default_value=None)[source]
Bases:
BaseBindable[T],Generic[T]A bindable value.
- property default_value: T
- property has_default_value: bool
- property has_value: bool
- on(run_now=False)[source]
Decorator to subscribe to an value.
- Return type:
Callable[[BindableSubscriber[TypeVar(T),Any]],None]
- set_silent(value)
- Return type:
None
- property subscribers: tuple[BindableSubscriber[T, Any], ...]
Get the subscribers.
- property value: T
- class snakia.core.rx.BindableSubscriber(*args, **kwargs)[source]
Bases:
Protocol,Generic[T,R_co]- __init__(*args, **kwargs)
- class snakia.core.rx.ValueChanged(old_value, new_value)[source]
Bases:
NamedTuple,Generic[T]- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
-
new_value:
TypeVar(T) Alias for field number 1
-
old_value:
TypeVar(T) Alias for field number 0
- snakia.core.rx.async_chain(func1, /, *funcs)[source]
- Return type:
Callable[[ParamSpec(P)],Awaitable[Any]]
- snakia.core.rx.async_combine(*sources, combiner, default_value=<class 'snakia.types.unique.Unset'>)[source]
- Return type:
AsyncBindable[TypeVar(R)]
- async snakia.core.rx.async_merge(*sources)[source]
- Return type:
AsyncBindable[TypeVar(T)]
- snakia.core.rx.combine(*sources, combiner, default_value=<class 'snakia.types.unique.Unset'>)[source]
- Return type:
Bindable[TypeVar(R)]
- snakia.core.rx.cond(condition, if_true, if_false)[source]
- Return type:
Callable[[ParamSpec(P)],Union[TypeVar(T),TypeVar(F)]]
- snakia.core.rx.filter(f)[source]
- Return type:
Callable[[Iterable[TypeVar(S)]],Iterable[TypeVar(T)]]
- snakia.core.rx.map(func, /)[source]
- Return type:
Callable[[Iterable[TypeVar(T)]],Iterable[TypeVar(U)]]
Modules