瀏覽代碼

added time series database KairosDB

pull/147/head
Johannes 7 年之前
父節點
當前提交
c205247001
共有 12 個檔案被更改,包括 241 行新增12 行删除
  1. +4
    -2
      modules/config/__init__.py
  2. +5
    -0
      modules/core/basetypes.py
  3. +11
    -10
      modules/logs/__init__.py
  4. +10
    -0
      modules/time_series_recorder/__init__.py
  5. +17
    -0
      modules/time_series_recorder/base_recorder.py
  6. +12
    -0
      modules/time_series_recorder/file_recorder.py
  7. +73
    -0
      modules/time_series_recorder/kairosdb_recorder.py
  8. +65
    -0
      modules/time_series_recorder/recorder_service.py
  9. +40
    -0
      modules/time_series_recorder/recorder_view.py
  10. +1
    -0
      requirements.txt
  11. +1
    -0
      run.py
  12. +2
    -0
      update/5_kairosdb_config.sql

+ 4
- 2
modules/config/__init__.py 查看文件

@@ -76,10 +76,9 @@ class ConfigView(BaseView):
@classmethod
def init_cache(cls):
with cls.api._app.app_context():
cls.api.cache[cls.cache_key] = {}
for key, value in cls.model.get_all().iteritems():
for key, value in cls.model.get_all().iteritems():
cls.post_init_callback(value)
cls.api.cache[cls.cache_key][value.name] = value
@@ -87,3 +86,6 @@ class ConfigView(BaseView):
def init(cbpi):
ConfigView.register(cbpi._app, route_base='/api/config')
ConfigView.init_cache()

+ 5
- 0
modules/core/basetypes.py 查看文件

@@ -50,6 +50,11 @@ class Sensor(Base):
unit = ""
@staticmethod
def chart(sensor):
result = [{"name": sensor.name, "data_type": "sensor", "data_id": sensor.id}]
return result
@classmethod
def init_global(cls):
pass


+ 11
- 10
modules/logs/__init__.py 查看文件

