Observable Operators¶
Special operators for composing and transforming observables.
FynX Operators - Observable Operator Implementations and Mixins¶
Consider a spreadsheet formula: you reference cells, apply functions, combine values. That formula recalculates automatically when inputs change. This module provides that mechanism as Python operators—familiar syntax that composes reactive values into complex behaviors.
FynX uses operator overloading to make reactive code read like natural expressions. Instead of method chains like obs.map(f).filter(g), you write obs >> f & g. That syntax compresses reactive relationships into declarative statements—the operators handle dependency tracking and automatic updates behind the scenes.
The operators work through three mixin classes that consolidate operator overloading logic. OperatorMixin provides the core reactive operators (+, >>, &, ~, |) for all observable types. TupleMixin adds tuple-like behavior to merged observables—iteration, indexing, length operations. ValueMixin enables transparent value access for ObservableValue instances, making reactive attributes behave like regular values while preserving reactive capabilities.
Result: reactive code that reads like mathematical expressions, with automatic optimization and dependency tracking handled transparently.
Operator Semantics¶
Transform (>>): Apply functions to create derived values
from fynx.observable import Observable
counter = Observable("counter", 5)
doubled = counter >> (lambda x: x * 2)
print(doubled.value) # 10
Filter (&): Only emit values when conditions are met
data = Observable("data", "hello")
is_ready = Observable("ready", False)
filtered = data & is_ready # Only emits when is_ready is True
Combine (+): Merge multiple observables into tuples
x = Observable("x", 1)
y = Observable("y", 2)
z = Observable("z", 3)
coordinates = x + y + z
print(coordinates.value) # (1, 2, 3)
These operators compose to create complex reactive pipelines; the Common Patterns section shows several.
Implementation Architecture¶
The operators delegate to methods in OperationsMixin rather than implementing logic directly. That separation enables lazy loading and avoids circular import issues. When you use obs >> func, Python calls __rshift__, which delegates to obs.then(func). The then method creates computed observables through _create_computed, which registers with the optimization system automatically.
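The delegation chain described above can be sketched in a few lines. This is a minimal illustration, not FynX's implementation: `SketchObservable` and its subscriber list are hypothetical stand-ins, and only the names `__rshift__` and `then` come from the text.

```python
class SketchObservable:
    """Hypothetical stand-in for an observable; not the FynX class."""

    def __init__(self, value):
        self._value = value
        self._subscribers = []

    @property
    def value(self):
        return self._value

    def set(self, new_value):
        self._value = new_value
        for callback in list(self._subscribers):
            callback(new_value)

    def then(self, func):
        # The named method holds the logic: build a derived observable
        # and keep it in sync with this one.
        derived = SketchObservable(func(self._value))
        self._subscribers.append(lambda v: derived.set(func(v)))
        return derived

    def __rshift__(self, func):
        # The operator only delegates, so `obs >> f` equals `obs.then(f)`.
        return self.then(func)


counter = SketchObservable(5)
doubled = counter >> (lambda x: x * 2)
counter.set(7)
print(doubled.value)  # 14
```

Keeping the logic in a named method means the operator and the method can never drift apart, and the method stays usable where operator syntax would be awkward.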
The functions handle different observable types (regular, merged, conditional) uniformly. For merged observables, transformation functions receive unpacked tuple values as separate arguments. For single observables, they receive one argument. That distinction enables functions that work with both coordinate pairs and scalar values.
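The difference in calling convention can be shown with a small helper. `apply_transform` and its `merged` flag are hypothetical names used only for this sketch; FynX's internal dispatch may look different.

```python
def apply_transform(func, value, merged):
    """Call func the way the text describes (sketch, not FynX internals)."""
    if merged:
        # Merged observables hold a tuple; unpack it into separate arguments.
        return func(*value)
    # Single observables pass their value as one argument.
    return func(value)


area = apply_transform(lambda w, h: w * h, (10, 20), merged=True)
doubled = apply_transform(lambda x: x * 2, 5, merged=False)
print(area, doubled)  # 200 10
```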
Performance Characteristics¶
Operators create computed or conditional observables that evaluate lazily—they recalculate only when accessed after dependencies change. Multiple operators chain without creating intermediate objects—the optimization system fuses sequential transformations into single composed functions. Operators reuse existing infrastructure rather than creating new classes, minimizing memory overhead.
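A rough sketch of both ideas, lazy recalculation and fusion of a transformation chain, follows. `LazyComputed`, `fuse`, and the `evaluations` counter are hypothetical names for illustration; the text does not say how FynX's optimizer implements either behavior.

```python
from functools import reduce


def fuse(*funcs):
    """Compose funcs left to right into one callable."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)


class LazyComputed:
    """Hypothetical lazy node: recomputes only when read after invalidation."""

    def __init__(self, source_getter, func):
        self._source_getter = source_getter
        self._func = func
        self._dirty = True
        self._cached = None
        self.evaluations = 0

    def invalidate(self):
        # Called when a dependency changes; no work happens here.
        self._dirty = True

    @property
    def value(self):
        if self._dirty:
            self._cached = self._func(self._source_getter())
            self._dirty = False
            self.evaluations += 1
        return self._cached


state = {"x": 5}
pipeline = LazyComputed(lambda: state["x"],
                        fuse(lambda x: x * 2, lambda x: x + 10, str))
first = pipeline.value    # computes once
second = pipeline.value   # served from the cache
state["x"] = 7
pipeline.invalidate()     # marks the node stale, still no recomputation
third = pipeline.value    # recomputes on access
print(first, third, pipeline.evaluations)  # 20 24 2
```

Three transformations run as one fused call, and the node is evaluated twice even though it is read three times.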
Common Patterns¶
Data Processing Pipeline:
from fynx import observable
raw_data = observable([1, -2, 3, -4, 5])
processed = (raw_data
    >> (lambda d: [x for x in d if x > 0])       # Filter positive values
    >> (lambda d: sorted(d))                      # Sort results
    >> (lambda d: sum(d) / len(d) if d else 0))  # Calculate average
print(processed.value) # 3.0
Conditional UI Updates:
user_input = observable("")
is_valid = user_input >> (lambda s: len(s) >= 3)
show_error = user_input & ~is_valid # Emits the input only while it is invalid (fewer than 3 characters, including empty)
Reactive Calculations:
price = observable(10.0)
quantity = observable(1)
tax_rate = observable(0.08)
subtotal = (price + quantity) >> (lambda p, q: p * q)
tax = (subtotal + tax_rate) >> (lambda s, r: s * r) # tax_rate is a dependency, so tax updates when it changes
total = (subtotal + tax) >> (lambda s, t: s + t)
print(total.value) # 10.8
Error Handling¶
Transformation function errors propagate normally—the reactive system doesn't swallow exceptions. Invalid operator usage raises TypeError with descriptive messages. Circular dependencies are detected during .set() operations and raise RuntimeError before creating infinite loops.
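Both behaviors can be imitated in a self-contained sketch. The re-entrancy flag used here to detect a cycle is an assumption chosen for brevity; the text only states that cycles raise RuntimeError during `.set()`, not how FynX detects them. `GuardedObservable` is a hypothetical stand-in.

```python
class GuardedObservable:
    """Hypothetical stand-in that refuses re-entrant set() calls."""

    def __init__(self, value):
        self.value = value
        self._subscribers = []
        self._notifying = False

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def set(self, new_value):
        if self._notifying:
            # We are already notifying for this observable: a cycle.
            raise RuntimeError("circular dependency detected")
        self.value = new_value
        self._notifying = True
        try:
            for callback in list(self._subscribers):
                callback(new_value)  # exceptions are not swallowed
        finally:
            self._notifying = False


a = GuardedObservable(0)
b = GuardedObservable(0)
a.subscribe(lambda v: b.set(v + 1))
b.subscribe(lambda v: a.set(v + 1))  # closes the loop a -> b -> a

errors = []
try:
    a.set(1)
except RuntimeError as exc:
    errors.append(type(exc).__name__)

ratio_source = GuardedObservable(1)
ratio_source.subscribe(lambda v: 1 / v)
try:
    ratio_source.set(0)  # the callback's own error reaches the caller
except ZeroDivisionError as exc:
    errors.append(type(exc).__name__)

print(errors)  # ['RuntimeError', 'ZeroDivisionError']
```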
Best Practices¶
Keep transformation functions pure—no side effects, no external state access. Use named functions for complex operations rather than long lambda expressions. Break complex chains into intermediate variables for clarity. Handle edge cases explicitly—consider None values, empty collections, and boundary conditions.
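As an illustration of these guidelines, the lambdas of a filtering and averaging pipeline can be folded into one named, pure function that treats None and empty input explicitly. `positive_average` is a hypothetical helper written for this sketch.

```python
from typing import Optional, Sequence


def positive_average(values: Optional[Sequence[float]]) -> float:
    """Average of the positive entries; 0.0 for None or no positives."""
    if not values:
        return 0.0
    positives = [v for v in values if v > 0]
    if not positives:
        return 0.0
    return sum(positives) / len(positives)


print(positive_average([1, -2, 3, -4, 5]))  # 3.0
print(positive_average([]))                 # 0.0
print(positive_average(None))               # 0.0
```

Because the function depends only on its argument, it can be unit tested on its own and then passed to `>>` unchanged.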
See Also¶
fynx.observable: Core observable classes that use these operators and mixins
fynx.observable.computed: Computed observables created by the >> operator
fynx.observable.conditional: Conditional observables created by the & operator
OperatorMixin ¶
Mixin class providing common reactive operators for observable classes.
This mixin consolidates the operator overloading logic that was previously duplicated across multiple observable classes. It provides the core reactive operators (__add__, __rshift__, __and__, __invert__, __or__) that enable FynX's fluent reactive programming syntax.
Classes inheriting from this mixin get automatic support for:
- Merging with + operator
- Transformation with >> operator
- Conditional filtering with & operator
- Boolean negation with ~ operator
- Logical OR with | operator
This mixin should be used by classes that represent reactive values and need to support reactive composition operations.
__add__ ¶
Combine this observable with another using the + operator.
This creates a merged observable that contains a tuple of both values and updates automatically when either observable changes. The merge operation represents the categorical product—combining independent reactive values into a single reactive pair.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | | Another Observable to combine with | required |
Returns:
| Type | Description |
|---|---|
| Mergeable | A MergedObservable containing both values as a tuple |
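A minimal sketch of merging with `+`, assuming the merged object simply reads its sources on access. `Cell` and `Merged` are hypothetical stand-ins, not FynX classes; the flattening of `x + y + z` into a 3-tuple follows the `coordinates` example earlier in this page.

```python
class Cell:
    """Hypothetical observable stand-in holding one value."""

    def __init__(self, value):
        self.value = value

    def __add__(self, other):
        return Merged(self, other)


class Merged:
    """Hypothetical merged stand-in: reads its sources on every access."""

    def __init__(self, *sources):
        self.sources = sources

    @property
    def value(self):
        return tuple(source.value for source in self.sources)

    def __add__(self, other):
        # Extend the existing product so x + y + z stays flat.
        return Merged(*self.sources, other)


x, y, z = Cell(1), Cell(2), Cell(3)
coordinates = x + y + z
print(coordinates.value)  # (1, 2, 3)
x.value = 10
print(coordinates.value)  # (10, 2, 3)
```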
__and__ ¶
Create a conditional observable using the & operator for filtered reactivity.
This creates a ConditionalObservable that only emits values when all specified conditions are True, enabling precise control over reactive updates. The operation represents a pullback—filtering the reactive stream through boolean conditions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | | A boolean Observable, callable, or compound condition | required |
Returns:
| Type | Description |
|---|---|
| Conditional | A ConditionalObservable that filters values based on the condition |
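The gating behavior can be pictured with a small stand-in. `Gate` is hypothetical and deliberately simpler than a real conditional observable; emitting the held-back value when the condition turns True mirrors the `data & is_ready` example in the and_operator section.

```python
class Gate:
    """Hypothetical conditional stand-in: forwards values only while open."""

    def __init__(self):
        self.condition = False
        self.latest = None
        self.received = []

    def push(self, value):
        # Remember the newest source value; emit only if the gate is open.
        self.latest = value
        if self.condition:
            self.received.append(value)

    def set_condition(self, flag):
        was_open = self.condition
        self.condition = bool(flag)
        # Opening the gate emits the value that was held back.
        if self.condition and not was_open and self.latest is not None:
            self.received.append(self.latest)


gate = Gate()
gate.push("hello")        # held back: condition is False
gate.push("world")        # still held back
gate.set_condition(True)  # emits the latest held value
gate.push("again")        # passes straight through
print(gate.received)      # ['world', 'again']
```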
__invert__ ¶
Create a negated boolean observable using the ~ operator.
This creates a computed observable that returns the logical negation of the current boolean value, useful for creating inverse conditions. The negation updates automatically when the source changes.
Returns:
| Type | Description |
|---|---|
| Observable[bool] | A computed Observable[bool] with negated boolean value |
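A minimal sketch of a negation that tracks its source, using hypothetical `Flag` and `NegatedFlag` classes rather than FynX's own types:

```python
class Flag:
    """Hypothetical boolean observable stand-in."""

    def __init__(self, value):
        self.value = value

    def __invert__(self):
        return NegatedFlag(self)


class NegatedFlag:
    """Reads the source on access, so the negation never goes stale."""

    def __init__(self, source):
        self._source = source

    @property
    def value(self):
        return not self._source.value


is_valid = Flag(False)
show_error = ~is_valid
before = show_error.value
is_valid.value = True
after = show_error.value
print(before, after)  # True False
```

Defining `__invert__` matters because `~` on a plain Python bool is integer bitwise inversion (`~True` evaluates to `-2`), not logical negation.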
__or__ ¶
Create a logical OR condition using the | operator.
This creates a conditional observable that only emits when the OR result is truthy. If the initial OR result is falsy, raises ConditionalNeverMet. The operation combines boolean observables with logical disjunction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | | Another boolean observable to OR with | required |
Returns:
| Type | Description |
|---|---|
| Observable | A conditional observable that only emits when OR is truthy |
Raises:
| Type | Description |
|---|---|
| ConditionalNeverMet | If initial OR result is falsy |
__radd__ ¶
Support right-side addition for merging observables.
This enables expressions like other + self to work correctly, ensuring that merged observables can be chained properly. Python calls this method when the left operand's __add__ is missing or returns NotImplemented.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | | Another Observable to combine with | required |
Returns:
| Type | Description |
|---|---|
| Mergeable | A MergedObservable containing both values as a tuple |
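Python's reflected-operator protocol can be observed directly with a built-in left operand. `SketchMergeable` and the tuples it returns are hypothetical and exist only to show which method runs.

```python
class SketchMergeable:
    """Hypothetical stand-in showing which addition method Python calls."""

    def __init__(self, name):
        self.name = name

    def __add__(self, other):
        # Runs for `self + other`.
        return ("add", self.name, getattr(other, "name", other))

    def __radd__(self, other):
        # Runs for `other + self` after other's __add__ declined.
        return ("radd", getattr(other, "name", other), self.name)


y = SketchMergeable("y")
left = y + 5   # SketchMergeable.__add__ handles it
right = 5 + y  # int.__add__ returns NotImplemented, so __radd__ runs
print(left)    # ('add', 'y', 5)
print(right)   # ('radd', 5, 'y')
```

The reflected method keeps the operand order intact, which is what lets a merged tuple list its values in the order they were written.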
__rshift__ ¶
Apply a transformation function using the >> operator to create computed observables.
This implements the functorial map operation over observables, allowing you to transform observable values through pure functions while preserving reactivity. The operation satisfies the functor laws: identity and composition preservation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | A pure function to apply to the observable's value(s) | required |
Returns:
| Type | Description |
|---|---|
| Observable | A new computed Observable containing the transformed values |
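The two functor laws can be checked on a toy container. `Box` is a hypothetical stand-in that supports only `>>`; the check illustrates what the laws say and is not a test of FynX itself.

```python
class Box:
    """Hypothetical observable stand-in supporting only `>>`."""

    def __init__(self, value):
        self.value = value

    def __rshift__(self, func):
        return Box(func(self.value))


def identity(x):
    return x


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


obs = Box(5)

# Identity law: mapping the identity function changes nothing.
identity_holds = (obs >> identity).value == obs.value

# Composition law: mapping f then g equals mapping their composition.
chained = (obs >> double >> add_ten).value
composed = (obs >> (lambda x: add_ten(double(x)))).value
composition_holds = chained == composed

print(identity_holds, composition_holds)  # True True
```

The composition law is what makes it safe to fuse a chain of transformations into a single function, provided the functions are pure.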
TupleMixin ¶
Mixin class providing tuple-like operators for merged observables.
This mixin adds tuple-like behavior to observables that represent collections of values (like MergedObservable). It provides operators for iteration, indexing, and length operations that make merged observables behave like tuples of their component values.
Classes inheriting from this mixin get automatic support for:
- Iteration with for item in merged:
- Length with len(merged)
- Indexing with merged[0], merged[-1], etc.
- Setting values by index with merged[0] = new_value
__setitem__ ¶
Allow setting values by index, updating the corresponding source observable.
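The tuple-like protocol listed above maps onto four special methods. In this sketch `Source` and `TupleLike` are hypothetical stand-ins, and writing by index is assumed to set the matching source directly.

```python
class Source:
    """Hypothetical observable stand-in holding one value."""

    def __init__(self, value):
        self.value = value


class TupleLike:
    """Hypothetical merged stand-in exposing tuple-style access."""

    def __init__(self, *sources):
        self._sources = list(sources)

    def __iter__(self):
        return (source.value for source in self._sources)

    def __len__(self):
        return len(self._sources)

    def __getitem__(self, index):
        return self._sources[index].value

    def __setitem__(self, index, new_value):
        # Writing by index updates the matching source.
        self._sources[index].value = new_value


x, y = Source(1), Source(2)
merged = TupleLike(x, y)
merged[0] = 10
print(list(merged), len(merged), merged[-1], x.value)  # [10, 2] 2 2 10
```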
ValueMixin ¶
Mixin class providing value wrapper operators for ObservableValue.
This mixin adds operators that make observable values behave transparently like their underlying values in most Python contexts. It provides magic methods for equality, string conversion, iteration, indexing, etc., while also supporting the reactive operators.
Classes inheriting from this mixin get automatic support for:
- Value-like behavior (equality, string conversion, etc.)
- Reactive operators (__add__, __and__, __invert__, __rshift__)
- Transparent access to the wrapped observable
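Value-like behavior comes from forwarding special methods to the wrapped value. `ValueWrapper` below is a hypothetical stand-in that covers only a handful of methods; the real mixin's method set is not spelled out here.

```python
class ValueWrapper:
    """Hypothetical wrapper that behaves like the value it holds."""

    def __init__(self, value):
        self._value = value

    def __eq__(self, other):
        # Compare against plain values and other wrappers alike.
        if isinstance(other, ValueWrapper):
            other = other._value
        return self._value == other

    def __str__(self):
        return str(self._value)

    def __len__(self):
        return len(self._value)

    def __iter__(self):
        return iter(self._value)

    def __getitem__(self, index):
        return self._value[index]


name = ValueWrapper("fynx")
print(name == "fynx", str(name), len(name), name[0])  # True fynx 4 f
print(list(name))                                     # ['f', 'y', 'n', 'x']
```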
and_operator ¶
Implement the & operator for creating conditional observables.
This operator creates conditional observables that only emit values when boolean conditions are satisfied. The resulting observable filters the reactive stream, preventing unnecessary updates and computations when conditions aren't met.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| obs | | The source observable whose values will be conditionally emitted. | required |
| condition | | A boolean observable that acts as a gate. Values from | required |
Returns:
| Type | Description |
|---|---|
| | A new ConditionalObservable that only emits values when the condition is met. The observable starts with None if the condition is initially False. |
Examples:
from fynx.observable import Observable
# Basic conditional filtering
data = Observable("data", "hello")
is_ready = Observable("ready", False)
filtered = data & is_ready # Only emits when is_ready is True
filtered.subscribe(lambda x: print(f"Received: {x}"))
data.set("world") # No output (is_ready is False)
is_ready.set(True) # Prints: "Received: world"
# Multiple conditions (chained)
user_present = Observable("present", True)
smart_data = data & is_ready & user_present # All must be True
# Practical example: temperature monitoring
temperature = Observable("temp", 20)
alarm_enabled = Observable("alarm", True)
is_critical = Observable("critical", False)
alarm_trigger = temperature & alarm_enabled & is_critical
alarm_trigger.subscribe(lambda t: print(f"🚨 Alarm: {t}°C"))
Note
Multiple conditions can be chained: obs & cond1 & cond2 & cond3.
All conditions must be True for values to be emitted.
See Also
ConditionalObservable: The class that implements conditional behavior
Observable.__and__: The magic method that calls this operator
rshift_operator ¶
Implement the >> operator with comprehensive categorical optimization.
This operator creates computed observables using the full categorical optimization system, applying functor composition fusion, product factorization, and cost-optimal materialization strategies automatically.
Categorical Optimization System:
- Rule 1: Functor composition collapse (fuses sequential transformations)
- Rule 2: Product factorization (shares common subexpressions)
- Rule 3: Pullback fusion (combines sequential filters)
- Rule 4: Cost-optimal materialization (decides what to cache vs recompute)
The optimization uses a cost functional C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ) to find semantically equivalent observables with minimal computational cost.
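To make the cost functional concrete, here is a worked evaluation with invented numbers. The weights (all 1.0) and the dependency, update, and depth figures are assumptions chosen for illustration; the page does not state which values FynX uses.

```python
def cost(num_dependencies, expected_updates, depth,
         alpha=1.0, beta=1.0, gamma=1.0):
    """C = alpha*|Dep| + beta*E[Updates] + gamma*depth (illustrative weights)."""
    return alpha * num_dependencies + beta * expected_updates + gamma * depth


# A chain of three transformations on one source, before and after fusion.
unfused = cost(num_dependencies=1, expected_updates=3.0, depth=3)
fused = cost(num_dependencies=1, expected_updates=3.0, depth=1)
print(unfused, fused)  # 7.0 5.0
```

Under these assumed figures, fusing the chain lowers only the depth term, which is enough to make the fused form the cheaper of two equivalent candidates.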
For merged observables (created with +), the function receives multiple arguments corresponding to the tuple values. For single observables, it receives one argument.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| obs | Observable[T] | The source observable(s) to transform. Can be a single Observable or a MergedObservable (from | required |
| func | Callable[..., U] | A pure function that transforms the observable value(s). For merged observables, receives unpacked tuple values as separate arguments. | required |
Returns:
| Type | Description |
|---|---|
| Observable[U] | A new computed observable with optimal structure. Updates automatically when source observables change, but with dramatically improved performance through categorical optimizations. |
Examples:
from fynx.observable import Observable
# Single observable with automatic optimization
counter = Observable("counter", 5)
result = counter >> (lambda x: x * 2) >> (lambda x: x + 10) >> str
# Automatically optimized to single fused computation
# Complex reactive pipelines are optimized globally
width = Observable("width", 10)
height = Observable("height", 20)
area = (width + height) >> (lambda w, h: w * h)
volume = (width + height + Observable("depth", 5)) >> (lambda w, h, d: w * h * d)
# Shared width/height computations are factored out automatically
Performance
- Chain fusion: O(N) depth → O(1) for transformation chains
- Subexpression sharing: Eliminates redundant computations
- Cost optimization: Balances memory vs computation tradeoffs
- Typical speedup: 1000× - 10000× for deep reactive graphs
See Also
Observable.then: The method that creates computed observables
MergedObservable: For combining multiple observables with +
optimizer: The categorical optimization system