# -*- coding: utf-8 -*-
#
# Originally ported from https://github.com/smirmik/CAD
#
# Contextual Anomaly Detector
# Copyright (C) 2016 Mikhail Smirnov <smirmik@gmail.com>
#
# Anomaly Detection Framework
# Copyright (C) 2018 Bluekiri BigData Team <bigdata@bluekiri.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import operator
import sys
from anomalydetection.backend.engine import BaseEngine
from anomalydetection.backend.entities.output_message import AnomalyResult
[docs]class ContextOperator(object):
def __init__(self, max_left_semi_cntx_len):
self.max_left_semi_cntx_len = max_left_semi_cntx_len
self.facts_dics = [{}, {}]
self.semi_context_dics = [{}, {}]
self.semi_context_values_lists = [[], []]
self.crossed_semi_contexts_lists = [[], []]
self.contexts_values_list = []
self.new_context_id = False
[docs] def get_context_by_facts(self, new_contexts_list, zero_level=0):
num_added_contexts = 0
for left_facts, right_facts in new_contexts_list:
left_hash = hash(left_facts)
right_hash = hash(right_facts)
next_left_semi_context_number = len(self.semi_context_dics[0])
left_semi_context_i_d = self.semi_context_dics[0].setdefault(
left_hash, next_left_semi_context_number)
if left_semi_context_i_d == next_left_semi_context_number:
left_semi_context_values = [[], len(left_facts), 0, {}]
self.semi_context_values_lists[0].append(left_semi_context_values)
for fact in left_facts:
semi_context_list = self.facts_dics[0].setdefault(fact, [])
semi_context_list.append(left_semi_context_values)
next_right_semi_context_number = len(self.semi_context_dics[1])
right_semi_context_id = self.semi_context_dics[1].setdefault(
right_hash, next_right_semi_context_number)
if right_semi_context_id == next_right_semi_context_number:
right_semi_context_values = [[], len(right_facts), 0]
self.semi_context_values_lists[1].append(right_semi_context_values)
for fact in right_facts:
semi_context_list = self.facts_dics[1].setdefault(fact, [])
semi_context_list.append(right_semi_context_values)
next_free_context_id_number = len(self.contexts_values_list)
context_id = \
self.semi_context_values_lists[0][left_semi_context_i_d][3].setdefault(
right_semi_context_id, next_free_context_id_number)
if context_id == next_free_context_id_number:
num_added_contexts += 1
context_values = [0, zero_level, left_hash, right_hash]
self.contexts_values_list.append(context_values)
if zero_level:
self.new_context_id = context_id
return True
else:
context_values = self.contexts_values_list[context_id]
if zero_level:
context_values[1] = 1
return False
return num_added_contexts
[docs] def context_crosser(self,
left_or_right,
facts_list,
new_context_flag=False,
potential_new_contexts=list()):
num_new_contexts = None
if left_or_right == 0:
if len(potential_new_contexts) > 0:
num_new_contexts = self.get_context_by_facts(potential_new_contexts)
else:
num_new_contexts = 0
for semi_context_values in self.crossed_semi_contexts_lists[left_or_right]:
semi_context_values[0] = []
semi_context_values[2] = 0
for fact in facts_list:
for semi_context_values in self.facts_dics[left_or_right].get(fact, []):
semi_context_values[0].append(fact)
new_crossed_values = []
for semi_context_values in self.semi_context_values_lists[left_or_right]:
len_semi_context_values0 = len(semi_context_values[0])
semi_context_values[2] = len_semi_context_values0
if len_semi_context_values0 > 0:
new_crossed_values.append(semi_context_values)
self.crossed_semi_contexts_lists[left_or_right] = new_crossed_values
if left_or_right:
return self.update_contexts_and_get_active(new_context_flag)
else:
return num_new_contexts
[docs] def update_contexts_and_get_active(self, new_context_flag):
active_contexts = []
num_selected_context = 0
potential_new_context_list = []
for left_semi_cntx_vals in self.crossed_semi_contexts_lists[0]:
for right_semi_context_id, context_id in left_semi_cntx_vals[3].items():
if self.new_context_id != context_id:
context_values = self.contexts_values_list[context_id]
right_semi_context_value0, \
right_semi_context_value1, \
right_semi_context_value2 = \
self.semi_context_values_lists[1][right_semi_context_id]
less_than_max_len = \
left_semi_cntx_vals[2] <= self.max_left_semi_cntx_len
if left_semi_cntx_vals[1] == left_semi_cntx_vals[2]:
num_selected_context += 1
if right_semi_context_value2 > 0:
if right_semi_context_value1 == right_semi_context_value2:
context_values[0] += 1
active_contexts.append(
[context_id, context_values[0],
context_values[2],
context_values[3]])
elif context_values[1] \
and new_context_flag \
and less_than_max_len:
potential_new_context_list.append(
tuple(
[
tuple(left_semi_cntx_vals[0]),
tuple(right_semi_context_value0)
]
)
)
elif context_values[
1] and new_context_flag and right_semi_context_value2 > 0 and \
left_semi_cntx_vals[2] <= self.max_left_semi_cntx_len:
potential_new_context_list.append(tuple(
[tuple(left_semi_cntx_vals[0]),
tuple(right_semi_context_value0)]))
self.new_context_id = False
return active_contexts, num_selected_context, potential_new_context_list
[docs]class CADDetector(BaseEngine):
"""
Contextual Anomaly Detector - Open Source Edition
https://github.com/smirmik/CAD
"""
def __init__(self,
min_value=-sys.maxsize,
max_value=sys.maxsize,
threshold=0.95,
rest_period=30,
max_left_semi_contexts_length=8,
max_active_neurons_num=16,
num_norm_value_bits=3):
self.min_value = float(min_value)
self.max_value = float(max_value)
self.rest_period = rest_period
self.threshold = threshold
self.max_active_neurons_num = max_active_neurons_num
self.num_norm_value_bits = num_norm_value_bits
self.max_bin_value = 2 ** self.num_norm_value_bits - 1.0
self.full_value_range = self.max_value - self.min_value
if self.full_value_range == 0.0:
self.full_value_range = self.max_bin_value
self.min_value_step = self.full_value_range / self.max_bin_value
self.left_facts_group = tuple()
self.context_operator = ContextOperator(max_left_semi_contexts_length)
self.potential_new_contexts = []
self.last_predictioned_facts = []
self.result_values_history = [1.0]
[docs] def step(self, inp_facts):
curr_sens_facts = tuple(sorted(set(inp_facts)))
if len(self.left_facts_group) > 0 and len(curr_sens_facts) > 0:
pot_new_zero_level_context = tuple([self.left_facts_group,
curr_sens_facts])
new_context_flag = self.context_operator.get_context_by_facts(
[pot_new_zero_level_context], zero_level=1)
else:
pot_new_zero_level_context = False
new_context_flag = False
active_contexts, num_selected_context, potential_new_context_list = \
self.context_operator.context_crosser(
left_or_right=1,
facts_list=curr_sens_facts,
new_context_flag=new_context_flag)
num_uniq_pot_new_context = \
len(set(potential_new_context_list).union(
[pot_new_zero_level_context])
if pot_new_zero_level_context
else set(potential_new_context_list))
percent_selected_context_active = \
len(active_contexts) / float(num_selected_context) \
if num_selected_context > 0 else 0.0
active_contexts = sorted(active_contexts,
key=operator.itemgetter(1, 2, 3))
active_neurons = \
[activeContextInfo[0]
for activeContextInfo in active_contexts[-self.max_active_neurons_num:]]
curr_neur_facts = set([2 ** 31 + fact for fact in active_neurons])
self.left_facts_group = set()
self.left_facts_group.update(curr_sens_facts, curr_neur_facts)
self.left_facts_group = tuple(sorted(self.left_facts_group))
num_new_contexts = self.context_operator.context_crosser(
left_or_right=0,
facts_list=self.left_facts_group,
potential_new_contexts=potential_new_context_list)
num_new_contexts += 1 if new_context_flag else 0
percent_added_context_to_uniq_pot_new = \
num_new_contexts / float(num_uniq_pot_new_context) \
if new_context_flag and num_uniq_pot_new_context > 0 else 0.0
return percent_selected_context_active, percent_added_context_to_uniq_pot_new
[docs] def get_anomaly_score(self, input_data):
norm_input_value = \
int((input_data["value"] - self.min_value) / self.min_value_step)
bin_input_norm_value = \
bin(norm_input_value).lstrip("0b").rjust(self.num_norm_value_bits, "0")
out_sens = set(
[s_num * 2 + (1 if curr_symb == "1" else 0)
for s_num, curr_symb in enumerate(reversed(bin_input_norm_value))])
anomaly_value0, anomaly_value1 = self.step(out_sens)
current_anomaly_score = (1.0 - anomaly_value0 + anomaly_value1) / 2.0
less_than_threshold = \
max(self.result_values_history[-int(self.rest_period):]) < self.threshold
returned_anomaly_score = current_anomaly_score \
if less_than_threshold \
else 0.0
self.result_values_history.append(current_anomaly_score)
return returned_anomaly_score
[docs] def predict(self, value: float, **kwargs) -> AnomalyResult:
anomaly_score = self.get_anomaly_score(
{"timestamp": kwargs["ts"], "value": value})
return AnomalyResult(0.0,
0.0,
anomaly_score,
(anomaly_score >= self.threshold))