Source code for anomalydetection.backend.repository.sqlite

# -*- coding:utf-8 -*- #
#
# Anomaly Detection Framework
# Copyright (C) 2018 Bluekiri BigData Team <bigdata@bluekiri.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import datetime
import sqlite3
from sqlite3 import Row
from typing import Iterable

from anomalydetection.backend.entities.output_message import AnomalyResult
from anomalydetection.backend.entities.output_message import OutputMessage

from anomalydetection.backend.repository import BaseRepository
from anomalydetection.backend.stream import AggregationFunction


[docs]class SQLiteRepository(BaseRepository): def __init__(self, database: str) -> None: """ SQLiteRepository constructor :param database: database path """ super().__init__(database) self.conn_string = database
[docs] def initialize(self): # Check if table exists self.conn = sqlite3.connect(self.conn_string) cur = self.conn.cursor() check = cur.execute(""" SELECT COUNT(*) FROM sqlite_master WHERE 1 AND type='table' AND name='predictions'""").fetchone() if not check or not check[0]: cur.execute(""" CREATE TABLE predictions ( application text, ts text, agg_function text, agg_value real, agg_window_millis integer, ar_value_upper_limit real, ar_anomaly_probability real, ar_value_lower_limit real, ar_is_anomaly real)""") self.conn.commit() self.conn.close()
[docs] def map(self, item: Row) -> OutputMessage: anom_results = AnomalyResult( value_lower_limit=item[7], value_upper_limit=item[5], anomaly_probability=item[6], is_anomaly=item[8] ) fmt = "%Y-%m-%d %H:%M:%S" return OutputMessage(item[0], anom_results, agg_window_millis=item[4], agg_function=AggregationFunction(item[2]), agg_value=item[3], ts=datetime.datetime.strptime(item[1][0:19], fmt))
[docs] def get_applications(self): stmt = """ SELECT DISTINCT(application) FROM predictions """ self.conn = sqlite3.connect(self.conn_string) cur = self.conn.cursor() cursor = cur.execute(stmt) elements = cursor.fetchall() self.conn.close() elements = [x[0] for x in elements] return sorted(elements)
[docs] def fetch(self, application, from_ts, to_ts) -> Iterable[Row]: stmt = """ SELECT application, ts, agg_function, agg_value, agg_window_millis, ar_value_upper_limit, ar_anomaly_probability, ar_value_lower_limit, ar_is_anomaly FROM predictions WHERE 1 AND ts BETWEEN ? AND ? """ params = (from_ts, to_ts) if application: stmt = stmt + """AND application = ?""" params = (from_ts, to_ts, application) stmt = stmt + """ ORDER BY ts ASC """ self.conn = sqlite3.connect(self.conn_string) cur = self.conn.cursor() cursor = cur.execute(stmt, params) elements = cursor.fetchall() self.conn.close() return elements
[docs] def insert(self, message: OutputMessage): anomaly_results = message.anomaly_results anomaly_value = [anomaly_results.value_upper_limit, anomaly_results.anomaly_probability, anomaly_results.value_lower_limit, int(anomaly_results.is_anomaly)] root_value = ["'%s'" % message.application, "'%s'" % message.ts, "'%s'" % message.agg_function.value, message.agg_value, int(message.agg_window_millis)] values = ", ".join(map(str, root_value + anomaly_value)) self.conn = sqlite3.connect(self.conn_string) cur = self.conn.cursor() cur.execute("""INSERT INTO predictions VALUES (%s)""" % values) self.conn.commit() self.conn.close()