anomalydetection.backend¶
This module contains the core of the anomaly detection framework implementation.
backend.entities¶
-
class
anomalydetection.backend.entities.
BaseMessageHandler
[source]¶ Bases:
typing.Generic
Base message handler
-
classmethod
extract_extra
(message)[source]¶ Extract extra data from the parsed message
Parameters: message (~T) – parsed message Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key
(message)[source]¶ Extract the key of the message; this value is used to group messages
Parameters: message (~T) – parsed message Return type: str
Returns: the message key
-
classmethod
extract_ts
(message)[source]¶ Extract the datetime of the message
Parameters: message (~T) – parsed message Return type: datetime
Returns: a datetime object
-
classmethod
extract_value
(message)[source]¶ Extract the value of the message; this is the value used to make the prediction
Parameters: message (~T) – parsed message Return type: float
Returns: a float value
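A minimal sketch of a custom handler, assuming each message has already been split into a list of the form [application, value, ISO timestamp]. BaseMessageHandlerSketch is a hypothetical stand-in that only mirrors the classmethods documented above so the block runs on its own; it is not the library's class, and the message layout is an assumption made for illustration.

```python
from datetime import datetime
from typing import Generic, List, TypeVar

T = TypeVar("T")


class BaseMessageHandlerSketch(Generic[T]):
    """Hypothetical stand-in mirroring the classmethods documented above."""

    @classmethod
    def extract_key(cls, message: T) -> str:
        raise NotImplementedError

    @classmethod
    def extract_value(cls, message: T) -> float:
        raise NotImplementedError

    @classmethod
    def extract_ts(cls, message: T) -> datetime:
        raise NotImplementedError

    @classmethod
    def extract_extra(cls, message: T) -> dict:
        return {}


class CsvLineHandler(BaseMessageHandlerSketch[List[str]]):
    """Handles messages already split into [application, value, timestamp]."""

    @classmethod
    def extract_key(cls, message):
        return message[0]

    @classmethod
    def extract_value(cls, message):
        return float(message[1])

    @classmethod
    def extract_ts(cls, message):
        return datetime.strptime(message[2], "%Y-%m-%dT%H:%M:%S")


fields = "app1,12.5,2018-07-11T14:27:48".split(",")
key = CsvLineHandler.extract_key(fields)
value = CsvLineHandler.extract_value(fields)
ts = CsvLineHandler.extract_ts(fields)
```

Keeping every extractor a classmethod means the handler carries no state, so the same handler class can be shared by every consumer of the stream.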
-
class
anomalydetection.backend.entities.input_message.
InputMessage
(application, value, ts)[source]¶ Bases:
object
This is the parser of a JSON message.
-
class
anomalydetection.backend.entities.input_message.
InputMessageHandler
[source]¶ Bases:
anomalydetection.backend.entities.BaseMessageHandler
-
classmethod
extract_extra
(message)¶ Extract extra data from the parsed message
Parameters: message (~T) – parsed message Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key
(message)[source]¶ Extract the key of the message; this value is used to group messages
Parameters: message (InputMessage) – parsed message Return type: str
Returns: the message key
-
classmethod
extract_ts
(message)¶ Extract the datetime of the message
Parameters: message (~T) – parsed message Return type: datetime
Returns: a datetime object
-
classmethod
extract_value
(message)[source]¶ Extract the value of the message; this is the value used to make the prediction
Parameters: message (InputMessage) – parsed message Return type: float
Returns: a float value
-
classmethod
parse_message
(message)[source]¶ Parse or transform an input message and return it
Parameters: message (InputMessage) – message serialized as a string Return type: InputMessage
Returns: parsed message
-
classmethod
validate_message
(message)[source]¶ Validate a message
Parameters: message (InputMessage) – the message to validate Return type: bool
Returns: True if the message is valid, False otherwise
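The parse and validate steps can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: InputMessageSketch is a hypothetical stand-in for InputMessage(application, value, ts), the JSON field names and the timestamp format are assumed, and validation here takes the raw string rather than a parsed message.

```python
import json
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for InputMessage(application, value, ts).
InputMessageSketch = namedtuple(
    "InputMessageSketch", ["application", "value", "ts"]
)


def parse_message(raw: str) -> InputMessageSketch:
    """Parse a JSON string into a message; raises on malformed input."""
    data = json.loads(raw)
    return InputMessageSketch(
        application=data["application"],
        value=float(data["value"]),
        ts=datetime.strptime(data["ts"], "%Y-%m-%dT%H:%M:%S"),
    )


def validate_message(raw: str) -> bool:
    """Return True if the raw string parses into a complete message."""
    try:
        parse_message(raw)
    except (ValueError, KeyError, TypeError):
        return False
    return True


good = '{"application": "app1", "value": 1.5, "ts": "2018-07-11T14:27:48"}'
bad = '{"application": "app1"}'
parsed = parse_message(good)
```

Validating by attempting a full parse keeps the two functions from drifting apart: anything that validates is guaranteed to parse.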
-
class
anomalydetection.backend.entities.output_message.
AnomalyResult
(value_lower_limit, value_upper_limit, anomaly_probability, is_anomaly)[source]¶ Bases:
object
AnomalyResult description
Variables: - value_lower_limit – lower bound limit
- value_upper_limit – upper bound limit
- anomaly_probability – probability of being anomalous
- is_anomaly – whether the value is anomalous
AnomalyResult constructor
-
class
anomalydetection.backend.entities.output_message.
OutputMessage
(application, anomaly_results, agg_window_millis=0, agg_function=<AggregationFunction.NONE: 'none'>, agg_value=0, ts=datetime.datetime(2018, 7, 11, 14, 27, 48, 354220))[source]¶ Bases:
object
OutputMessage class description
Variables: - application – application name
- anomaly_results – anomaly results
- agg_window_millis – aggregation window in milliseconds
- agg_function – aggregation function
- agg_value – the value after aggregation
- ts – timestamp
OutputMessage class constructor
Parameters: - application (str) – application name
- anomaly_results (AnomalyResult) – anomaly results
- agg_window_millis (int) – aggregation window in milliseconds
- agg_function (AggregationFunction) – aggregation function
- agg_value (float) – the value after aggregation
- ts (datetime) – timestamp
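A fixed datetime in a generated signature, like the ts default above, is typically what documentation shows when a default is evaluated once at import time. A minimal sketch of a message type whose timestamp is computed per instance instead, assuming plain dataclasses; AnomalyResultSketch and OutputMessageSketch are hypothetical stand-ins that only mirror the documented fields, and the string "none" stands in for AggregationFunction.NONE.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class AnomalyResultSketch:
    value_lower_limit: float
    value_upper_limit: float
    anomaly_probability: float
    is_anomaly: bool


@dataclass
class OutputMessageSketch:
    application: str
    anomaly_results: AnomalyResultSketch
    agg_window_millis: int = 0
    agg_function: str = "none"  # stand-in for AggregationFunction.NONE
    agg_value: float = 0.0
    # default_factory runs on every construction, so each message gets its
    # own timestamp instead of one value fixed when the module is loaded.
    ts: datetime = field(default_factory=datetime.now)


result = AnomalyResultSketch(0.0, 10.0, 0.99, True)
first = OutputMessageSketch("app1", result, agg_value=12.0)
second = OutputMessageSketch("app1", result, agg_value=12.0)
```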
-
class
anomalydetection.backend.entities.output_message.
OutputMessageHandler
[source]¶ Bases:
anomalydetection.backend.entities.BaseMessageHandler
-
classmethod
extract_extra
(message)[source]¶ Extract extra data from the parsed message
Parameters: message (InputMessage) – parsed message Return type: dict
Returns: a dict of extra values
-
classmethod
extract_key
(message)[source]¶ Extract the key of the message; this value is used to group messages
Parameters: message (InputMessage) – parsed message Return type: str
Returns: the message key
-
classmethod
extract_ts
(message)¶ Extract the datetime of the message
Parameters: message (~T) – parsed message Return type: datetime
Returns: a datetime object
-
classmethod
extract_value
(message)[source]¶ Extract the value of the message; this is the value used to make the prediction
Parameters: message (InputMessage) – parsed message Return type: float
Returns: a float value
-
classmethod
parse_message
(message)[source]¶ Parse or transform an input message and return it
Parameters: message (OutputMessage) – message serialized as a string Return type: InputMessage
Returns: parsed message
-
classmethod
validate_message
(message)[source]¶ Validate a message
Parameters: message (InputMessage) – the message to validate Return type: bool
Returns: True if the message is valid, False otherwise
A default message handler implementation for JSON-format messages is available and ready to use.
backend.engine¶
-
class
anomalydetection.backend.engine.builder.
BaseEngineBuilder
[source]¶ Bases:
object
Base builder; implement this class to create engine builders.
-
build
()[source]¶ Build the engine
Return type: BaseEngine
Returns: A BaseEngine implementation instance.
-
-
class
anomalydetection.backend.engine.builder.
CADDetectorBuilder
(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)[source]¶ Bases:
anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build
()[source]¶ Build the engine
Return type: CADDetector
Returns: A BaseEngine implementation instance.
-
set
(name, value)¶
-
type
= 'cad'¶
-
-
class
anomalydetection.backend.engine.builder.
EMADetectorBuilder
(window=100, threshold=0.9999)[source]¶ Bases:
anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build
()[source]¶ Build the engine
Return type: EMADetector
Returns: A BaseEngine implementation instance.
-
set
(name, value)¶
-
type
= 'ema'¶
-
-
class
anomalydetection.backend.engine.builder.
EngineBuilderFactory
[source]¶ Bases:
object
-
engines
= {'cad': {'key': 'cad', 'name': 'CADDetector'}, 'ema': {'key': 'ema', 'name': 'EMADetector'}, 'robust': {'key': 'robust', 'name': 'RobustDetector'}}¶
-
static
get
(name)[source]¶ Return type: BaseEngineBuilder
-
static
get_cad
()[source]¶ Return type: CADDetectorBuilder
-
static
get_ema
()[source]¶ Return type: EMADetectorBuilder
-
static
get_plugin
(name)[source]¶ Return type: BaseEngineBuilder
-
static
get_robust
()[source]¶ Return type: RobustDetectorBuilder
-
-
class
anomalydetection.backend.engine.builder.
RobustDetectorBuilder
(window=100, threshold=0.9999)[source]¶ Bases:
anomalydetection.backend.engine.builder.BaseEngineBuilder
-
build
()[source]¶ Build the engine
Return type: RobustDetector
Returns: A BaseEngine implementation instance.
-
set
(name, value)¶
-
type
= 'robust'¶
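The builder and factory entries above follow a common pattern: a builder collects parameters through set(name, value), build() creates an engine from them, and a factory maps a type key to a builder. A minimal sketch of that pattern follows. Every class here is a hypothetical stand-in rather than the library's code, and returning self from set() to allow chaining is an assumption, since the return value of set() is not documented above.

```python
class EngineSketch:
    """Hypothetical engine that just stores its configuration."""

    def __init__(self, window=100, threshold=0.9999):
        self.window = window
        self.threshold = threshold


class EngineBuilderSketch:
    """Collects parameters, then creates a fresh engine on each build()."""

    type = "sketch"

    def __init__(self, window=100, threshold=0.9999):
        self.window = window
        self.threshold = threshold

    def set(self, name, value):
        # Reject unknown names so typos fail early; return self for chaining.
        if name not in vars(self):
            raise AttributeError("unknown parameter: %s" % name)
        setattr(self, name, value)
        return self

    def build(self):
        return EngineSketch(window=self.window, threshold=self.threshold)


class EngineBuilderFactorySketch:
    """Maps a type key to a builder class, in the spirit of get(name)."""

    builders = {EngineBuilderSketch.type: EngineBuilderSketch}

    @staticmethod
    def get(name):
        return EngineBuilderFactorySketch.builders[name]()


builder = EngineBuilderFactorySketch.get("sketch").set("window", 30)
engine_a = builder.build()
engine_b = builder.build()
```

Passing a builder around instead of an engine lets the caller create one independent engine per application from a single configuration.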
-
-
class
anomalydetection.backend.engine.
BaseEngine
[source]¶ Bases:
object
Base class for any Engine implementation.
-
class
anomalydetection.backend.engine.cad_engine.
CADDetector
(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)[source]¶ Bases:
anomalydetection.backend.engine.BaseEngine
Contextual Anomaly Detector - Open Source Edition https://github.com/smirmik/CAD
-
class
anomalydetection.backend.engine.cad_engine.
ContextOperator
(max_left_semi_cntx_len)[source]¶ Bases:
object
-
class
anomalydetection.backend.engine.ema_engine.
EMADetector
(window=100, threshold=2.0)[source]¶ Bases:
anomalydetection.backend.engine.BaseEngine
EMADetector constructor
Parameters: - window – window of samples to work with
- threshold – threshold for confidence
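A minimal sketch of detection with an exponential moving average, for orientation only. It is not the EMADetector implementation: deriving the smoothing factor from the window as 2 / (window + 1) and treating the threshold as a multiple of the running standard deviation are both assumptions, and EmaSketch is a hypothetical name.

```python
import math


class EmaSketch:
    """Tracks an exponentially weighted mean and variance of a series."""

    def __init__(self, window=100, threshold=2.0):
        # Common convention: smoothing factor derived from the window span.
        self.alpha = 2.0 / (window + 1)
        self.threshold = threshold
        self.mean = None
        self.var = 0.0

    def is_anomaly(self, value):
        """Score value against the current state, then update the state."""
        if self.mean is None:
            self.mean = value
            return False
        diff = value - self.mean
        std = math.sqrt(self.var)
        anomalous = std > 0 and abs(diff) > self.threshold * std
        # Incremental exponentially weighted mean and variance update.
        self.mean += self.alpha * diff
        self.var = (1 - self.alpha) * (self.var + self.alpha * diff * diff)
        return anomalous


detector = EmaSketch(window=10, threshold=2.0)
series = [10.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 50.0]
flags = [detector.is_anomaly(v) for v in series]
```

The first few samples can be flagged spuriously while the variance estimate is still warming up, so a practical detector would usually ignore results until enough samples have been seen.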
-
class
anomalydetection.backend.engine.robust_z_engine.
RobustDetector
(window=100, threshold=0.9999)[source]¶ Bases:
anomalydetection.backend.engine.BaseEngine
,anomalydetection.common.logging.LoggingMixin
Anomaly detection engine based on robust statistics: the median and the median absolute deviation (MAD).
Parameters: - window – window of samples to work with
- threshold – threshold for confidence
-
logger
¶ Logger object.
Returns: a configured logger object
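The median and median-absolute-deviation approach can be sketched with a robust z-score. This illustrates the general technique and is not the RobustDetector code; the 0.6745 scaling constant and the conversion of the score to a two-sided normal confidence, compared against a threshold such as 0.9999, are assumptions about how such a threshold could be applied.

```python
import math
from statistics import median


def robust_z_score(window, value):
    """Robust z-score of value against the median and MAD of window."""
    center = median(window)
    mad = median(abs(x - center) for x in window)
    if mad == 0:
        return 0.0 if value == center else math.inf
    # 0.6745 rescales the MAD so the score is comparable to a standard
    # z-score when the data are normally distributed.
    return 0.6745 * (value - center) / mad


def two_sided_confidence(z):
    """Probability mass of a standard normal within +/- |z|."""
    return math.erf(abs(z) / math.sqrt(2))


history = [10.0, 10.2, 9.9, 10.1, 9.8, 10.0, 10.3]
z = robust_z_score(history, 14.0)
is_anomaly = two_sided_confidence(z) > 0.9999
```

Because the median and MAD ignore the magnitude of extreme samples, a single earlier outlier in the window does not inflate the spread the way it would with a mean and standard deviation.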
backend.interactor¶
-
class
anomalydetection.backend.interactor.
BaseEngineInteractor
(engine_builder, message_handler)[source]¶ Bases:
object
BaseEngineInteractor is responsible for holding the engine builder and the message handler. It is also responsible for building an engine for each application.
BaseEngineInteractor constructor
Parameters: - engine_builder (BaseEngineBuilder) – engine builder
- message_handler (BaseMessageHandler[~T]) – message handler
-
class
anomalydetection.backend.interactor.batch_engine.
BatchEngineInteractor
(batch, engine_builder, message_handler)[source]¶ Bases:
anomalydetection.backend.interactor.BaseEngineInteractor
,anomalydetection.common.logging.LoggingMixin
BatchEngineInteractor is an implementation that processes an Observable in batch.
BatchEngineInteractor constructor
Parameters: - batch (BaseObservable) – an observable
- engine_builder (BaseEngineBuilder) – an engine builder
- message_handler (BaseMessageHandler[~T]) – a message handler
-
get_engine
(application)¶ Return the engine for the application in a thread-safe way
Parameters: application (str) – application name
Returns: the engine for that application
-
logger
¶ Logger object.
Returns: a configured logger object
-
map_with_engine
(input_message)[source]¶ Return type: OutputMessage
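One way to hand out a single engine per application in a thread-safe way is a dictionary guarded by a lock. The sketch below shows that idea under stated assumptions: EngineRegistrySketch and its build_engine callable are hypothetical names, and nothing here is taken from the interactor's actual implementation.

```python
import threading


class EngineRegistrySketch:
    """Builds one engine per application and caches it under a lock."""

    def __init__(self, build_engine):
        self._build_engine = build_engine  # zero-argument callable
        self._engines = {}
        self._lock = threading.Lock()

    def get_engine(self, application):
        # Holding the lock across check-and-create guarantees that two
        # threads asking for the same application share one engine.
        with self._lock:
            if application not in self._engines:
                self._engines[application] = self._build_engine()
            return self._engines[application]


registry = EngineRegistrySketch(build_engine=object)
seen = []
threads = [
    threading.Thread(target=lambda: seen.append(registry.get_engine("app1")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, two threads could both find the application missing and each build an engine, so part of the stream would be scored against a detector with incomplete history.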
backend.repository¶
-
class
anomalydetection.backend.repository.
BaseObservableRepository
(repository)[source]¶ Bases:
anomalydetection.backend.stream.BaseObservable
Use a repository as an Observable
BaseObservableRepository constructor
Parameters: repository (BaseRepository) – a repository
-
class
anomalydetection.backend.repository.
BaseRepository
(conn)[source]¶ Bases:
object
-
fetch
(application, from_ts, to_ts)[source]¶ Fetch data from the repository; implementations should return the data in order
Parameters: - application – application name
- from_ts – from timestamp
- to_ts – to timestamp
Return type: Iterable[+T_co]
Returns: an iterable
-
get_applications
()[source]¶ Return a list of distinct applications contained in the repository.
Return type: List[str]
Returns: a list of application names
-
insert
(message)[source]¶ Insert an OutputMessage into the repository
Parameters: message (OutputMessage) – an output message Return type: None
-
map
(item)[source]¶ Map function to map elements returned by fetch method to OutputMessage
Parameters: item (Any) – an element of the fetch iterable Return type: OutputMessage
Returns: an OutputMessage
-
backend.repository.builder¶
-
class
anomalydetection.backend.repository.builder.
BaseRepositoryBuilder
[source]¶ Bases:
object
Base builder; implement this class to create repository builders.
-
build
()[source]¶ Build a repository
Return type: BaseRepository
Returns: A BaseRepository implementation instance.
-
-
class
anomalydetection.backend.repository.builder.
RepositoryBuilderFactory
[source]¶ Bases:
object
-
static
get
(name)[source]¶ Return type: BaseRepositoryBuilder
-
static
get_plugin
(name)[source]¶ Return type: BaseRepositoryBuilder
-
static
get_sqlite
()[source]¶ Return type: SQLiteBuilder
-
class
anomalydetection.backend.repository.builder.
SQLiteBuilder
(database=None)[source]¶ Bases:
anomalydetection.backend.repository.builder.BaseRepositoryBuilder
-
build
()[source]¶ Build a repository
Return type: BaseRepository
Returns: A BaseRepository implementation instance.
-
set
(name, value)¶
-
backend.repository.observable¶
-
class
anomalydetection.backend.repository.observable.
ObservableRepository
(repository, application=None, from_ts=None, to_ts=None)[source]¶ Bases:
anomalydetection.backend.repository.BaseObservableRepository
Creates an ObservableRepository that can act as an observable
Parameters: - repository (BaseRepository) – the repository
- application – application name
- from_ts – from timestamp
- to_ts – to timestamp
-
get_max
()¶
-
get_min
()¶
-
get_observable
()¶ Return type: Observable
backend.repository.sqlite¶
-
class
anomalydetection.backend.repository.sqlite.
SQLiteRepository
(database)[source]¶ Bases:
anomalydetection.backend.repository.BaseRepository
SQLiteRepository constructor
Parameters: database (str) – database path
-
fetch
(application, from_ts, to_ts)[source]¶ Fetch data from the repository; implementations should return the data in order
Parameters: - application – application name
- from_ts – from timestamp
- to_ts – to timestamp
Returns: an iterable
-
get_applications
()[source]¶ Return a list of distinct applications contained in the repository.
Returns: a list of application names
-
insert
(message)[source]¶ Insert an OutputMessage into the repository
Parameters: message (OutputMessage) – an output message
-
map
(item)[source]¶ Map function to map elements returned by fetch method to OutputMessage
Parameters: item (Row) – an element of the fetch iterable Return type: OutputMessage
Returns: an OutputMessage
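The fetch, insert and get_applications operations can be sketched with the standard sqlite3 module. This is not the SQLiteRepository implementation: the table name, the column layout and the use of ISO 8601 strings for timestamps are assumptions made so the example is self-contained, and insert takes plain values instead of an OutputMessage.

```python
import sqlite3


class SQLiteRepositorySketch:
    """Stores (application, ts, agg_value) rows; the schema is an assumption."""

    def __init__(self, database):
        self.conn = sqlite3.connect(database)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS predictions "
            "(application TEXT, ts TEXT, agg_value REAL)"
        )

    def insert(self, application, ts, agg_value):
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO predictions VALUES (?, ?, ?)",
                (application, ts, agg_value),
            )

    def fetch(self, application, from_ts, to_ts):
        # ISO 8601 strings sort chronologically, so ORDER BY ts is enough.
        cursor = self.conn.execute(
            "SELECT * FROM predictions WHERE application = ? "
            "AND ts BETWEEN ? AND ? ORDER BY ts",
            (application, from_ts, to_ts),
        )
        return cursor.fetchall()

    def get_applications(self):
        rows = self.conn.execute(
            "SELECT DISTINCT application FROM predictions ORDER BY application"
        )
        return [row["application"] for row in rows]


repo = SQLiteRepositorySketch(":memory:")
repo.insert("app1", "2018-07-11T14:30:00", 2.0)
repo.insert("app1", "2018-07-11T14:00:00", 1.0)
repo.insert("app2", "2018-07-11T14:10:00", 3.0)
rows = repo.fetch("app1", "2018-07-11T00:00:00", "2018-07-11T23:59:59")
```

Ordering in the query rather than in Python keeps fetch consistent with the requirement that data come back ordered, whatever order the rows were inserted in.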
-
backend.sink¶
-
class
anomalydetection.backend.sink.
BaseSink
[source]¶ Bases:
rx.core.py3.observer.Observer
Base sink; implement this class to create sinks.
-
as_observer
()¶ Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked
()¶ Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier
(handler)¶ Creates an observer from a notification callback.
Parameters: handler – action that handles a notification Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
on_completed
()¶
-
on_error
(error)¶
-
on_next
(value)¶
-
to_notifier
()¶ Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
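A sink is an observer, so a concrete sink mainly needs to supply on_next, on_error and on_completed. The sketch below shows the shape of such a class without depending on the rx package: ObserverSketch is a hypothetical stand-in for the observer interface, and emit is a hand-written driver used only to exercise the sink.

```python
class ObserverSketch:
    """Hypothetical stand-in for the observer interface listed above."""

    def on_next(self, value):
        raise NotImplementedError

    def on_error(self, error):
        raise NotImplementedError

    def on_completed(self):
        raise NotImplementedError


class ListSink(ObserverSketch):
    """Collects every value it receives until the stream ends."""

    def __init__(self):
        self.items = []
        self.errors = []
        self.completed = False

    def on_next(self, value):
        self.items.append(value)

    def on_error(self, error):
        self.errors.append(error)

    def on_completed(self):
        self.completed = True


def emit(values, observer):
    """Drive an observer by hand: values, then completion or one error."""
    try:
        for value in values:
            observer.on_next(value)
    except Exception as error:
        observer.on_error(error)
    else:
        observer.on_completed()


sink = ListSink()
emit([1, 2, 3], sink)
```

A real sink would replace the list append with a write to its destination, such as a repository insert or a message sent to a producer.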
-
backend.sink.repository¶
-
class
anomalydetection.backend.sink.repository.
RepositorySink
(repository)[source]¶ Bases:
anomalydetection.backend.sink.BaseSink
,anomalydetection.common.logging.LoggingMixin
Creates a RepositorySink that can sink OutputMessages into the given repository
Parameters: repository (BaseRepository) – a repository
-
as_observer
()¶ Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked
()¶ Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier
(handler)¶ Creates an observer from a notification callback.
Parameters: handler – action that handles a notification Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
¶ Logger object.
Returns: a configured logger object
-
to_notifier
()¶ Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
-
backend.sink.stream¶
-
class
anomalydetection.backend.sink.stream.
StreamSink
(stream_producer)[source]¶ Bases:
anomalydetection.backend.sink.BaseSink
,anomalydetection.common.logging.LoggingMixin
Creates a StreamSink that can sink OutputMessages into the stream producer
Parameters: stream_producer (BaseStreamProducer) – a stream producer
-
as_observer
()¶ Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked
()¶ Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier
(handler)¶ Creates an observer from a notification callback.
Parameters: handler – action that handles a notification Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
¶ Logger object.
Returns: a configured logger object
-
to_notifier
()¶ Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
-
backend.sink.websocket¶
-
class
anomalydetection.backend.sink.websocket.
WebSocketSink
(name, url)[source]¶ Bases:
anomalydetection.backend.sink.BaseSink
,anomalydetection.common.logging.LoggingMixin
Sink implementation that sends an OutputMessage stream to a WebSocket
-
as_observer
()¶ Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
-
checked
()¶ Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.
Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
-
classmethod
from_notifier
(handler)¶ Creates an observer from a notification callback.
Parameters: handler – action that handles a notification Return type: Observer
Returns: the observer object that invokes the specified handler using a notification corresponding to each message it receives
-
logger
¶ Logger object.
Returns: a configured logger object
-
to_notifier
()¶ Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
-
backend.stream¶
-
class
anomalydetection.backend.stream.
BaseStreamAggregation
(agg_function=<AggregationFunction.NONE: 'none'>, agg_window_millis=0)[source]¶ Bases:
object
BaseStreamAggregation class
Parameters: - agg_function (AggregationFunction) – aggregation function
- agg_window_millis (int) – aggregation window in milliseconds
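How an aggregation function and a window in milliseconds combine can be sketched with fixed (tumbling) windows. This is an illustration, not the library's aggregation code: only the NONE member with value 'none' appears in the signature above, so the AVG and SUM members, the (timestamp, value) event shape and the tumbling-window choice are all assumptions.

```python
from collections import defaultdict
from enum import Enum


class AggregationFunctionSketch(Enum):
    NONE = "none"  # value shown in the signature above
    AVG = "avg"    # hypothetical member, for illustration
    SUM = "sum"    # hypothetical member, for illustration


def aggregate(events, agg_function, agg_window_millis):
    """Group (ts_millis, value) pairs into fixed windows and reduce each."""
    if agg_function is AggregationFunctionSketch.NONE or agg_window_millis <= 0:
        return list(events)
    windows = defaultdict(list)
    for ts_millis, value in events:
        # Integer division maps each timestamp to the start of its window.
        start = ts_millis // agg_window_millis * agg_window_millis
        windows[start].append(value)
    if agg_function is AggregationFunctionSketch.SUM:
        reduce = sum
    else:
        reduce = lambda values: sum(values) / len(values)
    return [(start, reduce(values)) for start, values in sorted(windows.items())]


events = [(0, 1.0), (400, 3.0), (1000, 10.0), (1900, 20.0), (2500, 5.0)]
averaged = aggregate(events, AggregationFunctionSketch.AVG, 1000)
summed = aggregate(events, AggregationFunctionSketch.SUM, 1000)
```

With the defaults shown in the signature (no aggregation function and a window of 0), the events pass through unchanged.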
-
class
anomalydetection.backend.stream.
BaseStreamConsumer
[source]¶
-
class
anomalydetection.backend.stream.
FileObservable
(file)[source]¶ Bases:
anomalydetection.backend.stream.BaseObservable
FileObservable transforms the lines of a file into an Observable
Parameters: file (str) – path to a local file
backend.stream.agg¶
backend.stream.builder¶
backend.stream.kafka¶
-
class
anomalydetection.backend.stream.kafka.
KafkaStreamConsumer
(broker_servers, input_topic, group_id)[source]¶ Bases:
anomalydetection.backend.stream.BaseStreamConsumer
,anomalydetection.common.logging.LoggingMixin
KafkaStreamConsumer constructor
-
get_observable
()¶ Return type: Observable
-
logger
¶ Logger object.
Returns: a configured logger object
-
-
class
anomalydetection.backend.stream.kafka.
KafkaStreamProducer
(broker_servers, output_topic)[source]¶ Bases:
anomalydetection.backend.stream.BaseStreamProducer
,anomalydetection.common.logging.LoggingMixin
KafkaStreamProducer constructor
-
logger
¶ Logger object.
Returns: a configured logger object
-
-
class
anomalydetection.backend.stream.kafka.
SparkKafkaStreamConsumer
(broker_servers, input_topic, group_id, agg_function, agg_window_millis, spark_opts={}, multiprocessing=True)[source]¶ Bases:
anomalydetection.backend.stream.BaseStreamConsumer
,anomalydetection.backend.stream.BaseStreamAggregation
,anomalydetection.common.logging.LoggingMixin
SparkKafkaStreamConsumer constructor
Parameters: - broker_servers (str) – broker servers
- input_topic (str) – input topic
- group_id (str) – consumer group id
- agg_function (AggregationFunction) – aggregation function to apply
- agg_window_millis (int) – aggregation window in milliseconds
- spark_opts (dict) – spark options dict
- multiprocessing – use multiprocessing instead of threading
-
get_observable
()¶ Return type: Observable
-
logger
¶ Logger object.
Returns: a configured logger object