浏览代码

Added writing to Time Series Database KairosDB

pull/194/head
Johannes 7 年前
父节点
当前提交
649e4522bb
共有 4 个文件被更改,包括 102 次插入25 次删除
  1. +19
    -0
      config/logger.yaml
  2. +5
    -12
      modules/app_config.py
  3. +16
    -5
      modules/core/core.py
  4. +62
    -8
      modules/logs/endpoints.py

+ 19
- 0
config/logger.yaml 查看文件

@@ -0,0 +1,19 @@
version: 1
formatters:
simple:
format: '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class : logging.handlers.RotatingFileHandler
formatter: simple
filename: ./logs/app.log
maxBytes: 10000000
backupCount: 3
root:
level: DEBUG
handlers: [console, file]

+ 5
- 12
modules/app_config.py 查看文件

@@ -1,23 +1,16 @@
from flask import Flask, json, g
from flask_socketio import SocketIO


import json
import sys, os
from flask import Flask, render_template, redirect, json, g


from flask_socketio import SocketIO, emit

import logging


import yaml
import logging.config


from modules.core.core import CraftBeerPi, ActorBase, SensorBase from modules.core.core import CraftBeerPi, ActorBase, SensorBase
from modules.core.db import DBModel from modules.core.db import DBModel


app = Flask(__name__) app = Flask(__name__)


FORMAT = '%(asctime)-15s - %(levelname)s - %(message)s'
logging.config.dictConfig(yaml.load(open('./config/logger.yaml', 'r')))


logging.basicConfig(filename='./logs/app.log',level=logging.INFO, format=FORMAT)
app.config['SECRET_KEY'] = 'craftbeerpi' app.config['SECRET_KEY'] = 'craftbeerpi'
app.config['UPLOAD_FOLDER'] = './upload' app.config['UPLOAD_FOLDER'] = './upload'




+ 16
- 5
modules/core/core.py 查看文件

@@ -1,16 +1,14 @@
import inspect import inspect
import pprint


import sqlite3
from flask import make_response, g from flask import make_response, g
import datetime import datetime
from datetime import datetime from datetime import datetime
from flask.views import MethodView
from flask_classy import FlaskView, route


from time import localtime, strftime from time import localtime, strftime
from functools import wraps, update_wrapper from functools import wraps, update_wrapper


import requests
import json


from props import * from props import *


@@ -149,13 +147,26 @@ class SensorAPI(object):
self.save_to_file(id, value) self.save_to_file(id, value)


def save_to_file(self, id, value, prefix="sensor"): def save_to_file(self, id, value, prefix="sensor"):
filename = "./logs/%s_%s.log" % (prefix, str(id))
sensor_name = "%s_%s" % (prefix, str(id))
filename = "./logs/%slog" % sensor_name
formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime()) formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
msg = str(formatted_time) + "," +str(value) + "\n" msg = str(formatted_time) + "," +str(value) + "\n"


with open(filename, "a") as file: with open(filename, "a") as file:
file.write(msg) file.write(msg)


kairosdb_server = "http://192.168.178.20:5001"

data = [
dict(name=sensor_name, datapoints=[
[int(round(time.time() * 1000)), value]
], tags={
"cbpi": prefix
})
]

response = requests.post(kairosdb_server + "/api/v1/datapoints", json.dumps(data))

def log_action(self, text): def log_action(self, text):
filename = "./logs/action.log" filename = "./logs/action.log"
formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime()) formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())


+ 62
- 8
modules/logs/endpoints.py 查看文件

@@ -1,11 +1,16 @@
import datetime import datetime
import os import os
from flask import Blueprint, request, send_from_directory, json
import time
import requests
import logging
from flask import request, send_from_directory, json
from flask_classy import FlaskView, route from flask_classy import FlaskView, route
from modules import cbpi from modules import cbpi




class LogView(FlaskView): class LogView(FlaskView):
def __init__(self):
self.logger = logging.getLogger(__name__)


