Observable Operators¶
Special operators for composing and transforming observables.
FynX Operators - Observable Operator Implementations and Mixins¶
This module provides the core operator implementations and mixins that enable FynX's fluent reactive programming syntax. These operators allow observables to be composed using intuitive Python operators, creating complex reactive behaviors from simple building blocks.
Why Operators?¶
FynX uses Python's operator overloading to provide a natural, readable syntax for reactive programming. Instead of verbose method calls, you can express reactive relationships using familiar operators:
- observable >> function: Transform values reactively
- observable & condition: Filter values conditionally
- obs1 + obs2 + obs3: Combine observables
This approach makes reactive code more declarative and easier to understand.
Operator Overview¶
- Transform (>>): Apply functions to create derived values
- Filter (&): Only emit values when conditions are met
- Combine (+): Merge multiple observables into tuples
These operators work together to create complex reactive pipelines:
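As a rough illustration of how the three operators can cooperate, here is a minimal self-contained sketch. The `Obs` and `Merged` classes are hypothetical toy stand-ins written for this example, not FynX's implementation; only the operator syntax is taken from this page.

```python
class Obs:
    """Toy observable for illustration only; not FynX's implementation."""

    def __init__(self, value=None):
        self._value = value
        self._subscribers = []

    @property
    def value(self):
        return self._value

    def set(self, value):
        self._value = value
        for callback in list(self._subscribers):
            callback(value)

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def __add__(self, other):
        # Combine: track both sources as one tuple-valued observable.
        return Merged(self, other)

    def __rshift__(self, func):
        # Transform: merged sources are unpacked into separate arguments.
        apply = (lambda v: func(*v)) if isinstance(self, Merged) else func
        derived = Obs(apply(self._value))
        self.subscribe(lambda v: derived.set(apply(v)))
        return derived

    def __and__(self, condition):
        # Filter: forward the current value only while the condition is truthy.
        gated = Obs(self._value if condition.value else None)

        def forward(_changed):
            if condition.value:
                gated.set(self._value)

        self.subscribe(forward)
        condition.subscribe(forward)
        return gated


class Merged(Obs):
    def __init__(self, *sources):
        super().__init__(tuple(s.value for s in sources))
        for source in sources:
            source.subscribe(
                lambda _v: self.set(tuple(s.value for s in sources))
            )


price, quantity, in_stock = Obs(10.0), Obs(2), Obs(False)
total = (price + quantity) >> (lambda p, q: p * q)
visible_total = total & in_stock

seen = []
visible_total.subscribe(seen.append)
quantity.set(3)      # gate closed: nothing is forwarded
in_stock.set(True)   # gate opens: the current total is forwarded
price.set(20.0)      # gate still open: the new total is forwarded
print(seen)          # [30.0, 60.0]
```

The toy keeps the gated observable at None while the condition is False, mirroring the behavior described for and_operator further down this page.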
Operator Mixins¶
This module also provides mixin classes that consolidate operator overloading logic:
OperatorMixin: Provides common reactive operators (__add__, __rshift__, __and__, __invert__) for all observable types that support reactive composition.
TupleMixin: Adds tuple-like behavior (__iter__, __len__, __getitem__, __setitem__) for observables that represent collections of values.
ValueMixin: Provides transparent value wrapper behavior for ObservableValue instances, making them behave like regular Python values while supporting reactive operators.
Implementation Details¶
The operators are implemented as standalone functions rather than methods to avoid circular import issues and enable lazy loading. They are called automatically when you use the corresponding operators on Observable instances.
The functions handle different observable types (regular, merged, conditional) appropriately, ensuring consistent behavior across the reactive system.
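The delegation pattern described above can be sketched as follows. All names here (`toy_rshift_operator`, `ToyOperatorMixin`, `ToyObservable`, `ToyComputed`) are hypothetical and exist only for this example; the sketch is single-file and non-reactive, so it shows the dispatch path rather than FynX's actual code.

```python
def toy_rshift_operator(obs, func):
    """Standalone operator function (hypothetical stand-in).

    ToyComputed is resolved only when the operator is actually used. In a
    multi-module package, a function-level import at this point would serve
    the same purpose and avoid a circular import at module load time.
    """
    return ToyComputed(obs, func)


class ToyOperatorMixin:
    def __rshift__(self, func):
        # The magic method holds no logic of its own; it only delegates.
        return toy_rshift_operator(self, func)


class ToyObservable(ToyOperatorMixin):
    def __init__(self, value):
        self.value = value


class ToyComputed(ToyObservable):
    def __init__(self, source, func):
        super().__init__(func(source.value))


doubled = ToyObservable(21) >> (lambda x: x * 2)
print(doubled.value)  # 42
```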
Performance Characteristics¶
- Lazy Evaluation: Operators create computed/conditional observables that only evaluate when needed
- Efficient Composition: Multiple operators can be chained without creating intermediate objects
- Memory Conscious: Operators reuse existing infrastructure rather than creating new classes
Common Patterns¶
Data Processing Pipeline:
raw_data = observable([])
processed = (raw_data
    >> (lambda d: [x for x in d if x > 0])       # Filter positive values
    >> (lambda d: sorted(d))                     # Sort results
    >> (lambda d: sum(d) / len(d) if d else 0))  # Calculate average
Conditional UI Updates:
user_input = observable("")
is_valid = user_input >> (lambda s: len(s) >= 3)
show_error = user_input & ~is_valid  # Emits the input only while it is invalid (fewer than 3 characters)
Reactive Calculations:
price = observable(10.0)
quantity = observable(1)
tax_rate = observable(0.08)
subtotal = (price + quantity) >> (lambda p, q: p * q)
tax = (subtotal + tax_rate) >> (lambda s, r: s * r)  # Merge tax_rate in so tax also updates when the rate changes
total = (subtotal + tax) >> (lambda s, t: s + t)
Error Handling¶
Operators handle errors gracefully:
- Transformation function errors are propagated but don't break the reactive system
- Invalid operator usage provides clear error messages
- Circular dependencies are detected and prevented
Best Practices¶
- Keep Functions Pure: Transformation functions should not have side effects
- Use Meaningful Lambdas: Complex operations deserve named functions
- Chain Thoughtfully: Break complex chains into intermediate variables for clarity
- Handle Edge Cases: Consider what happens with None, empty collections, etc.
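A small sketch of these practices in plain Python: a named, pure function that handles None and empty input explicitly. The function name `average_of_positives` is made up for this example.

```python
def average_of_positives(values):
    """Pure, named transformation: no side effects, safe on None and []."""
    positives = [v for v in (values or []) if v > 0]
    return sum(positives) / len(positives) if positives else 0.0


print(average_of_positives([3, -1, 5]))  # 4.0
print(average_of_positives([]))          # 0.0
print(average_of_positives(None))        # 0.0
```

With the operator syntax described on this page, such a function could then be passed on the right-hand side of >> in place of a chain of lambdas.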
Migration from Method Calls¶
If you're familiar with other reactive libraries, here's how FynX operators compare:
# Other libraries (method-based)
result = obs.map(lambda x: x * 2).filter(lambda x: x > 10)
# FynX (operator-based)
doubled = obs >> (lambda x: x * 2)
result = doubled & (doubled >> (lambda x: x > 10))  # Filter on the doubled value, matching the method chain above
The operator syntax is more concise and readable for simple transformations.
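When mixing operators in one expression, Python's standard precedence applies: ~ binds tightest, then +, then >>, then &. The sketch below uses a hypothetical `Sym` class, invented for this example, that only records how Python groups an expression.

```python
class Sym:
    """Records how Python groups an expression; purely illustrative."""

    def __init__(self, text):
        self.text = text

    def __rshift__(self, other):
        return Sym(f"({self.text} >> {other.text})")

    def __and__(self, other):
        return Sym(f"({self.text} & {other.text})")

    def __add__(self, other):
        return Sym(f"({self.text} + {other.text})")

    def __invert__(self):
        return Sym(f"(~{self.text})")


a, b, f, g, cond = (Sym(name) for name in ("a", "b", "f", "g", "cond"))

transform_then_filter = (a >> f & cond).text   # ((a >> f) & cond)
merge_then_transform = (a + b >> f).text       # ((a + b) >> f)
negated_filter = (a & ~cond).text              # (a & (~cond))
filter_on_derived = (a & cond >> g).text       # (a & (cond >> g))
```

Explicit parentheses, as used in the examples on this page, remain the safer choice for readability.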
See Also¶
- fynx.observable: Core observable classes that use these operators and mixins
- fynx.computed: Computed observables created by the >> operator
- fynx.watch: Conditional reactive functions (alternative to &)
OperatorMixin ¶
Mixin class providing common reactive operators for observable classes.
This mixin consolidates the operator overloading logic that was previously duplicated across multiple observable classes. It provides the core reactive operators (__add__, __rshift__, __and__, __invert__) that enable FynX's fluent reactive programming syntax.
Classes inheriting from this mixin get automatic support for:
- Merging with + operator
- Transformation with >> operator
- Conditional filtering with & operator
- Boolean negation with ~ operator
This mixin should be used by classes that represent reactive values and need to support reactive composition operations.
__add__ ¶
Combine this observable with another using the + operator.
This creates a merged observable that contains a tuple of both values and updates automatically when either observable changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | | Another Observable to combine with | required |
Returns:
Type | Description |
---|---|
Mergeable | A MergedObservable containing both values as a tuple |
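A minimal sketch of the merging behavior, assuming that chained + produces one flat tuple (as the three-argument examples elsewhere on this page suggest). `ToyObservable` and `ToyMerged` are hypothetical classes written for this example.

```python
class ToyObservable:
    """Toy observable; a hypothetical stand-in, not FynX's class."""

    def __init__(self, value):
        self._value = value
        self._subscribers = []

    @property
    def value(self):
        return self._value

    def set(self, value):
        self._value = value
        for callback in list(self._subscribers):
            callback(value)

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def __add__(self, other):
        return ToyMerged(self, other)


class ToyMerged(ToyObservable):
    def __init__(self, *sources):
        self._sources = sources
        super().__init__(self._snapshot())
        for source in sources:
            # Any source change refreshes the whole tuple.
            source.subscribe(lambda _v: self.set(self._snapshot()))

    def _snapshot(self):
        return tuple(source.value for source in self._sources)

    def __add__(self, other):
        # Extend the flat source list instead of nesting tuples.
        return ToyMerged(*self._sources, other)


width, height, depth = ToyObservable(10), ToyObservable(20), ToyObservable(5)
dims = width + height + depth
before = dims.value   # (10, 20, 5)
height.set(30)
after = dims.value    # (10, 30, 5)
```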
__and__ ¶
Create a conditional observable using the & operator for filtered reactivity.
This creates a ConditionalObservable that only emits values when all specified conditions are True, enabling precise control over reactive updates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
condition | | A boolean Observable, callable, or compound condition | required |
Returns:
Type | Description |
---|---|
Conditional | A ConditionalObservable that filters values based on the condition |
__invert__ ¶
Create a negated boolean observable using the ~ operator.
This creates a computed observable that returns the logical negation of the current boolean value, useful for creating inverse conditions.
Returns:
Type | Description |
---|---|
Observable[bool] | A computed Observable[bool] with negated boolean value |
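The idea can be sketched with a toy class. `ToyBool` is hypothetical and written only for this example; it shows a negated observable that follows its source.

```python
class ToyBool:
    """Toy boolean observable; a hypothetical stand-in, not FynX's class."""

    def __init__(self, value):
        self.value = bool(value)
        self._subscribers = []

    def set(self, value):
        self.value = bool(value)
        for callback in list(self._subscribers):
            callback(self.value)

    def __invert__(self):
        # Use logical `not` here: on a plain bool, `~` is bitwise inversion
        # of the integer value, not logical negation.
        negated = ToyBool(not self.value)
        self._subscribers.append(lambda v: negated.set(not v))
        return negated


is_valid = ToyBool(False)
is_invalid = ~is_valid
initial = is_invalid.value   # True
is_valid.set(True)
updated = is_invalid.value   # False
```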
__or__ ¶
Create a logical OR condition using the | operator.
This creates a conditional observable that only emits when the OR result is truthy. If the initial OR result is falsy, raises ConditionalNeverMet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | | Another boolean observable to OR with | required |
Returns:
Type | Description |
---|---|
Observable | A conditional observable that only emits when OR is truthy |
Raises:
Type | Description |
---|---|
ConditionalNeverMet | If initial OR result is falsy |
__radd__ ¶
Support right-side addition for merging observables.
This enables expressions like other + self to work correctly, ensuring that merged observables can be chained properly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | | Another Observable to combine with | required |
Returns:
Type | Description |
---|---|
Mergeable | A MergedObservable containing both values as a tuple |
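Python calls __radd__ on the right operand only after the left operand's __add__ returns NotImplemented. The sketch below uses a hypothetical `ToyObservable` that returns plain tuples, just to show which method runs and that operand order is preserved.

```python
class ToyObservable:
    """Toy class for illustrating dispatch; not FynX's implementation."""

    def __init__(self, value):
        self.value = value

    @staticmethod
    def _unwrap(item):
        return item.value if isinstance(item, ToyObservable) else item

    def __add__(self, other):
        # self + other: self is the left operand.
        return (self.value, self._unwrap(other))

    def __radd__(self, other):
        # other + self: reached only after other's __add__ gives up.
        return (self._unwrap(other), self.value)


left_first = ToyObservable(1) + 5    # (1, 5), via __add__
right_first = 5 + ToyObservable(1)   # (5, 1), via __radd__
```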
__rshift__ ¶
Apply a transformation function using the >> operator to create computed observables.
This implements the functorial map operation over observables, allowing you to transform observable values through pure functions while preserving reactivity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | A pure function to apply to the observable's value(s) | required |
Returns:
Type | Description |
---|---|
Observable | A new computed Observable containing the transformed values |
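"Functorial map" implies two laws: mapping the identity changes nothing, and mapping f then g equals mapping their composition. The sketch below checks both on a hypothetical, non-reactive `ToyObservable` written for this example; it does not exercise FynX itself.

```python
class ToyObservable:
    """Toy value holder with a map-like >>; not FynX's implementation."""

    def __init__(self, value):
        self.value = value

    def __rshift__(self, func):
        return ToyObservable(func(self.value))


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


obs = ToyObservable(5)

# Identity law: mapping the identity function leaves the value unchanged.
unchanged = (obs >> (lambda x: x)).value

# Composition law: two chained maps equal one map of the composed function.
chained = (obs >> double >> add_ten).value
composed = (obs >> (lambda x: add_ten(double(x)))).value
```

The composition law is what makes it safe to fuse a chain of transformations into a single step.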
TupleMixin ¶
Mixin class providing tuple-like operators for merged observables.
This mixin adds tuple-like behavior to observables that represent collections of values (like MergedObservable). It provides operators for iteration, indexing, and length operations that make merged observables behave like tuples of their component values.
Classes inheriting from this mixin get automatic support for:
- Iteration with for item in merged:
- Length with len(merged)
- Indexing with merged[0], merged[-1], etc.
- Setting values by index with merged[0] = new_value
__setitem__ ¶
Allow setting values by index, updating the corresponding source observable.
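A minimal sketch of the tuple-like protocol, including write-through on index assignment. `ToySource` and `ToyMerged` are hypothetical classes written for this example, and change notification is omitted.

```python
class ToySource:
    def __init__(self, value):
        self.value = value


class ToyMerged:
    """Toy merged container; a hypothetical stand-in, not FynX's class."""

    def __init__(self, *sources):
        self._sources = list(sources)

    def __iter__(self):
        return iter([source.value for source in self._sources])

    def __len__(self):
        return len(self._sources)

    def __getitem__(self, index):
        return self._sources[index].value

    def __setitem__(self, index, new_value):
        # Write through to the source at that position.
        self._sources[index].value = new_value


width, height = ToySource(10), ToySource(20)
merged = ToyMerged(width, height)

values = list(merged)   # [10, 20]
last = merged[-1]       # 20
merged[0] = 99          # updates width, not a copy
```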
ValueMixin ¶
Mixin class providing value wrapper operators for ObservableValue.
This mixin adds operators that make observable values behave transparently like their underlying values in most Python contexts. It provides magic methods for equality, string conversion, iteration, indexing, etc., while also supporting the reactive operators.
Classes inheriting from this mixin get automatic support for:
- Value-like behavior (equality, string conversion, etc.)
- Reactive operators (__add__, __and__, __invert__, __rshift__)
- Transparent access to the wrapped observable
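The value-like behavior can be sketched with a handful of magic methods that forward to the wrapped value. `ToyValue` is a hypothetical class for this example; which methods FynX's ValueMixin actually defines is not shown on this page.

```python
class ToyValue:
    """Toy transparent wrapper; a hypothetical stand-in, not FynX's class."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        # Compare by wrapped value. Defining __eq__ without __hash__
        # makes instances unhashable, which suits a mutable wrapper.
        other_value = other.value if isinstance(other, ToyValue) else other
        return self.value == other_value

    def __str__(self):
        return str(self.value)

    def __bool__(self):
        return bool(self.value)

    def __len__(self):
        return len(self.value)

    def __iter__(self):
        return iter(self.value)

    def __getitem__(self, index):
        return self.value[index]


name = ToyValue("fynx")
same = name == "fynx"      # True
text = str(name)           # fynx
first = name[0]            # f
letters = list(name)       # ['f', 'y', 'n', 'x']
empty_is_falsy = not ToyValue("")
```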
and_operator ¶
Implement the & operator for creating conditional observables.
This operator creates conditional observables that only emit values when boolean conditions are satisfied. The resulting observable filters the reactive stream, preventing unnecessary updates and computations when conditions aren't met.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obs | | The source observable whose values will be conditionally emitted. | required |
condition | | A boolean observable that acts as a gate. Values from | required |
Returns:
Type | Description |
---|---|
ConditionalObservable | A new ConditionalObservable that only emits values when the condition is met. The observable starts with None if the condition is initially False. |
Examples:
from fynx.observable import Observable
# Basic conditional filtering
data = Observable("data", "hello")
is_ready = Observable("ready", False)
filtered = data & is_ready # Only emits when is_ready is True
filtered.subscribe(lambda x: print(f"Received: {x}"))
data.set("world") # No output (is_ready is False)
is_ready.set(True) # Prints: "Received: world"
# Multiple conditions (chained)
user_present = Observable("present", True)
smart_data = data & is_ready & user_present # All must be True
# Practical example: temperature monitoring
temperature = Observable("temp", 20)
alarm_enabled = Observable("alarm", True)
is_critical = Observable("critical", False)
alarm_trigger = temperature & alarm_enabled & is_critical
alarm_trigger.subscribe(lambda t: print(f"🚨 Alarm: {t}°C"))
Note
Multiple conditions can be chained: obs & cond1 & cond2 & cond3. All conditions must be True for values to be emitted.
See Also
ConditionalObservable: The class that implements conditional behavior
Observable.__and__: The magic method that calls this operator
rshift_operator ¶
Implement the >> operator with comprehensive categorical optimization.
This operator creates computed observables using the full categorical optimization system, applying functor composition fusion, product factorization, and cost-optimal materialization strategies automatically.
Categorical Optimization System:
- Rule 1: Functor composition collapse (fuses sequential transformations)
- Rule 2: Product factorization (shares common subexpressions)
- Rule 3: Pullback fusion (combines sequential filters)
- Rule 4: Cost-optimal materialization (decides what to cache vs recompute)
The optimization uses a cost functional C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ) to find semantically equivalent observables with minimal computational cost.
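To make the cost functional concrete, here is a worked sketch that evaluates it for two candidate structures. The `Candidate` class, the weights (all set to 1.0), and every number below are made-up illustration values, not measurements or FynX defaults.

```python
from dataclasses import dataclass


@dataclass
class Candidate:
    dependencies: int        # |Dep(σ)|
    expected_updates: float  # E[Updates(σ)]
    depth: int               # depth(σ)


def cost(candidate, alpha=1.0, beta=1.0, gamma=1.0):
    """C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ)."""
    return (
        alpha * candidate.dependencies
        + beta * candidate.expected_updates
        + gamma * candidate.depth
    )


# Hypothetical chain of three transformations, before and after fusion.
unfused = Candidate(dependencies=3, expected_updates=30.0, depth=3)
fused = Candidate(dependencies=1, expected_updates=10.0, depth=1)

unfused_cost = cost(unfused)  # 3 + 30 + 3 = 36.0
fused_cost = cost(fused)      # 1 + 10 + 1 = 12.0
cheaper = min((unfused, fused), key=cost)
```

Under these assumed inputs the fused structure has the lower cost, so a cost-minimizing choice would pick it.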
For merged observables (created with +), the function receives multiple arguments corresponding to the tuple values. For single observables, it receives one argument.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obs | Observable[T] | The source observable(s) to transform. Can be a single Observable or a MergedObservable (from | required |
func | Callable[..., U] | A pure function that transforms the observable value(s). For merged observables, receives unpacked tuple values as separate arguments. | required |
Returns:
Type | Description |
---|---|
Observable[U] | A new computed observable with optimal structure. Updates automatically when source observables change, but with dramatically improved performance through categorical optimizations. |
Examples:
from fynx.observable import Observable
# Single observable with automatic optimization
counter = Observable("counter", 5)
result = counter >> (lambda x: x * 2) >> (lambda x: x + 10) >> str
# Automatically optimized to single fused computation
# Complex reactive pipelines are optimized globally
width = Observable("width", 10)
height = Observable("height", 20)
area = (width + height) >> (lambda w, h: w * h)
volume = (width + height + Observable("depth", 5)) >> (lambda w, h, d: w * h * d)
# Shared width/height computations are factored out automatically
Performance
- Chain fusion: O(N) depth → O(1) for transformation chains
- Subexpression sharing: Eliminates redundant computations
- Cost optimization: Balances memory vs computation tradeoffs
- Typical speedup: 1000× - 10000× for deep reactive graphs
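Chain fusion can be pictured as composing the transformation functions into one callable, so that a chain becomes a single derived node. The `fuse` helper below is a hypothetical sketch of that idea, not FynX's optimizer.

```python
from functools import reduce


def fuse(*funcs):
    """Compose funcs left to right into one callable (hypothetical helper)."""

    def fused(value):
        return reduce(lambda acc, func: func(acc), funcs, value)

    return fused


# The chain from the example above: double, add ten, convert to str.
pipeline = fuse(lambda x: x * 2, lambda x: x + 10, str)
result = pipeline(5)  # '20'
```

Each function still runs once per update; what fusion removes in this picture is the intermediate nodes between them.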
See Also
Observable.then: The method that creates computed observables
MergedObservable: For combining multiple observables with +
optimizer: The categorical optimization system