Source code for rx.core.notification

from . import Observer, AnonymousObserver
from rx.concurrency import immediate_scheduler
from rx.internal import extensionclassmethod

from .anonymousobservable import AnonymousObservable


class Notification(object):
    """Represents a notification to an observer."""

    def __init__(self):
        """Default constructor used by derived types."""
        self.has_value = False

    def accept(self, on_next, on_error=None, on_completed=None):
        """Invokes the delegate corresponding to the notification or an
        observer and returns the produced result.

        1 - notification.accept(observer)
        2 - notification.accept(on_next, on_error, on_completed)

        Keyword arguments:
        on_next -- Delegate to invoke for an OnNext notification.
        on_error -- [Optional] Delegate to invoke for an OnError notification.
        on_completed -- [Optional] Delegate to invoke for an OnCompleted notification.

        Returns result produced by the observation."""

        if isinstance(on_next, Observer):
            return self._accept_observable(on_next)
        else:
            return self._accept(on_next, on_error, on_completed)

    def to_observable(self, scheduler=None):
        """Returns an observable sequence with a single notification, using the
        specified scheduler, else the immediate scheduler.

        Keyword arguments:
        scheduler -- [Optional] Scheduler to send out the notification calls on.

        Returns an observable sequence that surfaces the behavior of the
        notification upon subscription.
        """

        notification = self
        scheduler = scheduler or immediate_scheduler

        def subscribe(observer):
            def action(scheduler, state):
                notification._accept_observable(observer)
                if notification.kind == 'N':
                    observer.on_completed()

            return scheduler.schedule(action)
        return AnonymousObservable(subscribe)

    def equals(self, other):
        """Indicates whether this instance and a specified object are equal."""

        other_string = '' if not other else str(other)
        return str(self) == other_string

    def __eq__(self, other):
        return self.equals(other)


class OnNext(Notification):
    """Represents an OnNext notification to an observer."""

    def __init__(self, value):
        """Constructs a notification of a new value."""

        super(OnNext, self).__init__()
        self.value = value
        self.has_value = True
        self.kind = 'N'

    def _accept(self, on_next, on_error=None, on_completed=None):
        return on_next(self.value)

    def _accept_observable(self, observer):
        return observer.on_next(self.value)

    def __str__(self):
        return "OnNext(%s)" % str(self.value)


class OnError(Notification):
    """Represents an OnError notification to an observer."""

    def __init__(self, exception):
        """Constructs a notification of an exception."""

        super(OnError, self).__init__()
        self.exception = exception
        self.kind = 'E'

    def _accept(self, on_next, on_error, on_completed):
        return on_error(self.exception)

    def _accept_observable(self, observer):
        return observer.on_error(self.exception)

    def __str__(self):
        return "OnError(%s)" % str(self.exception)


class OnCompleted(Notification):
    """Represents an OnCompleted notification to an observer."""

    def __init__(self):
        """Constructs a notification of the end of a sequence."""

        super(OnCompleted, self).__init__()
        self.kind = 'C'

    def _accept(self, on_next, on_error, on_completed):
        return on_completed()

    def _accept_observable(self, observer):
        return observer.on_completed()

    def __str__(self):
        return "OnCompleted()"

    @classmethod
    def observer_from_notifier(cls, handler):
        """Creates an observer from a notification callback.

        Keyword arguments:
        handler -- Action that handles a notification.

        Returns the observer object that invokes the specified handler using a
        notification corresponding to each message it receives.
        """

        def _on_next(value):
            return handler(OnNext(value))

        def _on_error(ex):
            return handler(OnError(ex))

        def _on_completed():
            return handler(OnCompleted())

        return AnonymousObserver(_on_next, _on_error, _on_completed)


@extensionclassmethod(Observer)
def from_notifier(cls, handler):
    """Creates an observer from a notification callback.

    Keyword arguments:
    :param handler: Action that handles a notification.

    :returns: The observer object that invokes the specified handler using a
    notification corresponding to each message it receives.
    :rtype: Observer
    """

    def _on_next(value):
        return handler(OnNext(value))

    def _on_error(ex):
        return handler(OnError(ex))

    def _on_completed():
        return handler(OnCompleted())

    return AnonymousObserver(_on_next, _on_error, _on_completed)