# Source code for rx.core.checkedobserver

from . import Observer
from rx.internal import extensionmethod
from rx.internal.exceptions import ReEntracyException, CompletedException


class CheckedObserver(Observer):
    """Observer wrapper that enforces the observer grammar.

    Each notification is validated with check_access() before it is
    forwarded to the wrapped observer. The wrapper keeps a small state
    machine in ``self._state``: idle (0), busy (1) while a callback of
    the wrapped observer is running, and done (2) once on_error or
    on_completed has been attempted.
    """

    # Values stored in self._state.
    _IDLE = 0
    _BUSY = 1
    _DONE = 2

    def __init__(self, observer):
        """Wrap observer; the wrapper starts out idle."""
        self._state = self._IDLE
        self._observer = observer

    def _forward(self, method_name, state_after, *args):
        """Validate access, invoke the named callback, then set the state.

        The callback is looked up on the wrapped observer only after
        check_access() has succeeded. ``state_after`` is stored whether
        the callback returns normally or raises.
        """
        self.check_access()
        try:
            getattr(self._observer, method_name)(*args)
        finally:
            self._state = state_after

    def on_next(self, value):
        """Forward value to the wrapped observer and return to idle."""
        self._forward("on_next", self._IDLE, value)

    def on_error(self, err):
        """Forward err to the wrapped observer and mark the wrapper done."""
        self._forward("on_error", self._DONE, err)

    def on_completed(self):
        """Forward completion to the wrapped observer and mark it done."""
        self._forward("on_completed", self._DONE)

    def check_access(self):
        """Checks access to the observer for grammar violations.

        OnNext* (OnError | OnCompleted)?

        An idle wrapper becomes busy. A busy wrapper raises
        ReEntracyException, and a done wrapper raises CompletedException.
        """
        current = self._state
        if current == self._IDLE:
            self._state = self._BUSY
        elif current == self._BUSY:
            raise ReEntracyException()
        elif current == self._DONE:
            raise CompletedException()


@extensionmethod(Observer)
def checked(self):
    """Wrap this observer in a CheckedObserver.

    The wrapper validates every callback against the observer grammar
    ``OnNext* (OnError | OnCompleted)?``. That covers notifications
    arriving after OnError or OnCompleted, and reentrant calls into any
    of the observer methods. On a violation the offending method call
    raises; otherwise the notification is forwarded to this observer.

    Returns an observer that checks callback invocations against the
    observer grammar and, if the checks pass, forwards them to this
    observer."""

    guarded = CheckedObserver(self)
    return guarded