From c205247001f8be85f1f243942a31a1ccfad41b91 Mon Sep 17 00:00:00 2001 From: Johannes Date: Tue, 1 May 2018 18:48:16 +0200 Subject: [PATCH] added time series database KairosDB --- modules/config/__init__.py | 6 +- modules/core/basetypes.py | 5 ++ modules/logs/__init__.py | 21 +++--- modules/time_series_recorder/__init__.py | 10 +++ modules/time_series_recorder/base_recorder.py | 17 +++++ modules/time_series_recorder/file_recorder.py | 12 +++ .../time_series_recorder/kairosdb_recorder.py | 73 +++++++++++++++++++ .../time_series_recorder/recorder_service.py | 65 +++++++++++++++++ modules/time_series_recorder/recorder_view.py | 40 ++++++++++ requirements.txt | 1 + run.py | 1 + update/5_kairosdb_config.sql | 2 + 12 files changed, 241 insertions(+), 12 deletions(-) create mode 100644 modules/time_series_recorder/__init__.py create mode 100644 modules/time_series_recorder/base_recorder.py create mode 100644 modules/time_series_recorder/file_recorder.py create mode 100644 modules/time_series_recorder/kairosdb_recorder.py create mode 100644 modules/time_series_recorder/recorder_service.py create mode 100644 modules/time_series_recorder/recorder_view.py diff --git a/modules/config/__init__.py b/modules/config/__init__.py index 253e188..0df5991 100755 --- a/modules/config/__init__.py +++ b/modules/config/__init__.py @@ -76,10 +76,9 @@ class ConfigView(BaseView): @classmethod def init_cache(cls): - with cls.api._app.app_context(): cls.api.cache[cls.cache_key] = {} - for key, value in cls.model.get_all().iteritems(): + for key, value in cls.model.get_all().iteritems(): cls.post_init_callback(value) cls.api.cache[cls.cache_key][value.name] = value @@ -87,3 +86,6 @@ class ConfigView(BaseView): def init(cbpi): ConfigView.register(cbpi._app, route_base='/api/config') ConfigView.init_cache() + + + diff --git a/modules/core/basetypes.py b/modules/core/basetypes.py index 44e768c..7b42edf 100755 --- a/modules/core/basetypes.py +++ b/modules/core/basetypes.py @@ -50,6 +50,11 @@ class Sensor(Base): unit = "" + @staticmethod + def chart(sensor): + result = [{"name": sensor.name, "data_type": "sensor", "data_id": sensor.id}] + return result + @classmethod def init_global(cls): pass diff --git a/modules/logs/__init__.py b/modules/logs/__init__.py index 0193261..10191b1 100755 --- a/modules/logs/__init__.py +++ b/modules/logs/__init__.py @@ -38,17 +38,18 @@ class LogView(FlaskView): description: List of all log files """ filename = "./logs/action.log" - if os.path.isfile(filename) == False: - return - import csv array = [] - with open(filename, 'rb') as f: - reader = csv.reader(f) - for row in reader: - try: - array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, row[1]]) - except: - pass + if os.path.isfile(filename): + import csv + array = [] + with open(filename, 'rb') as f: + reader = csv.reader(f) + for row in reader: + try: + array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, row[1]]) + except: + pass + return json.dumps(array) @route('/', methods=["DELETE"]) diff --git a/modules/time_series_recorder/__init__.py b/modules/time_series_recorder/__init__.py new file mode 100644 index 0000000..472af9a --- /dev/null +++ b/modules/time_series_recorder/__init__.py @@ -0,0 +1,10 @@ + + +from modules.core.core import cbpi +from modules.time_series_recorder.recorder_service import RecorderService +from modules.time_series_recorder.recorder_view import RecorderView + + +@cbpi.addon.core.initializer(order=1000) +def init(cbpi): + RecorderView.register(cbpi._app, route_base='/api/recorder') diff --git a/modules/time_series_recorder/base_recorder.py b/modules/time_series_recorder/base_recorder.py new file mode 100644 index 0000000..3631926 --- /dev/null +++ b/modules/time_series_recorder/base_recorder.py @@ -0,0 +1,17 @@ +import abc + + +class BaseRecorder: + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def load_time_series(self, ts_type, ts_id): + raise NotImplementedError("Method not implemented") + + @abc.abstractmethod + def check_status(self): + raise NotImplementedError("Method not implemented") + + @abc.abstractmethod + def store_datapoint(self, name, value, tags): + raise NotImplementedError("Method not implemented") diff --git a/modules/time_series_recorder/file_recorder.py b/modules/time_series_recorder/file_recorder.py new file mode 100644 index 0000000..0e053e8 --- /dev/null +++ b/modules/time_series_recorder/file_recorder.py @@ -0,0 +1,12 @@ +from modules.time_series_recorder.base_recorder import BaseRecorder + + +class FileRecorder(BaseRecorder): + def check_status(self): + pass + + def load_time_series(self, ts_type, ts_id): + pass + + def store_datapoint(self, name, value, tags): + pass diff --git a/modules/time_series_recorder/kairosdb_recorder.py b/modules/time_series_recorder/kairosdb_recorder.py new file mode 100644 index 0000000..90fc830 --- /dev/null +++ b/modules/time_series_recorder/kairosdb_recorder.py @@ -0,0 +1,73 @@ +import logging +import time + +import requests +from flask import json + +from modules.time_series_recorder.base_recorder import BaseRecorder + + +class KairosdbRecorder(BaseRecorder): + kairosdb_ip = "http://192.168.178.20:" + logger = logging.getLogger(__name__) + + kairosdb_api_status = "/api/v1/health/status" + kairosdb_api_query = "/api/v1/datapoints/query" + + def __init__(self, port): + self.kairosdb_url = self.kairosdb_ip + port + + def load_time_series(self, ts_type, ts_id): + time_series_name = self.compose_time_series_name(ts_type, ts_id) + + data = dict(metrics=[ + { + "tags": {}, + "name": time_series_name, + "aggregators": [ + { + "name": "avg", + "align_sampling": True, + "sampling": { + "value": "1", + "unit": "minutes" + }, + "align_start_time": True + } + ] + } + ], + cache_time=0, + start_relative={ + "value": "30", + "unit": "days" + }) + + response = requests.post(self.kairosdb_url + self.kairosdb_api_query, json.dumps(data)) + if response.ok: + self.logger.debug("Fetching time series for [{0}] took [{1}]".format(time_series_name, response.elapsed)) + self.logger.debug("Time series for [{0}] is [{1}]".format(time_series_name, response.json())) + + return response.json()["queries"][0]["results"][0]["values"] + else: + self.logger.warning("Failed to fetch time series for [{0}]. Response [{1}]", time_series_name, response) + + def check_status(self): + response = requests.get(self.kairosdb_url + self.kairosdb_api_status) + + return response.json() + + def store_datapoint(self, name, value, tags): + data = [ + dict(name=name, datapoints=[ + [int(round(time.time() * 1000)), value] + ], tags={ + "cbpi": tags + }) + ] + + response = requests.post(self.kairosdb_url + "/api/v1/datapoints", json.dumps(data)) + + @staticmethod + def compose_time_series_name(ts_type, ts_id): + return "{0}_{1}".format(ts_type, ts_id) diff --git a/modules/time_series_recorder/recorder_service.py b/modules/time_series_recorder/recorder_service.py new file mode 100644 index 0000000..c3b234c --- /dev/null +++ b/modules/time_series_recorder/recorder_service.py @@ -0,0 +1,65 @@ +import logging + +from modules.core.core import cbpi +from modules.time_series_recorder.file_recorder import FileRecorder +from modules.time_series_recorder.kairosdb_recorder import KairosdbRecorder + + +class RecorderService: + + _logger = logging.getLogger(__name__) + + def __init__(self): + print "init" + self.config_event_listener() + self.recorder = self.determine_recorder_impl() + + def check_status(self): + return self.recorder.check_status() + + def load_time_series(self, type_short, type_id): + + if type_short == "s": + type_full = "sensors" + sensor = cbpi.cache.get(type_full).get(type_id) + chart = cbpi.sensor.get_sensors(sensor.type).get("class").chart(sensor) + elif type_short == "k": + type_full = "kettle" + kettle = cbpi.cache.get(type_full).get(type_id) + chart = cbpi.brewing.get_controller(kettle.logic).get("class").chart(kettle) + elif type_short == "f": + type_full = "fermenter" + fermenter = cbpi.cache.get(type_full).get(type_id) + chart = cbpi.fermentation.get_controller(fermenter.logic).get("class").chart(fermenter) + else: + chart = [] + + return map(self.convert_chart_data_to_json, chart) + + def convert_chart_data_to_json(self, chart_data): + return {"name": chart_data["name"], + "data": self.recorder.load_time_series(chart_data["data_type"], chart_data["data_id"])} + + def hallo2(self, **kwargs): + print kwargs + cbpi.beep() + # if name == "kairos_db" or name == "kairos_db_port": + # print name + # print cbpi.cache["config"][name].__dict__["value"] + + def config_event_listener(self): + if "CONFIG_UPDATE" not in cbpi.eventbus: + cbpi.eventbus["CONFIG_UPDATE"] = [] + cbpi.eventbus["CONFIG_UPDATE"].append({"function": self.hallo2, "async": False}) + + @staticmethod + def determine_recorder_impl(): + use_kairosdb = cbpi.cache["config"]["kairos_db"].__dict__["value"] + kairosdb_port = cbpi.cache["config"]["kairos_db_port"].__dict__["value"] + + if use_kairosdb: + recorder = KairosdbRecorder(kairosdb_port) + else: + recorder = FileRecorder() + + return recorder diff --git a/modules/time_series_recorder/recorder_view.py b/modules/time_series_recorder/recorder_view.py new file mode 100644 index 0000000..24b32b5 --- /dev/null +++ b/modules/time_series_recorder/recorder_view.py @@ -0,0 +1,40 @@ +import logging + +from flask import jsonify +from flask_api import status +from flask_classy import FlaskView +from flask_classy import route + +from modules.time_series_recorder.recorder_service import RecorderService + + +class RecorderView(FlaskView): + _logger = logging.getLogger(__name__) + _service = None + + def __init__(self): + if RecorderView._service is None: + RecorderView._service = RecorderService() + + @route('/status', methods=["GET"]) + def check_status(self): + recorder_status = RecorderView._service.check_status() + return self.make_json_response(recorder_status) + + @route('//', methods=["GET"]) + def load_time_series(self, type_short, type_id): + acceptable_types = ("s", "k", "f") + + if type_short in acceptable_types: + time_series = RecorderView._service.load_time_series(type_short, type_id) + return self.make_json_response(time_series) + else: + content = "Invalid type. Only {0} are allowed.".format(acceptable_types) + return content, status.HTTP_400_BAD_REQUEST + + @staticmethod + def make_json_response(content): + response = jsonify(content) + response.headers["content-type"] = "application/json; charset=UTF-8" + + return response diff --git a/requirements.txt b/requirements.txt index 26291a4..48e3711 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ Flask==0.11.1 +Flask-API==1.0 Flask-SocketIO==2.6.2 flask_login==0.4.0 flask_swagger==0.2.13 diff --git a/run.py b/run.py index 659fb35..72985af 100755 --- a/run.py +++ b/run.py @@ -23,5 +23,6 @@ from modules.base_plugins.steps import * from modules.example_plugins.WebViewJquery import * from modules.example_plugins.WebViewReactJs import * from modules.example_plugins.swagger import * +from modules.time_series_recorder import * cbpi.run() diff --git a/update/5_kairosdb_config.sql b/update/5_kairosdb_config.sql index e69de29..35e0af8 100644 --- a/update/5_kairosdb_config.sql +++ b/update/5_kairosdb_config.sql @@ -0,0 +1,2 @@ +INSERT OR IGNORE INTO config VALUES ('kairos_db', 'NO', 'select', 'Use timeseries database KairosDB for storing sensor values. For now you have to install KairosDB by yourself.', '["YES","NO"]' ); +INSERT OR IGNORE INTO config VALUES ('kairos_db_port', 5001, 'number', 'Port for KairosDB. We assume the DB is running on your PI, so IP-Address is 127.0.0.1.', NULL ); \ No newline at end of file