anomalydetection.backend

This module contains the core of the anomaly detection framework implementation.

backend.core

backend.core.plugins

backend.core.config

backend.entities

class anomalydetection.backend.entities.BaseMessageHandler[source]

Bases: typing.Generic

Base message handler

classmethod extract_extra(message)[source]

Extract extra data from the parsed message

Parameters:message (~T) – parsed message
Return type:dict
Returns:a dict of extra values
classmethod extract_key(message)[source]

Extracts the key of the message; this value is used to group messages

Parameters:message (~T) – parsed message
Return type:str
Returns:word
classmethod extract_ts(message)[source]

Extract the datetime of the message

Parameters:message (~T) – parsed message
Return type:datetime
Returns:a datetime object
classmethod extract_value(message)[source]

Extracts the value of the message; this is the value passed to the engine to make the prediction

Parameters:message (~T) – parsed message
Return type:float
Returns:a float value
classmethod parse_message(message)[source]

Parses or transforms an input message and returns it

Parameters:message (Any) – message serialized in a string
Return type:~T
Returns:parsed message
classmethod validate_message(message)[source]

Validates a message

Parameters:message (~T) – the parsed message to validate
Return type:bool
Returns:True if the message is valid, False otherwise
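The handler contract above can be illustrated with a minimal sketch for comma-separated lines. The names CsvRecord and CsvLineHandler and the line format "application,value,timestamp" are assumptions made for this example. To stay runnable without the package installed, the class only mirrors the documented classmethods; a real handler would presumably subclass BaseMessageHandler.

```python
from datetime import datetime
from typing import NamedTuple, Optional


class CsvRecord(NamedTuple):
    """Hypothetical parsed message: one CSV line split into typed fields."""
    application: str
    value: float
    ts: datetime


class CsvLineHandler:
    """Stand-in exposing the same classmethods as BaseMessageHandler."""

    @classmethod
    def parse_message(cls, message: str) -> Optional[CsvRecord]:
        # Assumed format: "application,value,ISO 8601 timestamp"
        try:
            application, value, ts = message.strip().split(",")
            return CsvRecord(application, float(value), datetime.fromisoformat(ts))
        except ValueError:
            # Wrong field count, non-numeric value, or bad timestamp
            return None

    @classmethod
    def validate_message(cls, message: Optional[CsvRecord]) -> bool:
        return message is not None and bool(message.application)

    @classmethod
    def extract_key(cls, message: CsvRecord) -> str:
        return message.application

    @classmethod
    def extract_value(cls, message: CsvRecord) -> float:
        return message.value

    @classmethod
    def extract_ts(cls, message: CsvRecord) -> datetime:
        return message.ts

    @classmethod
    def extract_extra(cls, message: CsvRecord) -> dict:
        return {}


record = CsvLineHandler.parse_message("web,42.5,2018-07-11T14:27:48")
```

In this sketch a line that cannot be parsed becomes None, so validate_message can reject it before any extract_* method is called.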
class anomalydetection.backend.entities.input_message.InputMessage(application, value, ts)[source]

Bases: object

This is the parser of a JSON message

Parameters:
  • application (str) – application
  • value (float) – value
  • ts (Any) – a datetime, or a timestamp string in ISO 8601 format
to_dict()[source]
to_json()[source]
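As a rough illustration of the constructor arguments and the to_dict()/to_json() pair, the following sketch accepts either form of ts. The class name InputMessageSketch and the exact keys of the serialized dict are assumptions; the library's actual output format is not shown in this page.

```python
import json
from datetime import datetime


class InputMessageSketch:
    """Hypothetical stand-in with the documented constructor arguments."""

    def __init__(self, application: str, value: float, ts):
        self.application = application
        self.value = value
        # Accept either a datetime or an ISO 8601 string
        self.ts = ts if isinstance(ts, datetime) else datetime.fromisoformat(ts)

    def to_dict(self) -> dict:
        # Key names are assumed for this example
        return {
            "application": self.application,
            "value": self.value,
            "ts": self.ts.isoformat(),
        }

    def to_json(self) -> str:
        return json.dumps(self.to_dict())


message = InputMessageSketch("web", 1.5, "2018-07-11T14:27:48")
```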
class anomalydetection.backend.entities.input_message.InputMessageHandler[source]

Bases: anomalydetection.backend.entities.BaseMessageHandler

classmethod extract_extra(message)

Extract extra data from the parsed message

Parameters:message (~T) – parsed message
Return type:dict
Returns:a dict of extra values
classmethod extract_key(message)[source]

Extracts the key of the message; this value is used to group messages

Parameters:message (InputMessage) – parsed message
Return type:str
Returns:word
classmethod extract_ts(message)

Extract the datetime of the message

Parameters:message (~T) – parsed message
Return type:datetime
Returns:a datetime object
classmethod extract_value(message)[source]

Extracts the value of the message; this is the value passed to the engine to make the prediction

Parameters:message (InputMessage) – parsed message
Return type:float
Returns:a float value
classmethod parse_message(message)[source]

Parses or transforms an input message and returns it

Parameters:message (InputMessage) – message serialized in a string
Return type:InputMessage
Returns:parsed message
classmethod validate_message(message)[source]

Validates a message

Parameters:message (InputMessage) – the parsed message to validate
Return type:bool
Returns:True if the message is valid, False otherwise
class anomalydetection.backend.entities.output_message.AnomalyResult(value_lower_limit, value_upper_limit, anomaly_probability, is_anomaly)[source]

Bases: object

AnomalyResult description

Variables:
  • value_lower_limit – lower bound limit
  • value_upper_limit – upper bound limit
  • anomaly_probability – probability of being anomalous
  • is_anomaly – whether the value is anomalous

AnomalyResult constructor

Parameters:
  • value_lower_limit (float) – lower bound limit
  • value_upper_limit (float) – upper bound limit
  • anomaly_probability (float) – probability of being anomalous
  • is_anomaly (bool) – whether the value is anomalous
to_dict()[source]
class anomalydetection.backend.entities.output_message.OutputMessage(application, anomaly_results, agg_window_millis=0, agg_function=<AggregationFunction.NONE: 'none'>, agg_value=0, ts=datetime.datetime(2018, 7, 11, 14, 27, 48, 354220))[source]

Bases: object

OutputMessage class description

Variables:
  • application – application name
  • anomaly_results – anomaly results
  • agg_window_millis – aggregation window in milliseconds
  • agg_function – aggregation function
  • agg_value – the value after aggregation
  • ts – timestamp

OutputMessage class constructor

Parameters:
  • application (str) – application name
  • anomaly_results (AnomalyResult) – anomaly results
  • agg_window_millis (int) – aggregation window in milliseconds
  • agg_function (AggregationFunction) – aggregation function
  • agg_value (float) – the value after aggregation
  • ts (datetime) – timestamp
to_dict(ts2str=False)[source]
to_input()[source]
to_plain_dict()[source]
class anomalydetection.backend.entities.output_message.OutputMessageHandler[source]

Bases: anomalydetection.backend.entities.BaseMessageHandler

classmethod extract_extra(message)[source]

Extract extra data from the parsed message

Parameters:message (InputMessage) – parsed message
Return type:dict
Returns:a dict of extra values
classmethod extract_key(message)[source]

Extracts the key of the message; this value is used to group messages

Parameters:message (InputMessage) – parsed message
Return type:str
Returns:word
classmethod extract_ts(message)

Extract the datetime of the message

Parameters:message (~T) – parsed message
Return type:datetime
Returns:a datetime object
classmethod extract_value(message)[source]

Extracts the value of the message; this is the value passed to the engine to make the prediction

Parameters:message (InputMessage) – parsed message
Return type:float
Returns:a float value
classmethod parse_message(message)[source]

Parses or transforms an input message and returns it

Parameters:message (OutputMessage) – message serialized in a string
Return type:InputMessage
Returns:parsed message
classmethod validate_message(message)[source]

Validates a message

Parameters:message (InputMessage) – the parsed message to validate
Return type:bool
Returns:True if the message is valid, False otherwise

There is a default JSON format message handler implementation ready to use

backend.engine

class anomalydetection.backend.engine.builder.BaseEngineBuilder[source]

Bases: object

BaseBuilder, implement this to create Engine Builders.

build()[source]

Build the engine

Return type:BaseEngine
Returns:A BaseEngine implementation instance.
set(name, value)[source]
class anomalydetection.backend.engine.builder.CADDetectorBuilder(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)[source]

Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder

build()[source]

Build the engine

Return type:CADDetector
Returns:A BaseEngine implementation instance.
set(name, value)
set_max_active_neurons_num(max_active_neurons_num)[source]
set_max_left_semi_contexts_length(max_left_semi_contexts_length)[source]
set_max_value(value)[source]
set_min_value(value)[source]
set_num_norm_value_bits(num_norm_value_bits)[source]
set_rest_period(rest_period)[source]
set_threshold(threshold)[source]
type = 'cad'
class anomalydetection.backend.engine.builder.EMADetectorBuilder(window=100, threshold=0.9999)[source]

Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder

build()[source]

Build the engine

Return type:EMADetector
Returns:A BaseEngine implementation instance.
set(name, value)
set_threshold(threshold)[source]
set_window(window)[source]
type = 'ema'
class anomalydetection.backend.engine.builder.EngineBuilderFactory[source]

Bases: object

engines = {'cad': {'key': 'cad', 'name': 'CADDetector'}, 'ema': {'key': 'ema', 'name': 'EMADetector'}, 'robust': {'key': 'robust', 'name': 'RobustDetector'}}
static get(name)[source]
Return type:BaseEngineBuilder
static get_cad()[source]
Return type:CADDetectorBuilder
static get_ema()[source]
Return type:EMADetectorBuilder
static get_plugin(name)[source]
Return type:BaseEngineBuilder
static get_robust()[source]
Return type:RobustDetectorBuilder
classmethod register_engine(key, class_name)[source]
class anomalydetection.backend.engine.builder.RobustDetectorBuilder(window=100, threshold=0.9999)[source]

Bases: anomalydetection.backend.engine.builder.BaseEngineBuilder

build()[source]

Build the engine

Return type:RobustDetector
Returns:A BaseEngine implementation instance.
set(name, value)
set_threshold(threshold)[source]
set_window(window)[source]
type = 'robust'
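The builders above share a set(name, value) method plus typed setters and build(). A minimal sketch of that shape follows. ThresholdEngine and ThresholdEngineBuilder are hypothetical names, and two behaviors are assumptions rather than documented facts: that set("window", v) dispatches to set_window(v), and that setters return the builder to allow chaining.

```python
class ThresholdEngine:
    """Hypothetical engine, used only to show what build() returns."""

    def __init__(self, window: int, threshold: float):
        self.window = window
        self.threshold = threshold


class ThresholdEngineBuilder:
    """Stand-in builder with the documented set()/build() shape."""

    type = "threshold"

    def __init__(self, window: int = 100, threshold: float = 0.9999):
        self.window = window
        self.threshold = threshold

    def set_window(self, window: int) -> "ThresholdEngineBuilder":
        self.window = window
        return self

    def set_threshold(self, threshold: float) -> "ThresholdEngineBuilder":
        self.threshold = threshold
        return self

    def set(self, name: str, value) -> "ThresholdEngineBuilder":
        # Assumption: set("window", 5) is routed to set_window(5)
        setter = getattr(self, "set_" + name, None)
        if setter is None:
            raise AttributeError("unknown builder option: " + name)
        return setter(value)

    def build(self) -> ThresholdEngine:
        # Each call returns a fresh engine configured from the builder state
        return ThresholdEngine(self.window, self.threshold)


builder = ThresholdEngineBuilder().set("window", 50).set_threshold(0.99)
first = builder.build()
second = builder.build()
```

Returning a new engine on every build() call is what lets one builder serve several applications, each with its own independent engine state.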
class anomalydetection.backend.engine.BaseEngine[source]

Bases: object

Base class for any Engine implementation.

predict(value, **kwargs)[source]

Predict if the given value is anomalous.

Parameters:
  • value (float) – value to predict
  • kwargs – extra data to make the prediction
Return type:

AnomalyResult

Returns:

anomaly result
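To make the predict() contract concrete, here is a minimal sketch of an engine that flags values outside a fixed range. FixedBoundsEngine is a hypothetical name, and the AnomalyResult defined here is a local stand-in that only mirrors the four documented fields; neither is imported from the package.

```python
from typing import NamedTuple


class AnomalyResult(NamedTuple):
    """Local stand-in mirroring the documented AnomalyResult fields."""
    value_lower_limit: float
    value_upper_limit: float
    anomaly_probability: float
    is_anomaly: bool


class FixedBoundsEngine:
    """Hypothetical engine: flags values outside a fixed [lower, upper] range."""

    def __init__(self, lower: float, upper: float):
        self.lower = lower
        self.upper = upper

    def predict(self, value: float, **kwargs) -> AnomalyResult:
        # Extra keyword data is accepted but ignored by this simple engine
        outside = value < self.lower or value > self.upper
        # With hard bounds the probability is either 0 or 1
        probability = 1.0 if outside else 0.0
        return AnomalyResult(self.lower, self.upper, probability, outside)


engine = FixedBoundsEngine(lower=0.0, upper=100.0)
normal = engine.predict(50.0)
spike = engine.predict(250.0, ts="ignored extra data")
```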

class anomalydetection.backend.engine.cad_engine.CADDetector(min_value=-9223372036854775807, max_value=9223372036854775807, threshold=0.95, rest_period=30, max_left_semi_contexts_length=8, max_active_neurons_num=16, num_norm_value_bits=3)[source]

Bases: anomalydetection.backend.engine.BaseEngine

Contextual Anomaly Detector - Open Source Edition https://github.com/smirmik/CAD

get_anomaly_score(input_data)[source]
predict(value, **kwargs)[source]

Predict if the given value is anomalous.

Parameters:
  • value (float) – value to predict
  • kwargs – extra data to make the prediction
Return type:

AnomalyResult

Returns:

anomaly result

step(inp_facts)[source]
class anomalydetection.backend.engine.cad_engine.ContextOperator(max_left_semi_cntx_len)[source]

Bases: object

context_crosser(left_or_right, facts_list, new_context_flag=False, potential_new_contexts=[])[source]
get_context_by_facts(new_contexts_list, zero_level=0)[source]
update_contexts_and_get_active(new_context_flag)[source]
class anomalydetection.backend.engine.ema_engine.EMADetector(window=100, threshold=2.0)[source]

Bases: anomalydetection.backend.engine.BaseEngine

EMADetector constructor

Parameters:
  • window – window of samples to work with
  • threshold – threshold for confidence

predict(value, **kwargs)[source]

Predict if the given value is anomalous.

Parameters:
  • value (float) – value to predict
  • kwargs – extra data to make the prediction
Return type:

AnomalyResult

Returns:

anomaly result
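This page does not describe how EMADetector computes its limits. As one plausible illustration of an exponential-moving-average detector, the sketch below keeps a running mean and variance and flags values that fall outside mean ± threshold × standard deviation. EmaBandSketch, the smoothing factor 2 / (window + 1), and the reading of threshold as a number of standard deviations are all assumptions, not the library's implementation.

```python
import math


class EmaBandSketch:
    """Hypothetical detector; not the library's EMADetector."""

    def __init__(self, window: int = 100, threshold: float = 2.0):
        self.alpha = 2.0 / (window + 1)  # common smoothing factor for a span
        self.threshold = threshold
        self.mean = None
        self.var = 0.0

    def predict(self, value: float) -> dict:
        if self.mean is None:
            # First sample: nothing to compare against yet
            self.mean = value
            return {"value_lower_limit": value, "value_upper_limit": value,
                    "is_anomaly": False}
        std = math.sqrt(self.var)
        lower = self.mean - self.threshold * std
        upper = self.mean + self.threshold * std
        is_anomaly = std > 0 and not (lower <= value <= upper)
        # Update the running mean and variance after scoring the sample
        delta = value - self.mean
        self.mean += self.alpha * delta
        self.var = (1 - self.alpha) * (self.var + self.alpha * delta * delta)
        return {"value_lower_limit": lower, "value_upper_limit": upper,
                "is_anomaly": is_anomaly}


detector = EmaBandSketch(window=10, threshold=3.0)
results = [detector.predict(v) for v in [10.0, 10.5, 9.6, 10.2, 9.8, 10.1, 50.0]]
```

Scoring each sample before updating the statistics keeps an outlier from widening the band that is used to judge it.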

class anomalydetection.backend.engine.robust_z_engine.RobustDetector(window=100, threshold=0.9999)[source]

Bases: anomalydetection.backend.engine.BaseEngine, anomalydetection.common.logging.LoggingMixin

Anomaly detection engine based on robust statistics: the median and the median absolute deviation.

Parameters:
  • window – window of samples to work with
  • threshold – threshold for confidence
logger

Logger object.

Returns:a configured logger object
predict(value, **kwargs)[source]

Predict if the given value is anomalous.

Parameters:
  • value (float) – value to predict
  • kwargs – extra data to make the prediction
Return type:

AnomalyResult

Returns:

anomaly result
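The median and median absolute deviation (MAD) mentioned for RobustDetector can be combined into a robust z-score. The sketch below shows that general technique over a sliding window. RobustZSketch and its z_limit parameter are hypothetical; the library's threshold is described as a confidence value and may be applied differently.

```python
import statistics
from collections import deque


class RobustZSketch:
    """Hypothetical detector using median and MAD over a sliding window."""

    def __init__(self, window: int = 100, z_limit: float = 3.5):
        self.samples = deque(maxlen=window)
        self.z_limit = z_limit

    def predict(self, value: float) -> bool:
        is_anomaly = False
        if len(self.samples) >= 3:
            median = statistics.median(self.samples)
            mad = statistics.median(abs(s - median) for s in self.samples)
            if mad > 0:
                # 0.6745 rescales the MAD so the score is comparable to a
                # z-score for normally distributed data
                score = 0.6745 * abs(value - median) / mad
                is_anomaly = score > self.z_limit
        # Store the sample after scoring it
        self.samples.append(value)
        return is_anomaly


detector = RobustZSketch(window=50)
flags = [detector.predict(v) for v in [10, 11, 9, 10, 12, 10, 11, 100]]
```

Because the median and MAD barely move when a single extreme value enters the window, one outlier does not mask the next one the way it can with a mean and standard deviation.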

backend.interactor

class anomalydetection.backend.interactor.BaseEngineInteractor(engine_builder, message_handler)[source]

Bases: object

BaseEngineInteractor holds the engine builder and the message handler. It is also responsible for building an engine for each application

BaseEngineInteractor constructor

Parameters:
get_engine(application)[source]

Return the engine for the application in a thread-safe way

Parameters:application (str) – application name
Returns:its engine
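One common way to return a per-application engine in a thread-safe way is a dictionary guarded by a lock, sketched below. EngineCacheSketch and its build_engine argument are hypothetical; how the library actually synchronizes get_engine is not documented here.

```python
import threading


class EngineCacheSketch:
    """Hypothetical per-application engine cache guarded by a lock."""

    def __init__(self, build_engine):
        self._build_engine = build_engine  # zero-argument factory
        self._engines = {}
        self._lock = threading.Lock()

    def get_engine(self, application: str):
        # Holding the lock for both lookup and creation guarantees that two
        # threads never build separate engines for the same application
        with self._lock:
            if application not in self._engines:
                self._engines[application] = self._build_engine()
            return self._engines[application]


cache = EngineCacheSketch(build_engine=object)
seen = []
threads = [
    threading.Thread(target=lambda: seen.append(cache.get_engine("web")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

In real use the factory would be something like the build method of an engine builder, so every application gets an engine with the same configuration but independent state.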
class anomalydetection.backend.interactor.batch_engine.BatchEngineInteractor(batch, engine_builder, message_handler)[source]

Bases: anomalydetection.backend.interactor.BaseEngineInteractor, anomalydetection.common.logging.LoggingMixin

BatchEngineInteractor is an implementation that batch-processes an Observable

BatchEngineInteractor constructor

Parameters:
get_engine(application)

Return the engine for the application in a thread-safe way

Parameters:application (str) – application name
Returns:its engine
logger

Logger object.

Returns:a configured logger object
map_with_engine(input_message)[source]
Return type:OutputMessage
process()[source]
Return type:Observable

backend.repository

class anomalydetection.backend.repository.BaseObservableRepository(repository)[source]

Bases: anomalydetection.backend.stream.BaseObservable

Use a repository as an Observable

BaseObservableRepository constructor

Parameters:repository (BaseRepository) – a repository
get_max()[source]
get_min()[source]
get_observable()[source]
Return type:Observable
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item
class anomalydetection.backend.repository.BaseRepository(conn)[source]

Bases: object

fetch(application, from_ts, to_ts)[source]

Fetch data from the repository. Implementations should return ordered data

Parameters:
  • application – application name
  • from_ts – from timestamp
  • to_ts – to timestamp
Return type:

Iterable[+T_co]

Returns:

an iterable

get_applications()[source]

Return a list of distinct applications contained in the repository.

Return type:List[str]
Returns:a list of application names
initialize()[source]

Initialize the repository

insert(message)[source]

Insert an OutputMessage into the repository

Parameters:message (OutputMessage) – an output message
Return type:None
map(item)[source]

Map function to map elements returned by fetch method to OutputMessage

Parameters:item (Any) – an element in fetch iterable
Return type:OutputMessage
Returns:an OutputMessage

backend.repository.builder

class anomalydetection.backend.repository.builder.BaseRepositoryBuilder[source]

Bases: object

BaseBuilder, implement this to create Repository Builders.

build()[source]

Build a repository

Return type:BaseRepository
Returns:A BaseRepository implementation instance.
set(name, value)[source]
class anomalydetection.backend.repository.builder.RepositoryBuilderFactory[source]

Bases: object

static get(name)[source]
Return type:BaseRepositoryBuilder
static get_plugin(name)[source]
Return type:BaseRepositoryBuilder
static get_sqlite()[source]
Return type:SQLiteBuilder
class anomalydetection.backend.repository.builder.SQLiteBuilder(database=None)[source]

Bases: anomalydetection.backend.repository.builder.BaseRepositoryBuilder

build()[source]

Build a repository

Return type:BaseRepository
Returns:A BaseRepository implementation instance.
set(name, value)
set_database(database)[source]

backend.repository.observable

class anomalydetection.backend.repository.observable.ObservableRepository(repository, application=None, from_ts=None, to_ts=None)[source]

Bases: anomalydetection.backend.repository.BaseObservableRepository

Creates an ObservableRepository that can act as an observable

Parameters:
  • repository (BaseRepository) – the repository
  • application – application name
  • from_ts – from timestamp
  • to_ts – to timestamp
get_max()
get_min()
get_observable()
Return type:Observable
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item

backend.repository.sqlite

class anomalydetection.backend.repository.sqlite.SQLiteRepository(database)[source]

Bases: anomalydetection.backend.repository.BaseRepository

SQLiteRepository constructor

Parameters:database (str) – database path
fetch(application, from_ts, to_ts)[source]

Fetch data from the repository. Implementations should return ordered data

Parameters:
  • application – application name
  • from_ts – from timestamp
  • to_ts – to timestamp
Return type:

Iterable[Row]

Returns:

an iterable

get_applications()[source]

Return a list of distinct applications contained in the repository.

Returns:a list of application names
initialize()[source]

Initialize the repository

insert(message)[source]

Insert an OutputMessage into the repository

Parameters:message (OutputMessage) – an output message
map(item)[source]

Map function to map elements returned by fetch method to OutputMessage

Parameters:item (Row) – an element in fetch iterable
Return type:OutputMessage
Returns:an OutputMessage
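The repository methods above (initialize, insert, fetch, map, get_applications) can be sketched with the standard sqlite3 module. SqliteRepositorySketch, the predictions table and its columns, and the use of plain dicts instead of OutputMessage are all assumptions made so the example runs on its own; the real SQLiteRepository schema is not shown in this page.

```python
import sqlite3
from datetime import datetime


class SqliteRepositorySketch:
    """Hypothetical repository; the table layout is invented for this example."""

    def __init__(self, database: str):
        self.conn = sqlite3.connect(database)

    def initialize(self) -> None:
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS predictions "
            "(application TEXT, ts TEXT, agg_value REAL, is_anomaly INTEGER)"
        )

    def insert(self, message: dict) -> None:
        self.conn.execute(
            "INSERT INTO predictions VALUES (?, ?, ?, ?)",
            (message["application"], message["ts"].isoformat(),
             message["agg_value"], int(message["is_anomaly"])),
        )
        self.conn.commit()

    def fetch(self, application: str, from_ts: datetime, to_ts: datetime):
        # ISO 8601 strings in the same format sort chronologically, so
        # ORDER BY ts returns rows in time order
        return self.conn.execute(
            "SELECT application, ts, agg_value, is_anomaly FROM predictions "
            "WHERE application = ? AND ts BETWEEN ? AND ? ORDER BY ts",
            (application, from_ts.isoformat(), to_ts.isoformat()),
        )

    def map(self, item) -> dict:
        application, ts, agg_value, is_anomaly = item
        return {"application": application, "ts": datetime.fromisoformat(ts),
                "agg_value": agg_value, "is_anomaly": bool(is_anomaly)}

    def get_applications(self) -> list:
        rows = self.conn.execute("SELECT DISTINCT application FROM predictions")
        return [row[0] for row in rows]


repo = SqliteRepositorySketch(":memory:")
repo.initialize()
repo.insert({"application": "web", "ts": datetime(2018, 7, 11, 14, 0),
             "agg_value": 2.0, "is_anomaly": False})
repo.insert({"application": "web", "ts": datetime(2018, 7, 11, 13, 0),
             "agg_value": 9.0, "is_anomaly": True})
rows = [repo.map(r) for r in
        repo.fetch("web", datetime(2018, 7, 11), datetime(2018, 7, 12))]
```

The two messages are inserted out of order on purpose: fetch returns them sorted by timestamp, which is the ordering requirement stated for fetch.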

backend.sink

class anomalydetection.backend.sink.BaseSink[source]

Bases: rx.core.py3.observer.Observer

BaseSink, implement this to create Sinks

as_observer()

Hides the identity of an observer.

Returns an observer that hides the identity of the specified observer.

checked()

Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.

Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.

classmethod from_notifier(handler)

Creates an observer from a notification callback.

Parameters:handler – action that handles a notification
Return type:Observer
Returns:the observer object that invokes the specified handler using a notification corresponding to each message it receives

on_completed()
on_error(error)
on_next(value)
to_notifier()

Creates a notification callback from an observer.

Returns the action that forwards its input notification to the underlying observer.
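A sink is essentially an observer with three callbacks: on_next, on_error and on_completed. The sketch below shows that contract without depending on the rx package. ListSinkSketch and the drive helper are hypothetical; a real sink would subclass BaseSink and be subscribed to an Observable instead.

```python
class ListSinkSketch:
    """Hypothetical sink that collects items; mirrors the observer callbacks."""

    def __init__(self):
        self.items = []
        self.errors = []
        self.completed = False

    def on_next(self, value) -> None:
        self.items.append(value)

    def on_error(self, error) -> None:
        self.errors.append(error)

    def on_completed(self) -> None:
        self.completed = True


def drive(source, sink) -> None:
    """Push every item of an iterable into the sink, as an observable would."""
    try:
        for item in source:
            sink.on_next(item)
    except Exception as error:
        # The stream ends on the first error; on_completed is not called
        sink.on_error(error)
    else:
        sink.on_completed()


def broken():
    yield "a"
    raise RuntimeError("stream failed")


sink = ListSinkSketch()
drive(["a", "b"], sink)

failed = ListSinkSketch()
drive(broken(), failed)
```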

backend.sink.repository

class anomalydetection.backend.sink.repository.RepositorySink(repository)[source]

Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin

Creates a RepositorySink that can sink OutputMessages into the given repository

Parameters:repository (BaseRepository) – a repository
as_observer()

Hides the identity of an observer.

Returns an observer that hides the identity of the specified observer.

checked()

Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.

Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.

classmethod from_notifier(handler)

Creates an observer from a notification callback.

Parameters:handler – action that handles a notification
Return type:Observer
Returns:the observer object that invokes the specified handler using a notification corresponding to each message it receives

logger

Logger object.

Returns:a configured logger object
on_completed()[source]
on_error(error)[source]
on_next(value)[source]
to_notifier()

Creates a notification callback from an observer.

Returns the action that forwards its input notification to the underlying observer.

backend.sink.stream

class anomalydetection.backend.sink.stream.StreamSink(stream_producer)[source]

Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin

Creates a StreamSink that can sink OutputMessages into the stream producer

Parameters:stream_producer (BaseStreamProducer) – a stream producer
as_observer()

Hides the identity of an observer.

Returns an observer that hides the identity of the specified observer.

checked()

Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.

Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.

classmethod from_notifier(handler)

Creates an observer from a notification callback.

Parameters:handler – action that handles a notification
Return type:Observer
Returns:the observer object that invokes the specified handler using a notification corresponding to each message it receives

logger

Logger object.

Returns:a configured logger object
on_completed()[source]
on_error(error)[source]
on_next(value)[source]
to_notifier()

Creates a notification callback from an observer.

Returns the action that forwards its input notification to the underlying observer.

backend.sink.websocket

class anomalydetection.backend.sink.websocket.WebSocketSink(name, url)[source]

Bases: anomalydetection.backend.sink.BaseSink, anomalydetection.common.logging.LoggingMixin

Sink implementation that writes the OutputMessage stream to a WebSocket

Parameters:
  • name (str) – name
  • url (str) – websocket url
as_observer()

Hides the identity of an observer.

Returns an observer that hides the identity of the specified observer.

checked()

Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. If a violation is detected, an Error is thrown from the offending observer method call.

Returns an observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.

classmethod from_notifier(handler)

Creates an observer from a notification callback.

Parameters:handler – action that handles a notification
Return type:Observer
Returns:the observer object that invokes the specified handler using a notification corresponding to each message it receives

logger

Logger object.

Returns:a configured logger object
on_completed()[source]
on_error(error)[source]
on_next(value)[source]
to_notifier()

Creates a notification callback from an observer.

Returns the action that forwards its input notification to the underlying observer.

backend.stream

class anomalydetection.backend.stream.BaseObservable[source]

Bases: object

get_observable()[source]
Return type:Observable
map(x)[source]

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item
class anomalydetection.backend.stream.BaseStreamAggregation(agg_function=<AggregationFunction.NONE: 'none'>, agg_window_millis=0)[source]

Bases: object

BaseStreamAggregation class

Parameters:
  • agg_function (AggregationFunction) – aggregation function
  • agg_window_millis (int) – aggregation window in milliseconds
class anomalydetection.backend.stream.BaseStreamConsumer[source]

Bases: anomalydetection.backend.stream.BaseObservable

get_observable()[source]
Return type:Observable
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item
poll()[source]
Return type:Generator
class anomalydetection.backend.stream.BaseStreamProducer[source]

Bases: object

push(message)[source]
Return type:None
class anomalydetection.backend.stream.FileObservable(file)[source]

Bases: anomalydetection.backend.stream.BaseObservable

FileObservable transforms the lines of a file into an Observable

Parameters:file (str) – a path to local file
get_observable()[source]
Return type:Observable
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item

backend.stream.agg

class anomalydetection.backend.stream.agg.functions.AggregationFunction[source]

Bases: enum.Enum

An enumeration.

AVG = 'avg'
COUNT = 'count'
NONE = 'none'
P50 = 'percentile50'
P75 = 'percentile75'
P95 = 'percentile95'
P99 = 'percentile99'
SUM = 'sum'
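To show what these members could mean for one aggregation window, the sketch below reduces a list of values according to the selected function. The enum is copied locally so the example runs standalone. The aggregate and percentile helpers are hypothetical, and the nearest-rank percentile definition is an assumption; the library (or Spark) may interpolate differently.

```python
import math
from enum import Enum


class AggregationFunction(Enum):
    """Local copy of the documented members so the sketch runs standalone."""
    AVG = 'avg'
    COUNT = 'count'
    NONE = 'none'
    P50 = 'percentile50'
    P75 = 'percentile75'
    P95 = 'percentile95'
    P99 = 'percentile99'
    SUM = 'sum'


def percentile(values, pct):
    # Nearest-rank percentile; other definitions interpolate between samples
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def aggregate(values, function):
    """Reduce the values of one window to a single number (hypothetical helper)."""
    if function is AggregationFunction.AVG:
        return sum(values) / len(values)
    if function is AggregationFunction.COUNT:
        return len(values)
    if function is AggregationFunction.SUM:
        return sum(values)
    if function.value.startswith('percentile'):
        return percentile(values, int(function.value[len('percentile'):]))
    raise ValueError('NONE means no aggregation; pass values through unchanged')


window = [1.0, 2.0, 3.0, 4.0, 10.0]
average = aggregate(window, AggregationFunction.AVG)
median = aggregate(window, AggregationFunction.P50)
```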

backend.stream.builder

backend.stream.kafka

class anomalydetection.backend.stream.kafka.KafkaStreamConsumer(broker_servers, input_topic, group_id)[source]

Bases: anomalydetection.backend.stream.BaseStreamConsumer, anomalydetection.common.logging.LoggingMixin

KafkaStreamConsumer constructor

Parameters:
  • broker_servers (str) – broker servers
  • input_topic (str) – input topic
  • group_id (str) – consumer group id
get_observable()
Return type:Observable
logger

Logger object.

Returns:a configured logger object
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item
poll()[source]
Return type:Generator[+T_co, -T_contra, +V_co]
unsubscribe()[source]
class anomalydetection.backend.stream.kafka.KafkaStreamProducer(broker_servers, output_topic)[source]

Bases: anomalydetection.backend.stream.BaseStreamProducer, anomalydetection.common.logging.LoggingMixin

KafkaStreamProducer constructor

Parameters:
  • broker_servers (str) – broker servers
  • output_topic (str) – topic to write to
logger

Logger object.

Returns:a configured logger object
push(message)[source]
Return type:None
class anomalydetection.backend.stream.kafka.SparkKafkaStreamConsumer(broker_servers, input_topic, group_id, agg_function, agg_window_millis, spark_opts={}, multiprocessing=True)[source]

Bases: anomalydetection.backend.stream.BaseStreamConsumer, anomalydetection.backend.stream.BaseStreamAggregation, anomalydetection.common.logging.LoggingMixin

SparkKafkaStreamConsumer constructor

Parameters:
  • broker_servers (str) – broker servers
  • input_topic (str) – input topic
  • group_id (str) – consumer group id
  • agg_function (AggregationFunction) – aggregation function to apply
  • agg_window_millis (int) – aggregation window in milliseconds
  • spark_opts (dict) – spark options dict
  • multiprocessing – use multiprocessing instead of threading
get_observable()
Return type:Observable
logger

Logger object.

Returns:a configured logger object
map(x)

Map items in observable.

Parameters:x (Any) – input item
Return type:Any
Returns:output item
poll()[source]
Return type:Generator[+T_co, -T_contra, +V_co]
unsubscribe()[source]

backend.stream.pubsub