# -*- coding:utf-8 -*-
#
# Anomaly Detection Framework
# Copyright (C) 2018 Bluekiri BigData Team <bigdata@bluekiri.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from collections import Generator
from typing import Any
from rx.core import Observable
from anomalydetection.backend.stream.agg.functions import AggregationFunction
[docs]class BaseObservable(object):
[docs] def get_observable(self) -> Observable:
raise NotImplementedError("To implement in child classes.")
[docs] def map(self, x: Any) -> Any:
"""
Map items in observable.
:param x: input item
:return: output item
"""
return x
[docs]class FileObservable(BaseObservable):
def __init__(self, file: str) -> None:
"""
FileObservable to transform a file lines to an Observable
:param file: a path to local file
"""
super().__init__()
self.file = file
[docs] def get_observable(self) -> Observable:
return Observable.from_(open(self.file).readlines())
[docs]class BaseStreamConsumer(BaseObservable):
[docs] def poll(self) -> Generator:
raise NotImplementedError("To implement in child classes.")
[docs] def get_observable(self) -> Observable:
return Observable.from_(self.poll())
[docs]class BaseStreamProducer:
[docs] def push(self, message: str) -> None:
raise NotImplementedError("To implement in child classes.")
[docs]class BaseStreamAggregation(object):
def __init__(self,
agg_function: AggregationFunction = AggregationFunction.NONE,
agg_window_millis: int = 0) -> None:
"""
BaseStreamAggregation class
:param agg_function: aggregation function
:param agg_window_millis: aggregation window in milliseconds
"""
super().__init__()
self.agg_function = agg_function
self.agg_window_millis = agg_window_millis