
Observable Operators

Special operators for composing and transforming observables.

FynX Operators - Observable Operator Implementations and Mixins

Consider a spreadsheet formula: you reference cells, apply functions, combine values. That formula recalculates automatically when inputs change. This module provides that mechanism as Python operators—familiar syntax that composes reactive values into complex behaviors.

FynX uses operator overloading to make reactive code read like natural expressions. Instead of method chains like obs.map(f).filter(g), you write obs >> f & g. That syntax compresses reactive relationships into declarative statements—the operators handle dependency tracking and automatic updates behind the scenes.

The operators work through three mixin classes that consolidate operator overloading logic. OperatorMixin provides the core reactive operators (+, >>, &, ~, |) for all observable types. TupleMixin adds tuple-like behavior to merged observables—iteration, indexing, length operations. ValueMixin enables transparent value access for ObservableValue instances, making reactive attributes behave like regular values while preserving reactive capabilities.

Result: reactive code that reads like mathematical expressions, with automatic optimization and dependency tracking handled transparently.

Operator Semantics

Transform (>>): Apply functions to create derived values

from fynx.observable import Observable

counter = Observable("counter", 5)
doubled = counter >> (lambda x: x * 2)
print(doubled.value)  # 10

Filter (&): Only emit values when conditions are met

data = Observable("data", "hello")
is_ready = Observable("ready", False)
filtered = data & is_ready  # Only emits when is_ready is True

Combine (+): Merge multiple observables into tuples

x = Observable("x", 1)
y = Observable("y", 2)
z = Observable("z", 3)
coordinates = x + y + z
print(coordinates.value)  # (1, 2, 3)

These operators compose to create complex reactive pipelines:

# >> binds tighter than &, so each transform is applied before the filter
total = (x + y) >> (lambda a, b: a + b)
result = total & (total >> (lambda t: t > 10))

Implementation Architecture

The operators delegate to methods in OperationsMixin rather than implementing logic directly. That separation enables lazy loading and avoids circular import issues. When you use obs >> func, Python calls __rshift__, which delegates to obs.then(func). The then method creates computed observables through _create_computed, which registers with the optimization system automatically.
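The delegation described above can be pictured with a small standalone sketch. It is not FynX's source: `ToyOperatorMixin` and `ToyObservable` are hypothetical stand-ins, and only the hop from `__rshift__` to `then` is taken from the text.

```python
from typing import Any, Callable


class ToyOperatorMixin:
    """Hypothetical mixin: holds operator syntax only, no reactive logic."""

    def __rshift__(self, func: Callable[[Any], Any]) -> "ToyObservable":
        # `obs >> func` lands here; the real work lives in `then`.
        return self.then(func)


class ToyObservable(ToyOperatorMixin):
    """Hypothetical observable holding a single value."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def then(self, func: Callable[[Any], Any]) -> "ToyObservable":
        # Stand-in for creating a computed observable; no change tracking here.
        return ToyObservable(func(self.value))


counter = ToyObservable(5)
doubled = counter >> (lambda x: x * 2)
print(doubled.value)  # 10
```

Keeping the operator methods in a mixin means the syntax layer can be shared by several classes while each class supplies its own `then`.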

The functions handle different observable types (regular, merged, conditional) uniformly. For merged observables, transformation functions receive unpacked tuple values as separate arguments. For single observables, they receive one argument. That distinction enables functions that work with both coordinate pairs and scalar values.
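As a rough illustration of that calling convention, the sketch below uses a hypothetical helper, `apply_transform`, with an explicit `merged` flag. FynX decides this internally; the helper only shows the difference between unpacking a tuple and passing a single value.

```python
from typing import Any, Callable


def apply_transform(value: Any, func: Callable[..., Any], merged: bool) -> Any:
    """Call func with unpacked values for merged sources, one value otherwise."""
    if merged:
        return func(*value)  # tuple (a, b) becomes func(a, b)
    return func(value)       # scalar stays func(x)


def add(a: int, b: int) -> int:
    return a + b


def double(x: int) -> int:
    return x * 2


pair_result = apply_transform((3, 4), add, merged=True)
scalar_result = apply_transform(5, double, merged=False)
print(pair_result, scalar_result)  # 7 10
```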

Performance Characteristics

Operators create computed or conditional observables that evaluate lazily—they recalculate only when accessed after dependencies change. Multiple operators chain without creating intermediate objects—the optimization system fuses sequential transformations into single composed functions. Operators reuse existing infrastructure rather than creating new classes, minimizing memory overhead.
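Lazy evaluation of this kind is often built on a "dirty" flag. The following is a minimal sketch under that assumption, not FynX's implementation; `LazySource`, `LazyComputed`, and the `calls` counter are hypothetical names used only to make the behavior observable.

```python
from typing import Any, Callable, List


class LazySource:
    """Hypothetical source value that marks dependents stale on change."""

    def __init__(self, value: Any) -> None:
        self._value = value
        self._dependents: List["LazyComputed"] = []

    @property
    def value(self) -> Any:
        return self._value

    def set(self, value: Any) -> None:
        self._value = value
        for dep in self._dependents:
            dep.dirty = True  # mark stale, do not recompute yet


class LazyComputed:
    """Hypothetical derived value that recomputes only when read while stale."""

    def __init__(self, source: LazySource, func: Callable[[Any], Any]) -> None:
        self._source = source
        self._func = func
        self._cache: Any = None
        self.dirty = True
        self.calls = 0  # counts how often func actually runs
        source._dependents.append(self)

    @property
    def value(self) -> Any:
        if self.dirty:
            self._cache = self._func(self._source.value)
            self.calls += 1
            self.dirty = False
        return self._cache


src = LazySource(2)
squared = LazyComputed(src, lambda x: x * x)
src.set(3)
src.set(4)            # two changes, still no computation
print(squared.calls)  # 0
print(squared.value)  # 16
print(squared.value)  # 16, served from the cache
print(squared.calls)  # 1
```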

Common Patterns

Data Processing Pipeline:

from fynx import observable

raw_data = observable([1, -2, 3, -4, 5])
processed = (raw_data
    >> (lambda d: [x for x in d if x > 0])  # Filter positive values
    >> (lambda d: sorted(d))                # Sort results
    >> (lambda d: sum(d) / len(d) if d else 0))  # Calculate average
print(processed.value)  # 3.0

Conditional UI Updates:

user_input = observable("")
is_valid = user_input >> (lambda s: len(s) >= 3)
show_error = user_input & ~is_valid  # Emits the input only while it is invalid

Reactive Calculations:

price = observable(10.0)
quantity = observable(1)
tax_rate = observable(0.08)

subtotal = (price + quantity) >> (lambda p, q: p * q)
tax = (subtotal + tax_rate) >> (lambda s, r: s * r)  # Merge tax_rate so tax recomputes when the rate changes
total = (subtotal + tax) >> (lambda s, t: s + t)
print(total.value)  # 10.8

Error Handling

Transformation function errors propagate normally—the reactive system doesn't swallow exceptions. Invalid operator usage raises TypeError with descriptive messages. Circular dependencies are detected during .set() operations and raise RuntimeError before creating infinite loops.

Best Practices

Keep transformation functions pure—no side effects, no external state access. Use named functions for complex operations rather than long lambda expressions. Break complex chains into intermediate variables for clarity. Handle edge cases explicitly—consider None values, empty collections, and boundary conditions.
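A named, pure transformation that handles its edge cases explicitly might look like the sketch below. The function name `average_of_positives` is illustrative and not part of FynX; it is plain Python and can be tested without any reactive machinery.

```python
from typing import Optional, Sequence


def average_of_positives(values: Optional[Sequence[float]]) -> float:
    """Return the mean of the positive entries, or 0.0 when there are none."""
    if not values:  # covers None and empty collections
        return 0.0
    positives = [v for v in values if v > 0]
    if not positives:  # boundary case: nothing passes the filter
        return 0.0
    return sum(positives) / len(positives)


print(average_of_positives([1, -2, 3, -4, 5]))  # 3.0
print(average_of_positives([]))                 # 0.0
print(average_of_positives(None))               # 0.0
```

A function like this could then be passed to `>>` in place of a chain of lambdas, for example `raw_data >> average_of_positives` with `raw_data` as in the pipeline example above.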

See Also

  • fynx.observable: Core observable classes that use these operators and mixins
  • fynx.observable.computed: Computed observables created by the >> operator
  • fynx.observable.conditional: Conditional observables created by the & operator

OperatorMixin

Mixin class providing common reactive operators for observable classes.

This mixin consolidates the operator overloading logic that was previously duplicated across multiple observable classes. It provides the core reactive operators (__add__, __rshift__, __and__, __invert__, __or__) that enable FynX's fluent reactive programming syntax.

Classes inheriting from this mixin get automatic support for:

  • Merging with the + operator
  • Transformation with the >> operator
  • Conditional filtering with the & operator
  • Boolean negation with the ~ operator
  • Logical OR with the | operator

This mixin should be used by classes that represent reactive values and need to support reactive composition operations.

__add__

__add__(other)

Combine this observable with another using the + operator.

This creates a merged observable that contains a tuple of both values and updates automatically when either observable changes. The merge operation represents the categorical product—combining independent reactive values into a single reactive pair.

Parameters:

  • other (required): Another Observable to combine with

Returns:

  • Mergeable: A MergedObservable containing both values as a tuple
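To make the merge behavior concrete, here is a toy model, assuming the merged observable simply reads its sources on demand. `ToySource` and `ToyMerged` are hypothetical classes, not FynX's `Observable` or `MergedObservable`.

```python
from typing import Any, List, Tuple


class ToySource:
    """Hypothetical observable holding one value."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def set(self, value: Any) -> None:
        self.value = value

    def __add__(self, other: "ToySource") -> "ToyMerged":
        return ToyMerged([self, other])


class ToyMerged:
    """Hypothetical product of sources; reads the current tuple on demand."""

    def __init__(self, sources: List[ToySource]) -> None:
        self.sources = sources

    @property
    def value(self) -> Tuple[Any, ...]:
        return tuple(s.value for s in self.sources)

    def __add__(self, other: ToySource) -> "ToyMerged":
        # Extend the flat tuple instead of nesting: (1, 2) + 3 gives (1, 2, 3).
        return ToyMerged(self.sources + [other])


x, y, z = ToySource(1), ToySource(2), ToySource(3)
coordinates = x + y + z
print(coordinates.value)  # (1, 2, 3)
x.set(10)
print(coordinates.value)  # (10, 2, 3)
```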

__and__

__and__(condition)

Create a conditional observable using the & operator for filtered reactivity.

This creates a ConditionalObservable that only emits values when all specified conditions are True, enabling precise control over reactive updates. The operation represents a pullback—filtering the reactive stream through boolean conditions.

Parameters:

  • condition (required): A boolean Observable, callable, or compound condition

Returns:

  • Conditional: A ConditionalObservable that filters values based on the condition
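The gating idea can be sketched with a toy publish/subscribe model. This is an assumption-laden illustration: `ToyObservable` and `ToyConditional` are hypothetical, and the rule that opening the gate forwards the current source value is a choice made for this sketch.

```python
from typing import Any, Callable, List


class ToyObservable:
    """Hypothetical observable with subscribers."""

    def __init__(self, value: Any) -> None:
        self.value = value
        self._subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        self._subscribers.append(callback)

    def set(self, value: Any) -> None:
        self.value = value
        for callback in list(self._subscribers):
            callback(value)

    def __and__(self, condition: "ToyObservable") -> "ToyConditional":
        return ToyConditional(self, condition)


class ToyConditional:
    """Hypothetical gate: forwards source values only while the condition holds."""

    def __init__(self, source: ToyObservable, condition: ToyObservable) -> None:
        self._source = source
        self._condition = condition
        self._subscribers: List[Callable[[Any], None]] = []
        source.subscribe(lambda _: self._emit())
        condition.subscribe(lambda _: self._emit())

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        self._subscribers.append(callback)

    def _emit(self) -> None:
        if self._condition.value:
            for callback in self._subscribers:
                callback(self._source.value)


data = ToyObservable("hello")
is_ready = ToyObservable(False)
received: List[Any] = []
(data & is_ready).subscribe(received.append)

data.set("world")    # gate closed, nothing forwarded
is_ready.set(True)   # gate opens, current value is forwarded
data.set("again")    # gate open, forwarded
is_ready.set(False)  # gate closes, nothing forwarded
print(received)      # ['world', 'again']
```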

__invert__

__invert__()

Create a negated boolean observable using the ~ operator.

This creates a computed observable that returns the logical negation of the current boolean value, useful for creating inverse conditions. The negation updates automatically when the source changes.

Returns:

  • Observable[bool]: A computed Observable[bool] with negated boolean value
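A toy version of negation, assuming the derived flag reads its source each time it is accessed, could look like this. `ToyFlag` and `ToyNegated` are hypothetical names and do not mirror FynX's classes.

```python
class ToyFlag:
    """Hypothetical boolean observable."""

    def __init__(self, value: bool) -> None:
        self._value = value

    @property
    def value(self) -> bool:
        return self._value

    def set(self, value: bool) -> None:
        self._value = value

    def __invert__(self) -> "ToyNegated":
        return ToyNegated(self)


class ToyNegated:
    """Hypothetical derived flag that always reads the opposite of its source."""

    def __init__(self, source) -> None:
        self._source = source

    @property
    def value(self) -> bool:
        return not self._source.value

    def __invert__(self) -> "ToyNegated":
        return ToyNegated(self)


is_valid = ToyFlag(False)
is_invalid = ~is_valid
print(is_invalid.value)     # True
is_valid.set(True)
print(is_invalid.value)     # False
print((~is_invalid).value)  # True
```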

__or__

__or__(other)

Create a logical OR condition using the | operator.

This creates a conditional observable that only emits when the OR result is truthy. If the initial OR result is falsy, it raises ConditionalNeverMet. The operation combines boolean observables with logical disjunction.

Parameters:

  • other (required): Another boolean observable to OR with

Returns:

  • Observable: A conditional observable that only emits when OR is truthy

Raises:

  • ConditionalNeverMet: If initial OR result is falsy

__radd__

__radd__(other)

Support right-side addition for merging observables.

This enables expressions like other + self to work correctly, ensuring that merged observables can be chained properly. Python calls this method when the left operand does not define __add__ or its __add__ returns NotImplemented for this operand.

Parameters:

  • other (required): Another Observable to combine with

Returns:

  • Mergeable: A MergedObservable containing both values as a tuple
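Python's reflected-operand lookup can be seen in a standalone sketch. `ToyMergeable` is hypothetical, and treating a plain value on the left as a one-element tuple is an assumption made for illustration, not documented FynX behavior.

```python
class ToyMergeable:
    """Hypothetical value that merges into a flat tuple under +."""

    def __init__(self, *values):
        self.values = tuple(values)

    def __add__(self, other):
        if isinstance(other, ToyMergeable):
            return ToyMergeable(*self.values, *other.values)
        return NotImplemented  # let Python try the other operand

    def __radd__(self, other):
        # Reached for `other + self` after other's __add__ gave up.
        # Assumption: a plain value on the left joins as one element.
        return ToyMergeable(other, *self.values)


merged = 1 + ToyMergeable(2, 3)  # int.__add__ returns NotImplemented first
print(merged.values)             # (1, 2, 3)
```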

__rshift__

__rshift__(func)

Apply a transformation function using the >> operator to create computed observables.

This implements the functorial map operation over observables, allowing you to transform observable values through pure functions while preserving reactivity. The operation satisfies the functor laws: identity and composition preservation.

Parameters:

  • func (Callable, required): A pure function to apply to the observable's value(s)

Returns:

  • Observable: A new computed Observable containing the transformed values
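The two functor laws can be spot-checked on a toy container. `Box` is a hypothetical stand-in for an observable, and the check below tests a single sample value, so it illustrates the laws rather than proving them.

```python
class Box:
    """Hypothetical minimal container supporting >> as map."""

    def __init__(self, value):
        self.value = value

    def __rshift__(self, func):
        return Box(func(self.value))


def identity(x):
    return x


def f(x):
    return x + 1


def g(x):
    return x * 10


box = Box(4)

# Identity law: mapping the identity function changes nothing.
identity_holds = (box >> identity).value == box.value

# Composition law: mapping f then g equals mapping their composition.
composition_holds = (box >> f >> g).value == (box >> (lambda x: g(f(x)))).value

print(identity_holds, composition_holds)  # True True
```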

TupleMixin

Mixin class providing tuple-like operators for merged observables.

This mixin adds tuple-like behavior to observables that represent collections of values (like MergedObservable). It provides operators for iteration, indexing, and length operations that make merged observables behave like tuples of their component values.

Classes inheriting from this mixin get automatic support for:

  • Iteration with for item in merged:
  • Length with len(merged)
  • Indexing with merged[0], merged[-1], etc.
  • Setting values by index with merged[0] = new_value

__getitem__

__getitem__(index)

Allow indexing into the merged observable like a tuple.

__iter__

__iter__()

Allow iteration over the tuple value.

__len__

__len__()

Return the number of combined observables.

__setitem__

__setitem__(index, value)

Allow setting values by index, updating the corresponding source observable.
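The four methods above fit together roughly as in this sketch. All names (`ToyCell`, `ToyTupleMixin`, `ToyMerged`, and the `sources` attribute) are hypothetical; the point is that reads go through the current tuple while writes go back to the source at that position.

```python
class ToyCell:
    """Hypothetical source observable."""

    def __init__(self, value):
        self.value = value

    def set(self, value):
        self.value = value


class ToyTupleMixin:
    """Hypothetical mixin; expects `self.sources` to be a list of cells."""

    def _as_tuple(self):
        return tuple(s.value for s in self.sources)

    def __iter__(self):
        return iter(self._as_tuple())

    def __len__(self):
        return len(self.sources)

    def __getitem__(self, index):
        return self._as_tuple()[index]

    def __setitem__(self, index, value):
        # Write through to the source observable at that position.
        self.sources[index].set(value)


class ToyMerged(ToyTupleMixin):
    def __init__(self, *sources):
        self.sources = list(sources)


x, y = ToyCell(1), ToyCell(2)
point = ToyMerged(x, y)
print(len(point), point[0], point[-1])  # 2 1 2
point[0] = 10                           # updates x itself, not a copy
print(x.value, list(point))             # 10 [10, 2]
a, b = point                            # unpacking goes through __iter__
```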

ValueMixin

Mixin class providing value wrapper operators for ObservableValue.

This mixin adds operators that make observable values behave transparently like their underlying values in most Python contexts. It provides magic methods for equality, string conversion, iteration, indexing, etc., while also supporting the reactive operators.

Classes inheriting from this mixin get automatic support for:

  • Value-like behavior (equality, string conversion, etc.)
  • Reactive operators (__add__, __and__, __invert__, __rshift__)
  • Transparent access to the wrapped observable
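Transparent value access usually comes down to forwarding a handful of magic methods to the wrapped value. The sketch below assumes that approach; `ToyValue` is hypothetical, and the exact set of methods FynX forwards may differ.

```python
class ToyValue:
    """Hypothetical wrapper that forwards common protocols to its value."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        # Compare by wrapped value, whether or not the other side is wrapped.
        other_value = other.value if isinstance(other, ToyValue) else other
        return self.value == other_value

    def __str__(self):
        return str(self.value)

    def __len__(self):
        return len(self.value)

    def __iter__(self):
        return iter(self.value)

    def __getitem__(self, key):
        return self.value[key]


name = ToyValue("Alice")
print(name == "Alice")     # True
print(f"Hello, {name}")    # Hello, Alice
print(len(name), name[0])  # 5 A
```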

__add__

__add__(other)

Support merging observables with + operator.

__and__

__and__(condition)

Support conditional observables with & operator.

__invert__

__invert__()

Support negating conditions with ~ operator.

__radd__

__radd__(other)

Support right-side addition for merging observables.

__rshift__

__rshift__(func)

Support computed observables with >> operator.

and_operator

and_operator(obs, condition)

Implement the & operator for creating conditional observables.

This operator creates conditional observables that only emit values when boolean conditions are satisfied. The resulting observable filters the reactive stream, preventing unnecessary updates and computations when conditions aren't met.

Parameters:

  • obs (required): The source observable whose values will be conditionally emitted.
  • condition (required): A boolean observable that acts as a gate. Values from obs are only emitted when this condition is True.

Returns:

  • A new ConditionalObservable that only emits values when the condition is met. The observable starts with None if the condition is initially False.

Examples:

from fynx.observable import Observable

# Basic conditional filtering
data = Observable("data", "hello")
is_ready = Observable("ready", False)

filtered = data & is_ready  # Only emits when is_ready is True

filtered.subscribe(lambda x: print(f"Received: {x}"))
data.set("world")      # No output (is_ready is False)
is_ready.set(True)     # Prints: "Received: world"

# Multiple conditions (chained)
user_present = Observable("present", True)
smart_data = data & is_ready & user_present  # All must be True

# Practical example: temperature monitoring
temperature = Observable("temp", 20)
alarm_enabled = Observable("alarm", True)
is_critical = Observable("critical", False)

alarm_trigger = temperature & alarm_enabled & is_critical
alarm_trigger.subscribe(lambda t: print(f"🚨 Alarm: {t}°C"))
Note

Multiple conditions can be chained: obs & cond1 & cond2 & cond3. All conditions must be True for values to be emitted.

See Also

  • ConditionalObservable: The class that implements conditional behavior
  • Observable.__and__: The magic method that calls this operator

rshift_operator

rshift_operator(obs, func)

Implement the >> operator with comprehensive categorical optimization.

This operator creates computed observables using the full categorical optimization system, applying functor composition fusion, product factorization, and cost-optimal materialization strategies automatically.

Categorical Optimization System:

  • Rule 1: Functor composition collapse (fuses sequential transformations)
  • Rule 2: Product factorization (shares common subexpressions)
  • Rule 3: Pullback fusion (combines sequential filters)
  • Rule 4: Cost-optimal materialization (decides what to cache vs recompute)
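Rule 1 amounts to ordinary function composition. As a minimal sketch of the idea (the helper `fuse` is hypothetical and says nothing about how FynX's optimizer is structured), a chain of transformations can be collapsed into one callable:

```python
from functools import reduce
from typing import Any, Callable, Sequence


def fuse(funcs: Sequence[Callable[[Any], Any]]) -> Callable[[Any], Any]:
    """Compose funcs left to right into a single callable."""

    def fused(value: Any) -> Any:
        return reduce(lambda acc, fn: fn(acc), funcs, value)

    return fused


steps = [lambda x: x * 2, lambda x: x + 10, str]
pipeline = fuse(steps)
print(pipeline(5))  # 20 (the string "20")
```

One fused node replaces three chained ones, which is the depth reduction the rule describes.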

The optimization uses a cost functional C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ) to find semantically equivalent observables with minimal computational cost.
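A worked example may help read the formula. The sketch below assumes unit weights (α = β = γ = 1) and made-up statistics; `NodeStats` and `cost` are hypothetical names, and the values FynX actually uses are not stated here.

```python
from dataclasses import dataclass


@dataclass
class NodeStats:
    """Hypothetical per-observable statistics fed into the cost formula."""

    dependencies: int        # |Dep(σ)|
    expected_updates: float  # E[Updates(σ)]
    depth: int               # depth(σ)


def cost(node: NodeStats, alpha: float = 1.0, beta: float = 1.0,
         gamma: float = 1.0) -> float:
    """C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ); weights are assumed."""
    return (alpha * node.dependencies
            + beta * node.expected_updates
            + gamma * node.depth)


# Same dependencies and update rate; fusion only reduces the depth term.
chained = NodeStats(dependencies=1, expected_updates=4.0, depth=3)
fused = NodeStats(dependencies=1, expected_updates=4.0, depth=1)
print(cost(chained), cost(fused))  # 8.0 6.0
```

Under these assumed numbers the fused form is cheaper, so an optimizer minimizing C would prefer it.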

For merged observables (created with +), the function receives multiple arguments corresponding to the tuple values. For single observables, it receives one argument.

Parameters:

  • obs (Observable[T], required): The source observable(s) to transform. Can be a single Observable or a MergedObservable (from + operator).
  • func (Callable[..., U], required): A pure function that transforms the observable value(s). For merged observables, receives unpacked tuple values as separate arguments.

Returns:

  • Observable[U]: A new computed observable with optimal structure. It updates automatically when source observables change, but with dramatically improved performance through categorical optimizations.

Examples:

from fynx.observable import Observable

# Single observable with automatic optimization
counter = Observable("counter", 5)
result = counter >> (lambda x: x * 2) >> (lambda x: x + 10) >> str
# Automatically optimized to single fused computation

# Complex reactive pipelines are optimized globally
width = Observable("width", 10)
height = Observable("height", 20)
area = (width + height) >> (lambda w, h: w * h)
volume = (width + height + Observable("depth", 5)) >> (lambda w, h, d: w * h * d)
# Shared width/height computations are factored out automatically
Performance
  • Chain fusion: O(N) depth → O(1) for transformation chains
  • Subexpression sharing: Eliminates redundant computations
  • Cost optimization: Balances memory vs computation tradeoffs
  • Typical speedup: 1000× - 10000× for deep reactive graphs
See Also

  • Observable.then: The method that creates computed observables
  • MergedObservable: For combining multiple observables with +
  • optimizer: The categorical optimization system