@route('/', methods=['GET']) @route('/', methods=['GET'])
def get_all_logfiles(self): def get_all_logfiles(self):
@@ -18,18 +23,24 @@ class LogView(FlaskView):
@route('/actions') @route('/actions')
def actions(self): def actions(self):
filename = "./logs/action.log" filename = "./logs/action.log"
if os.path.isfile(filename) == False:
return
if not os.path.isfile(filename):
self.logger.warn("File does not exist [%s]", filename)
return json.dumps([])
import csv import csv
array = [] array = []
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
reader = csv.reader(f) reader = csv.reader(f)
for row in reader: for row in reader:
try: try:
array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, row[1]])
time.mktime(time.strptime(row[0], "%Y-%m-%d %H:%M:%S"))
array.append([int(time.mktime(time.strptime(row[0], "%Y-%m-%d %H:%M:%S")) * 1000), row[1]])
except: except:
pass pass
return json.dumps(array)

json_dumps = json.dumps(array)
self.logger.debug("Loaded action.log [%s]", json_dumps)

return json_dumps


@route('/<file>', methods=["DELETE"]) @route('/<file>', methods=["DELETE"])
def clearlog(self, file): def clearlog(self, file):
@@ -49,7 +60,43 @@ class LogView(FlaskView):
cbpi.notify("Failed to delete log", "", type="danger") cbpi.notify("Failed to delete log", "", type="danger")
return ('', 204) return ('', 204)


def querry_tsdb(self, type, id):
kairosdb_server = "http://192.168.178.20:5001"

data = dict(metrics=[
{
"tags": {},
"name": "%s_%s" % (type, id),
"aggregators": [
{
"name": "avg",
"align_sampling": True,
"sampling": {
"value": "1",
"unit": "minutes"
},
"align_start_time": True
}
]
}
],
cache_time=0,
start_relative={
"value": "1",
"unit": "days"
})

response = requests.post(kairosdb_server + "/api/v1/datapoints/query", json.dumps(data))
if response.ok:
self.logger.debug("Fetching time series for [%s_%s] took [%s]", type, id, response.elapsed)
self.logger.debug("Time series for [%s_%s] is [%s]", type, id, response.json())
return response.json()["queries"][0]["results"][0]["values"]
else:
self.logger.warning("Failed to fetch time series for [%s_%s]. Response [%s]", type, id, response)

def read_log_as_json(self, type, id): def read_log_as_json(self, type, id):
return self.querry_tsdb(type, id)

filename = "./logs/%s_%s.log" % (type, id) filename = "./logs/%s_%s.log" % (type, id)
if os.path.isfile(filename) == False: if os.path.isfile(filename) == False:
return return
@@ -60,13 +107,18 @@ class LogView(FlaskView):
reader = csv.reader(f) reader = csv.reader(f)
for row in reader: for row in reader:
try: try:
array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970, 1, 1)).total_seconds()) * 1000, float(row[1])])
array.append([int((datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") - datetime.datetime(1970,
1,
1)).total_seconds()) * 1000,
float(row[1])])
except: except:
pass pass
print(array)
return array return array


def convert_chart_data_to_json(self, chart_data): def convert_chart_data_to_json(self, chart_data):
return {"name": chart_data["name"], "data": self.read_log_as_json(chart_data["data_type"], chart_data["data_id"])}
return {"name": chart_data["name"],
"data": self.read_log_as_json(chart_data["data_type"], chart_data["data_id"])}


@route('/<t>/<int:id>', methods=["POST"]) @route('/<t>/<int:id>', methods=["POST"])
def get_logs_as_json(self, t, id): def get_logs_as_json(self, t, id):
@@ -82,7 +134,8 @@ class LogView(FlaskView):


if t == "f": if t == "f":
fermenter = cbpi.cache.get("fermenter").get(id) fermenter = cbpi.cache.get("fermenter").get(id)
result = map(self.convert_chart_data_to_json, cbpi.get_fermentation_controller(fermenter.logic).get("class").chart(fermenter))
result = map(self.convert_chart_data_to_json,
cbpi.get_fermentation_controller(fermenter.logic).get("class").chart(fermenter))


return json.dumps(result) return json.dumps(result)


@@ -99,6 +152,7 @@ class LogView(FlaskView):


return True if pattern.match(name) else False return True if pattern.match(name) else False



@cbpi.initalizer() @cbpi.initalizer()
def init(app): def init(app):
""" """


正在加载...
取消
保存