anomalydetection.backend
This module contains the core of the anomaly detection framework implementation.
backend.entities
-
class
anomalydetection.backend.entities.BaseMessageHandler
Bases: typing.Generic
Base message handler.
-
classmethod
extract_extra(message)
Extract extra data from the parsed message.
Parameters: message (~T) – parsed message
Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key(message)
Extract the key of the message. This value is used to group messages.
Parameters: message (~T) – parsed message
Return type: str
Returns: the key as a string
-
classmethod
extract_ts(message)
Extract the datetime of the message.
Parameters: message (~T) – parsed message
Return type: datetime
Returns: a datetime object
-
classmethod
extract_value(message)
Extract the value of the message. This is the value used to make the prediction.
Parameters: message (~T) – parsed message
Return type: float
Returns: a float value
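As an illustration of the four extraction hooks listed above, here is a minimal sketch of a custom handler. The `BaseMessageHandler` defined here is a stand-in written for this example, not the library class, and `DictMessageHandler` and the `host`, `cpu` and `ts` keys are hypothetical names.

```python
from datetime import datetime
from typing import Generic, TypeVar

T = TypeVar("T")


class BaseMessageHandler(Generic[T]):
    """Stand-in for the documented base class, not the library's code."""

    @classmethod
    def extract_key(cls, message: T) -> str:
        raise NotImplementedError

    @classmethod
    def extract_value(cls, message: T) -> float:
        raise NotImplementedError

    @classmethod
    def extract_ts(cls, message: T) -> datetime:
        raise NotImplementedError

    @classmethod
    def extract_extra(cls, message: T) -> dict:
        # Assumed default: no extra data.
        return {}


class DictMessageHandler(BaseMessageHandler[dict]):
    """Hypothetical handler for messages already parsed into dicts."""

    @classmethod
    def extract_key(cls, message: dict) -> str:
        # Messages sharing this key are grouped together.
        return message["host"]

    @classmethod
    def extract_value(cls, message: dict) -> float:
        # The numeric value that would be handed to an engine.
        return float(message["cpu"])

    @classmethod
    def extract_ts(cls, message: dict) -> datetime:
        return datetime.strptime(message["ts"], "%Y-%m-%dT%H:%M:%S")


sample = {"host": "web-1", "cpu": "0.75", "ts": "2018-07-11T14:27:48"}
key = DictMessageHandler.extract_key(sample)
value = DictMessageHandler.extract_value(sample)
ts = DictMessageHandler.extract_ts(sample)
```

Because every hook is a classmethod, the handler class itself can be passed around without being instantiated.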
-
class
anomalydetection.backend.entities.input_message.InputMessage(application, value, ts)
Bases: object
This is the parser of a JSON message.
-
class
anomalydetection.backend.entities.input_message.InputMessageHandler
Bases: anomalydetection.backend.entities.BaseMessageHandler
-
classmethod
extract_extra(message)
Extract extra data from the parsed message.
Parameters: message (~T) – parsed message
Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key(message)
Extract the key of the message. This value is used to group messages.
Parameters: message (InputMessage) – parsed message
Return type: str
Returns: the key as a string
-
classmethod
extract_ts(message)
Extract the datetime of the message.
Parameters: message (~T) – parsed message
Return type: datetime
Returns: a datetime object
-
classmethod
extract_value(message)
Extract the value of the message. This is the value used to make the prediction.
Parameters: message (InputMessage) – parsed message
Return type: float
Returns: a float value
-
classmethod
parse_message(message)
Parse or transform an input message and return it.
Parameters: message (InputMessage) – message serialized in a string
Return type: InputMessage
Returns: parsed message
-
classmethod
validate_message(message)
Validate a message.
Parameters: message (InputMessage) – the message to validate
Return type: bool
Returns: True if the message is valid, False otherwise
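The parse and validate steps described above might look roughly like the following sketch. The `InputMessage` class here is a stand-in that only mirrors the documented constructor arguments. The JSON key names, the timestamp format and the validity rule are all assumptions made for this example.

```python
import json
from datetime import datetime


class InputMessage:
    """Stand-in with the documented constructor arguments."""

    def __init__(self, application, value, ts):
        self.application = application
        self.value = value
        self.ts = ts


def parse_message(raw: str) -> InputMessage:
    # Assumed JSON keys: "application", "value", "ts" (ISO-like string).
    data = json.loads(raw)
    return InputMessage(
        application=data["application"],
        value=float(data["value"]),
        ts=datetime.strptime(data["ts"], "%Y-%m-%dT%H:%M:%S"),
    )


def validate_message(message: InputMessage) -> bool:
    # Assumed rule: non-empty key, float value, datetime timestamp.
    return (
        isinstance(message.application, str)
        and bool(message.application)
        and isinstance(message.value, float)
        and isinstance(message.ts, datetime)
    )


raw = '{"application": "app1", "value": 1.5, "ts": "2018-07-11T14:27:48"}'
msg = parse_message(raw)
```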
-
class
anomalydetection.backend.entities.output_message.AnomalyResult(value_lower_limit, value_upper_limit, anomaly_probability, is_anomaly)
Bases: object
AnomalyResult description
Variables: - value_lower_limit – lower bound limit
- value_upper_limit – upper bound limit
- anomaly_probability – probability of being anomalous
- is_anomaly – whether the value is anomalous
AnomalyResult constructor
-
class
anomalydetection.backend.entities.output_message.OutputMessage(application, anomaly_results, agg_window_millis=0, agg_function=<AggregationFunction.NONE: 'none'>, agg_value=0, ts=datetime.datetime(2018, 7, 11, 14, 27, 48, 354220))
Bases: object
OutputMessage class description
Variables: - application – application name
- anomaly_results – anomaly results
- agg_window_millis – aggregation window in milliseconds
- agg_function – aggregation function
- agg_value – the value after aggregation
- ts – timestamp
OutputMessage class constructor
Parameters: - application (str) – application name
- anomaly_results (AnomalyResult) – anomaly results
- agg_window_millis (int) – aggregation window in milliseconds
- agg_function (AggregationFunction) – aggregation function
- agg_value (float) – the value after aggregation
- ts (datetime) – timestamp
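The rendered default for ts in the signature above is one fixed timestamp. That is what a default such as `datetime.now()` looks like once Python has evaluated it at definition time. Assuming that is the cause (the source is not shown here), the sketch below contrasts the two behaviours. Both function names are hypothetical. If the assumption holds, passing ts explicitly avoids relying on the stale default.

```python
import time
from datetime import datetime


def stamp_eager(ts=datetime.now()):
    # The default is computed once, when the function is defined.
    return ts


def stamp_lazy(ts=None):
    # The default is computed on every call that omits ts.
    return ts if ts is not None else datetime.now()


first = stamp_eager()
time.sleep(0.01)
second = stamp_eager()  # same object as first

lazy_first = stamp_lazy()
time.sleep(0.01)
lazy_second = stamp_lazy()  # taken at call time
```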
-
class
anomalydetection.backend.entities.output_message.OutputMessageHandler
Bases: anomalydetection.backend.entities.BaseMessageHandler
-
classmethod
extract_extra(message)
Extract extra data from the parsed message.
Parameters: message (InputMessage) – parsed message
Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key(message)
Extract the key of the message. This value is used to group messages.
Parameters: message (InputMessage) – parsed message
Return type: str
Returns: the key as a string
-
classmethod
extract_ts(message)
Extract the datetime of the message.
Parameters: message (~T) – parsed message
Return type: datetime
Returns: a datetime object
-
classmethod
extract_value(message)
Extract the value of the message. This is the value used to make the prediction.
Parameters: message (InputMessage) – parsed message
Return type: float
Returns: a float value
-
classmethod
parse_message(message)
Parse or transform an input message and return it.
Parameters: message (OutputMessage) – message serialized in a string
Return type: InputMessage
Returns: parsed message
-
classmethod
validate_message(message)
Validate a message.
Parameters: message (InputMessage) – the message to validate
Return type: bool
Returns: True if the message is valid, False otherwise
There is a default JSON format message handler implementation ready to use.
backend.engine
-
class
anomalydetection.backend.engine.builder.BaseEngineBuilder
Bases: object
BaseBuilder. Implement this to create engine builders.
-
build()
Build the engine.
Return type: BaseEngine
Returns: a BaseEngine implementation instance
-
class
anomalydetection.backend.engine.builder.CADDetectorBuilder(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)
Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build()
Build the engine.
Return type: CADDetector
Returns: a BaseEngine implementation instance
-
set(name, value)
-
type = 'cad'
-
class
anomalydetection.backend.engine.builder.EMADetectorBuilder(window=100, threshold=0.9999)
Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build()
Build the engine.
Return type: EMADetector
Returns: a BaseEngine implementation instance
-
set(name, value)
-
type = 'ema'
-
class
anomalydetection.backend.engine.builder.EngineBuilderFactory
Bases: object
-
engines = {'cad': {'key': 'cad', 'name': 'CADDetector'}, 'ema': {'key': 'ema', 'name': 'EMADetector'}, 'robust': {'key': 'robust', 'name': 'RobustDetector'}}
-
static
get(name)
Return type: BaseEngineBuilder
-
static
get_cad()
Return type: CADDetectorBuilder
-
static
get_ema()
Return type: EMADetectorBuilder
-
static
get_plugin(name)
Return type: BaseEngineBuilder
-
static
get_robust()
Return type: RobustDetectorBuilder
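The factory and builder members listed above suggest a look-up-then-configure-then-build flow. The sketch below shows that pattern with stand-in classes only. The real `set` semantics and return value are not documented here, so the chaining behaviour, the error types and the dict returned by `build` are assumptions.

```python
class EMADetectorBuilder:
    """Stand-in builder mirroring the documented defaults."""

    type = "ema"

    def __init__(self, window=100, threshold=0.9999):
        self.window = window
        self.threshold = threshold

    def set(self, name, value):
        # Assumed semantics: override a known setting, return self to chain.
        if name not in ("window", "threshold"):
            raise AttributeError(name)
        setattr(self, name, value)
        return self

    def build(self):
        # A real builder would return an engine; a dict keeps this runnable.
        return {
            "type": self.type,
            "window": self.window,
            "threshold": self.threshold,
        }


class EngineBuilderFactory:
    """Stand-in factory keyed by engine type."""

    _builders = {"ema": EMADetectorBuilder}

    @staticmethod
    def get(name):
        builder_cls = EngineBuilderFactory._builders.get(name)
        if builder_cls is None:
            raise ValueError("unknown engine: {}".format(name))
        return builder_cls()

    @staticmethod
    def get_ema():
        return EngineBuilderFactory.get("ema")


engine = EngineBuilderFactory.get("ema").set("window", 50).build()
```

Keeping configuration in the builder means a fresh engine with the same settings can be built for each application.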
-
class
anomalydetection.backend.engine.builder.RobustDetectorBuilder(window=100, threshold=0.9999)
Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build()
Build the engine.
Return type: RobustDetector
Returns: a BaseEngine implementation instance
-
set(name, value)
-
type = 'robust'
-
class
anomalydetection.backend.engine.BaseEngine
Bases: object
Base class for any Engine implementation.
-
class
anomalydetection.backend.engine.cad_engine.CADDetector(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)
Bases: anomalydetection.backend.engine.BaseEngine
Contextual Anomaly Detector - Open Source Edition https://github.com/smirmik/CAD
-
class
anomalydetection.backend.engine.cad_engine.ContextOperator(max_left_semi_cntx_len)
Bases: object
-
class
anomalydetection.backend.engine.ema_engine.EMADetector(window=100, threshold=2.0)
Bases: anomalydetection.backend.engine.BaseEngine
EMADetector constructor
Parameters: - window – window of samples to work with
- threshold – threshold for confidence
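The entry above gives only the window and threshold parameters, so the following is a generic sketch of anomaly detection with an exponential moving average and not the library's algorithm. It assumes that window maps to a smoothing factor of 2 / (window + 1) and that threshold counts standard deviations. `EmaSketch` is a hypothetical name.

```python
import math


class EmaSketch:
    """Exponentially weighted mean/variance with a deviation test."""

    def __init__(self, window=100, threshold=2.0):
        self.alpha = 2.0 / (window + 1)  # assumed span-to-alpha convention
        self.threshold = threshold
        self.mean = None
        self.var = 0.0

    def update(self, x):
        """Test x against the current estimate, then fold it in."""
        if self.mean is None:
            self.mean = x
            return False
        diff = x - self.mean
        std = math.sqrt(self.var)
        is_anomaly = std > 0 and abs(diff) > self.threshold * std
        # Incremental update of the exponentially weighted mean and variance.
        incr = self.alpha * diff
        self.mean += incr
        self.var = (1 - self.alpha) * (self.var + diff * incr)
        return is_anomaly


detector = EmaSketch(window=4, threshold=2.0)
flags = [detector.update(x) for x in [10, 11, 10, 11, 10, 11, 50]]
```

Only the final value, 50, is flagged. The alternating values at the start stay within two standard deviations of the running mean.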
-
class
anomalydetection.backend.engine.robust_z_engine.RobustDetector(window=100, threshold=0.9999)
Bases: anomalydetection.backend.engine.BaseEngine, anomalydetection.common.logging.LoggingMixin
Anomaly detection engine based on robust statistics: the median and the median absolute deviation.
Parameters: - window – window of samples to work with
- threshold – threshold for confidence
-
logger
Logger object.
Returns: a configured logger object
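The RobustDetector description mentions the median and the median absolute deviation (MAD). A common way to combine them is the robust z-score, sketched below. This is a textbook formulation, not necessarily what the engine computes. Mapping the score to a confidence with the normal distribution is a further assumption, made here only so the result can be compared with a threshold such as 0.9999.

```python
import math
import statistics


def robust_z_score(window, x):
    """Score x against a window using the median and the MAD."""
    med = statistics.median(window)
    mad = statistics.median([abs(v - med) for v in window])
    if mad == 0:
        # Degenerate window: any deviation is infinitely surprising.
        return 0.0 if x == med else math.copysign(float("inf"), x - med)
    # 0.6745 rescales the MAD to match the standard deviation
    # for normally distributed data.
    return 0.6745 * (x - med) / mad


def confidence(z):
    # Assumed mapping: two-sided normal probability mass within |z|.
    return math.erf(abs(z) / math.sqrt(2))


window = [10, 11, 10, 12, 11, 10, 11]
z = robust_z_score(window, 20)
is_anomaly = confidence(z) > 0.9999
```

The median and MAD are barely moved by a few extreme samples, so a past outlier in the window does not mask the next one the way it would with a mean and standard deviation.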
backend.interactor
-
class
anomalydetection.backend.interactor.BaseEngineInteractor(engine_builder, message_handler)
Bases: object
BaseEngineInteractor holds the engine builder and the message handler. It is also responsible for building an engine for each application.
BaseEngineInteractor constructor
Parameters: - engine_builder (BaseEngineBuilder) – engine builder
- message_handler (BaseMessageHandler[~T]) – message handler
-
class
anomalydetection.backend.interactor.batch_engine.BatchEngineInteractor(batch, engine_builder, message_handler)
Bases: anomalydetection.backend.interactor.BaseEngineInteractor, anomalydetection.common.logging.LoggingMixin
BatchEngineInteractor is an implementation that batch processes an Observable.
BatchEngineInteractor constructor
Parameters: - batch (BaseObservable) – an observable
- engine_builder (BaseEngineBuilder) – an engine builder
- message_handler (BaseMessageHandler[~T]) – a message handler
-
get_engine(application)
Return the engine for the application in a thread-safe way.
Parameters: application (str) – application name
Returns: its engine
-
logger
Logger object.
Returns: a configured logger object
-
map_with_engine(input_message)
Return type: OutputMessage
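The get_engine entry above promises one engine per application, handed out in a thread-safe way. One way to get that behaviour is a lock around a lazily filled dict, as in this sketch. `EngineRegistry` and its `build_engine` callable are hypothetical, and the interactor's real implementation may differ.

```python
import threading


class EngineRegistry:
    """Lazily builds and caches one engine per application."""

    def __init__(self, build_engine):
        self._build_engine = build_engine  # e.g. a builder's build method
        self._engines = {}
        self._lock = threading.Lock()

    def get_engine(self, application):
        # The lock makes check-then-create atomic across threads.
        with self._lock:
            if application not in self._engines:
                self._engines[application] = self._build_engine()
            return self._engines[application]


registry = EngineRegistry(dict)  # dict() stands in for a real engine
engine_a = registry.get_engine("app1")
engine_b = registry.get_engine("app2")
```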
backend.repository
-
class
anomalydetection.backend.repository.BaseObservableRepository(repository)
Bases: anomalydetection.backend.stream.BaseObservable
Use a repository as an Observable.
BaseObservableRepository constructor
Parameters: repository (BaseRepository) – a repository
-
class
anomalydetection.backend.repository.BaseRepository(conn)
Bases: object
-
fetch(application, from_ts, to_ts)
Fetch data from the repository. Implementations should return ordered data.
Parameters: - application – application name
- from_ts – from timestamp
- to_ts – to timestamp
Return type: Iterable[+T_co]
Returns: an iterable
-
get_applications()
Return a list of distinct applications contained in the repository.
Return type: List[str]
Returns: a list of application names
-
insert(message)
Insert an OutputMessage into the repository.
Parameters: message (OutputMessage) – an output message
Return type: None
-
map(item)
Map an element returned by the fetch method to an OutputMessage.
Parameters: item (Any) – an element in the fetch iterable
Return type: OutputMessage
Returns: an OutputMessage
backend.repository.builder
-
class
anomalydetection.backend.repository.builder.BaseRepositoryBuilder
Bases: object
BaseBuilder. Implement this to create repository builders.
-
build()
Build a repository.
Return type: BaseRepository
Returns: a BaseRepository implementation instance
-
class
anomalydetection.backend.repository.builder.RepositoryBuilderFactory
Bases: object
-
static
get(name)
Return type: BaseRepositoryBuilder
-
static
get_plugin(name)
Return type: BaseRepositoryBuilder
-
static
get_sqlite()
Return type: SQLiteBuilder
-
class
anomalydetection.backend.repository.builder.SQLiteBuilder(database=None)
Bases: anomalydetection.backend.repository.builder.BaseRepositoryBuilder
-
build()
Build a repository.
Return type: BaseRepository
Returns: a BaseRepository implementation instance
-
set(name, value)
backend.repository.observable
-
class
anomalydetection.backend.repository.observable.ObservableRepository(repository, application=None, from_ts=None, to_ts=None)
Bases: anomalydetection.backend.repository.BaseObservableRepository
Creates an ObservableRepository that can act as an observable.
Parameters: - repository (BaseRepository) – the repository
- application – application name
- from_ts – from timestamp
- to_ts – to timestamp
-
get_max()
-
get_min()
-
get_observable()
Return type: Observable
backend.repository.sqlite
-
class
anomalydetection.backend.repository.sqlite.SQLiteRepository(database)
Bases: anomalydetection.backend.repository.BaseRepository
SQLiteRepository constructor
Parameters: database (str) – database path
-
fetch(application, from_ts, to_ts)
Fetch data from the repository. Implementations should return ordered data.
Parameters: - application – application name
- from_ts – from timestamp
- to_ts – to timestamp
Returns: an iterable
-
get_applications()
Return a list of distinct applications contained in the repository.
Returns: a list of application names
-
insert(message)
Insert an OutputMessage into the repository.
Parameters: message (OutputMessage) – an output message
-
map(item)
Map an element returned by the fetch method to an OutputMessage.
Parameters: item (Row) – an element in the fetch iterable
Return type: OutputMessage
Returns: an OutputMessage
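To make the fetch, insert, get_applications and map contract above concrete, here is a sketch on top of the standard library `sqlite3` module. The table name, its columns and the text timestamp format are hypothetical, since the repository's real schema is not documented here. `map_row` returns a plain dict in place of an OutputMessage.

```python
import sqlite3
from datetime import datetime

# Fixed-width format, so text comparison matches chronological order.
TS_FORMAT = "%Y-%m-%dT%H:%M:%S"

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows addressable by column name
conn.execute(
    "CREATE TABLE predictions ("
    "application TEXT, ts TEXT, agg_value REAL, is_anomaly INTEGER)"
)


def insert(application, ts, agg_value, is_anomaly):
    conn.execute(
        "INSERT INTO predictions VALUES (?, ?, ?, ?)",
        (application, ts.strftime(TS_FORMAT), agg_value, int(is_anomaly)),
    )
    conn.commit()


def fetch(application, from_ts, to_ts):
    # ORDER BY gives the ordered data that fetch is expected to return.
    cursor = conn.execute(
        "SELECT application, ts, agg_value, is_anomaly FROM predictions "
        "WHERE application = ? AND ts BETWEEN ? AND ? ORDER BY ts",
        (application, from_ts.strftime(TS_FORMAT), to_ts.strftime(TS_FORMAT)),
    )
    return cursor.fetchall()


def get_applications():
    cursor = conn.execute(
        "SELECT DISTINCT application FROM predictions ORDER BY application"
    )
    return [row["application"] for row in cursor]


def map_row(row):
    # Stand-in for map(): turn a Row into a message-like dict.
    return {
        "application": row["application"],
        "ts": datetime.strptime(row["ts"], TS_FORMAT),
        "agg_value": row["agg_value"],
        "is_anomaly": bool(row["is_anomaly"]),
    }


insert("app1", datetime(2018, 7, 11, 14, 30), 3.0, True)
insert("app1", datetime(2018, 7, 11, 14, 0), 1.0, False)
insert("app2", datetime(2018, 7, 11, 14, 15), 2.0, False)
rows = [
    map_row(r)
    for r in fetch("app1", datetime(2018, 7, 11), datetime(2018, 7, 12))
]
```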
backend.sink
-
class
anomalydetection.backend.sink.BaseSink
Bases: rx.core.py3.observer.Observer
BaseSink. Implement this to create sinks.
-
as_observer()
Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked()
Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callback invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier(handler)
Creates an observer from a notification callback.
Parameters: handler – action that handles a notification
Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
on_completed()
-
on_error(error)
-
on_next(value)
-
to_notifier()
Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
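A sink implements the three observer callbacks listed above: on_next, on_error and on_completed. The sketch below shows that contract without depending on the rx package. `ListSink` and the `emit` driver are hypothetical stand-ins for a real sink and a real observable.

```python
class ListSink:
    """Collects everything it is sent; stands in for a real sink."""

    def __init__(self):
        self.items = []
        self.errors = []
        self.completed = False

    def on_next(self, value):
        self.items.append(value)

    def on_error(self, error):
        self.errors.append(error)

    def on_completed(self):
        self.completed = True


def emit(values, observer):
    """Push values to an observer, following the observer grammar:
    any number of on_next calls, then one on_error or one on_completed."""
    try:
        for value in values:
            observer.on_next(value)
    except Exception as exc:
        observer.on_error(exc)
    else:
        observer.on_completed()


sink = ListSink()
emit([1, 2, 3], sink)
```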
backend.sink.repository
-
class
anomalydetection.backend.sink.repository.RepositorySink(repository)
Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin
Creates a RepositorySink that can sink OutputMessages into the given repository.
Parameters: repository (BaseRepository) – a repository
-
as_observer()
Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked()
Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callback invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier(handler)
Creates an observer from a notification callback.
Parameters: handler – action that handles a notification
Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
Logger object.
Returns: a configured logger object
-
to_notifier()
Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
backend.sink.stream
-
class
anomalydetection.backend.sink.stream.StreamSink(stream_producer)
Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin
Creates a StreamSink that can sink OutputMessages into the stream producer.
Parameters: stream_producer (BaseStreamProducer) – a stream producer
-
as_observer()
Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked()
Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callback invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier(handler)
Creates an observer from a notification callback.
Parameters: handler – action that handles a notification
Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
Logger object.
Returns: a configured logger object
-
to_notifier()
Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
backend.sink.websocket
-
class
anomalydetection.backend.sink.websocket.WebSocketSink(name, url)
Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin
Implementation that sinks an OutputMessage stream to a WebSocket.
-
as_observer()
Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked()
Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callback invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier(handler)
Creates an observer from a notification callback.
Parameters: handler – action that handles a notification
Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
Logger object.
Returns: a configured logger object
-
to_notifier()
Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
backend.stream
-
class
anomalydetection.backend.stream.BaseStreamAggregation(agg_function=<AggregationFunction.NONE: 'none'>, agg_window_millis=0)
Bases: object
BaseStreamAggregation class
Parameters: - agg_function (AggregationFunction) – aggregation function
- agg_window_millis (int) – aggregation window in milliseconds
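The two parameters above pair an aggregation function with a window length in milliseconds. The sketch below shows one plausible reading of that pairing, using fixed (tumbling) windows over epoch-millisecond timestamps. The `aggregate` helper is hypothetical, and treating a window of 0 as "no aggregation" is an assumption based on the documented defaults.

```python
from collections import defaultdict
from statistics import mean


def aggregate(points, agg_window_millis, agg_function=mean):
    """Group (epoch_millis, value) pairs into fixed windows and reduce each."""
    points = list(points)
    if agg_window_millis <= 0:
        # Assumed meaning of the default window: pass values through.
        return points
    buckets = defaultdict(list)
    for ts_millis, value in points:
        # Align each timestamp to the start of its window.
        window_start = ts_millis - ts_millis % agg_window_millis
        buckets[window_start].append(value)
    return [
        (start, agg_function(values))
        for start, values in sorted(buckets.items())
    ]


points = [(0, 1.0), (400, 3.0), (1000, 10.0), (1999, 20.0), (2500, 5.0)]
averaged = aggregate(points, 1000)
summed = aggregate(points, 1000, agg_function=sum)
```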
-
class
anomalydetection.backend.stream.BaseStreamConsumer
-
class
anomalydetection.backend.stream.FileObservable(file)
Bases: anomalydetection.backend.stream.BaseObservable
FileObservable transforms the lines of a file into an Observable.
Parameters: file (str) – a path to a local file
backend.stream.agg
backend.stream.builder
backend.stream.kafka
-
class
anomalydetection.backend.stream.kafka.KafkaStreamConsumer(broker_servers, input_topic, group_id)
Bases: anomalydetection.backend.stream.BaseStreamConsumer, anomalydetection.common.logging.LoggingMixin
KafkaStreamConsumer constructor
-
get_observable()
Return type: Observable
-
logger
Logger object.
Returns: a configured logger object
-
class
anomalydetection.backend.stream.kafka.KafkaStreamProducer(broker_servers, output_topic)
Bases: anomalydetection.backend.stream.BaseStreamProducer, anomalydetection.common.logging.LoggingMixin
KafkaStreamProducer constructor
-
logger
Logger object.
Returns: a configured logger object
-
class
anomalydetection.backend.stream.kafka.SparkKafkaStreamConsumer(broker_servers, input_topic, group_id, agg_function, agg_window_millis, spark_opts={}, multiprocessing=True)
Bases: anomalydetection.backend.stream.BaseStreamConsumer, anomalydetection.backend.stream.BaseStreamAggregation, anomalydetection.common.logging.LoggingMixin
SparkKafkaStreamConsumer constructor
Parameters: - broker_servers (str) – broker servers
- input_topic (str) – input topic
- group_id (str) – consumer group id
- agg_function (AggregationFunction) – aggregation function to apply
- agg_window_millis (int) – aggregation window in milliseconds
- spark_opts (dict) – spark options dict
- multiprocessing – use multiprocessing instead of threading
-
get_observable()
Return type: Observable
-
logger
Logger object.
Returns: a configured logger object