@@ -38,17 +38,18 @@ class LogView(FlaskView):
description: List of all log files
"""
filename = "./logs/action.log"
if os.path.isfile(filename) == False:
return
import csv
array = []
with open(filename, 'rb') as f:
reader = csv.reader(f)
for row in reader:
try:
array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, row[1]])
except:
pass
if os.path.isfile(filename):
import csv
array = []
with open(filename, 'rb') as f:
reader = csv.reader(f)
for row in reader:
try:
array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, row[1]])
except:
pass
return json.dumps(array)
@route('/<file>', methods=["DELETE"])


+ 10
- 0
modules/time_series_recorder/__init__.py 查看文件

@@ -0,0 +1,10 @@


from modules.core.core import cbpi
from modules.time_series_recorder.recorder_service import RecorderService
from modules.time_series_recorder.recorder_view import RecorderView


@cbpi.addon.core.initializer(order=1000)
def init(cbpi):
RecorderView.register(cbpi._app, route_base='/api/recorder')

+ 17
- 0
modules/time_series_recorder/base_recorder.py 查看文件

@@ -0,0 +1,17 @@
import abc


class BaseRecorder:
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def load_time_series(self, ts_type, ts_id):
raise NotImplementedError("Method not implemented")

@abc.abstractmethod
def check_status(self):
raise NotImplementedError("Method not implemented")

@abc.abstractmethod
def store_datapoint(self, name, value, tags):
raise NotImplementedError("Method not implemented")

+ 12
- 0
modules/time_series_recorder/file_recorder.py 查看文件

@@ -0,0 +1,12 @@
from modules.time_series_recorder.base_recorder import BaseRecorder


class FileRecorder(BaseRecorder):
def check_status(self):
pass

def load_time_series(self, ts_type, ts_id):
pass

def store_datapoint(self, name, value, tags):
pass

+ 73
- 0
modules/time_series_recorder/kairosdb_recorder.py 查看文件

@@ -0,0 +1,73 @@
import logging
import time

import requests
from flask import json

from modules.time_series_recorder.base_recorder import BaseRecorder


class KairosdbRecorder(BaseRecorder):
kairosdb_ip = "http://192.168.178.20:"
logger = logging.getLogger(__name__)

kairosdb_api_status = "/api/v1/health/status"
kairosdb_api_query = "/api/v1/datapoints/query"

def __init__(self, port):
self.kairosdb_url = self.kairosdb_ip + port

def load_time_series(self, ts_type, ts_id):
time_series_name = self.compose_time_series_name(ts_type, ts_id)

data = dict(metrics=[
{
"tags": {},
"name": time_series_name,
"aggregators": [
{
"name": "avg",
"align_sampling": True,
"sampling": {
"value": "1",
"unit": "minutes"
},
"align_start_time": True
}
]
}
],
cache_time=0,
start_relative={
"value": "30",
"unit": "days"
})

response = requests.post(self.kairosdb_url + self.kairosdb_api_query, json.dumps(data))
if response.ok:
self.logger.debug("Fetching time series for [{0}] took [{1}]".format(time_series_name, response.elapsed))
self.logger.debug("Time series for [{0}] is [{1}]".format(time_series_name, response.json()))

return response.json()["queries"][0]["results"][0]["values"]
else:
self.logger.warning("Failed to fetch time series for [{0}]. Response [{1}]", time_series_name, response)

def check_status(self):
response = requests.get(self.kairosdb_url + self.kairosdb_api_status)

return response.json()

def store_datapoint(self, name, value, tags):
data = [
dict(name=name, datapoints=[
[int(round(time.time() * 1000)), value]
], tags={
"cbpi": tags
})
]

response = requests.post(self.kairosdb_url + "/api/v1/datapoints", json.dumps(data))

@staticmethod
def compose_time_series_name(ts_type, ts_id):
return "{0}_{1}".format(ts_type, ts_id)

+ 65
- 0
modules/time_series_recorder/recorder_service.py 查看文件

@@ -0,0 +1,65 @@
import logging

from modules.core.core import cbpi
from modules.time_series_recorder.file_recorder import FileRecorder
from modules.time_series_recorder.kairosdb_recorder import KairosdbRecorder


class RecorderService:

_logger = logging.getLogger(__name__)

def __init__(self):
print "init"
self.config_event_listener()
self.recorder = self.determine_recorder_impl()

def check_status(self):
return self.recorder.check_status()

def load_time_series(self, type_short, type_id):

if type_short == "s":
type_full = "sensors"
sensor = cbpi.cache.get(type_full).get(type_id)
chart = cbpi.sensor.get_sensors(sensor.type).get("class").chart(sensor)
elif type_short == "k":
type_full = "kettle"
kettle = cbpi.cache.get(type_full).get(type_id)
chart = cbpi.brewing.get_controller(kettle.logic).get("class").chart(kettle)
elif type_short == "f":
type_full = "fermenter"
fermenter = cbpi.cache.get(type_full).get(type_id)
chart = cbpi.fermentation.get_controller(fermenter.logic).get("class").chart(fermenter)
else:
chart = []

return map(self.convert_chart_data_to_json, chart)

def convert_chart_data_to_json(self, chart_data):
return {"name": chart_data["name"],
"data": self.recorder.load_time_series(chart_data["data_type"], chart_data["data_id"])}

def hallo2(self, **kwargs):
print kwargs
cbpi.beep()
# if name == "kairos_db" or name == "kairos_db_port":
# print name
# print cbpi.cache["config"][name].__dict__["value"]

def config_event_listener(self):
if "CONFIG_UPDATE" not in cbpi.eventbus:
cbpi.eventbus["CONFIG_UPDATE"] = []
cbpi.eventbus["CONFIG_UPDATE"].append({"function": self.hallo2, "async": False})

@staticmethod
def determine_recorder_impl():
use_kairosdb = cbpi.cache["config"]["kairos_db"].__dict__["value"]
kairosdb_port = cbpi.cache["config"]["kairos_db_port"].__dict__["value"]

if use_kairosdb:
recorder = KairosdbRecorder(kairosdb_port)
else:
recorder = FileRecorder()

return recorder

+ 40
- 0
modules/time_series_recorder/recorder_view.py 查看文件

@@ -0,0 +1,40 @@
import logging

from flask import jsonify
from flask_api import status
from flask_classy import FlaskView
from flask_classy import route

from modules.time_series_recorder.recorder_service import RecorderService


class RecorderView(FlaskView):
_logger = logging.getLogger(__name__)
_service = None

def __init__(self):
if RecorderView._service is None:
RecorderView._service = RecorderService()

@route('/status', methods=["GET"])
def check_status(self):
recorder_status = RecorderView._service.check_status()
return self.make_json_response(recorder_status)

@route('/<type_short>/<int:type_id>', methods=["GET"])
def load_time_series(self, type_short, type_id):
acceptable_types = ("s", "k", "f")

if type_short in acceptable_types:
time_series = RecorderView._service.load_time_series(type_short, type_id)
return self.make_json_response(time_series)
else:
content = "Invalid type. Only {0} are allowed.".format(acceptable_types)
return content, status.HTTP_400_BAD_REQUEST

@staticmethod
def make_json_response(content):
response = jsonify(content)
response.headers["content-type"] = "application/json; charset=UTF-8"

return response

+ 1
- 0
requirements.txt 查看文件

@@ -1,4 +1,5 @@
Flask==0.11.1
Flask-API==1.0
Flask-SocketIO==2.6.2
flask_login==0.4.0
flask_swagger==0.2.13


+ 1
- 0
run.py 查看文件

@@ -23,5 +23,6 @@ from modules.base_plugins.steps import *
from modules.example_plugins.WebViewJquery import *
from modules.example_plugins.WebViewReactJs import *
from modules.example_plugins.swagger import *
from modules.time_series_recorder import *

cbpi.run()

+ 2
- 0
update/5_kairosdb_config.sql 查看文件

@@ -0,0 +1,2 @@
INSERT OR IGNORE INTO config VALUES ('kairos_db', 'NO', 'select', 'Use timeseries database KairosDB for storing sensor values. For now you have to install KairosDB by yourself.', '["YES","NO"]' );
INSERT OR IGNORE INTO config VALUES ('kairos_db_port', 5001, 'number', 'Port for KairosDB. We assume the DB is running on your PI, so IP-Address is 127.0.0.1.', NULL );

Loading…
取消
儲存