跳转至

7.实现sqlite历史数据存储

说明

举例现在有两个通道两个设备需存储数据,协议为任意两个采集协议。

接下来我们用python来实现数据存储。

构造数据存储脚本

创建sqlite数据库并连接

使用python自带的sqlite3模块,使用sqlite3.connect()函数连接数据库。数据库名称根据需要自行修改。

提示

连接时一定要把check_same_thread参数设置为False。防止数据库执行操作时线程冲突

Python
1
2
3
4
5
6
    # 数据库路径 最后为数据库名称
    route = r'/his/python/sqlite.db'
    # 连接数据库
    conn = sqlite3.connect(route, check_same_thread=False)
    # 创建游标
    cur = conn.cursor()

建表

连接数据库后创建表结构,示例中按设备名创建表,测点名为字段名。实际表结构根据实际需求创建,示例如下述片段所示:

Python
# 创建表
def create_table(chal_list):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        # 遍历设备,获取设备名建表
        for device_info in device_info_array:
            device_name = device_info.dev_name.decode()
            sql = 'CREATE TABLE {} (ID INTEGER PRIMARY KEY AUTOINCREMENT, TIME VARCHAR(100), CHAL VARCHAR(100))'.format(device_name)
            cur.execute(sql)   # 根据设备名建表
            conn.commit()
            # 创建一个长度为 tag_count 的数组
            tag_count = c4py.get_tag_count(device_info.dev_name)
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            # 遍历测点,获取测点名创建字段
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                sql_tag = 'ALTER TABLE {} ADD {} VARCHAR(100)'.format(device_name, tag_name)
                cur.execute(sql_tag)
                conn.commit()

建表后,表结构如下图所示:

获取转发测点参数

取设备参数

首先使用get_dev_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据采集通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
    # 创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)

取测点参数

首先使用get_tag_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据设备取其下的测点信息
    tag_count = c4py.get_tag_count(device_info.dev_name)
    print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
    # 创建一个长度为 tag_count 的数组
    tag_info_array = (c4py.TagInfo * tag_count)()
    c4py.get_tag_array(device_info.dev_name, tag_info_array)

数据操作

数据写入

获取测点字段名和测点的值,一一对应分别放入两个列表。保证测点名与测点值对应。

Python
# 整理数据并插入
def db_insertdata():
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        # 遍历设备
        for device_info in device_info_array:
            timrstr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            col = ['TIME', 'CHAL']
            data = ["'{}'".format(timrstr), "'{}'".format(chl_name)]
            tag_count = c4py.get_tag_count(device_info.dev_name)
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            # 遍历测点
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()   # 测点名称
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value   # 测点值
                # 对应写入列表中。注意数据格式!!!
                col.append(tag_name)
                data.append("'{}'".format(tag_value))
            # 整理数据为字符串
            col_num = ','.join(col)
            data_insert = ','.join(data)
            # 插入数据
            sql_insert = 'INSERT INTO {}({}) VALUES({})'.format(device_info.dev_name.decode(), col_num, data_insert)
            cur.execute(sql_insert)
            conn.commit()

存储成功后数据库表中数据如下图:

数据删除

清除过期的历史数据。sql_delete参数中,DATETIME()函数可以写入参数,'-1 day'为一天前,可以根据需要自行修改删除几天前的数据。

Python
# 删除历史数据
def delete_hisdata():
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        # 遍历设备删除各个表的过期数据
        for device_info in device_info_array:
            # 删除一天前的数据
            sql_delete = "DELETE FROM {} WHERE TIME < DATETIME('now', 'localtime', '-1 day')".format(device_info.dev_name.decode())
            cur.execute(sql_delete)
            conn.commit()

其它优化

系统信号处理

注册系统信号处理函数,来接收程序退出消息,以释放资源。

Python
#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)

c4py.py_load_so()
c4py.global_init()
while True:
    #do something
    if app_exit_signal != 0:
        print("app exit signal: [{}]".format(app_exit_signal))
        break
#释放资源
c4py.global_release()

定时任务

示例中定每天00:30开始清理过期数据。运用schedule模块完成定时。具体代码如下:

Python
1
2
3
4
5
6
    #  定时执行删除
    schedule.every().day.at("00:30").do(delete_hisdata)
    # 开启定时
    while True:
        schedule.run_pending()
        time.sleep(5)

多线程

数据存储和删除为两个线程。所以用threading模块定义两个线程。

主线程:

Python
# 定义主线程方法 写入历史数据
def thread_main():
    # 数据写入
    while True:
        db_insertdata()
        time.sleep(60)

        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break

    conn.close()
    # 释放资源
    c4py.global_release()

第二线程:

Python
1
2
3
4
5
6
7
# 定义副线程方法 定时删除历史数据
def thread_delete():
    schedule.every().day.at("00:30").do(delete_hisdata)

    while True:
        schedule.run_pending()
        time.sleep(5)

创建线程

Python
1
2
3
4
5
6
7
8
9
    # 创建主线程 线程执行为thread_main()函数
    main = threading.Thread(target=thread_main, name='thread_main')
    # 创建第二线程 线程执行为thread_delete()函数
    delete = threading.Thread(target=thread_delete, name='thread_delete')
    # 主线程开启线程守护
    main.setDaemon(True)
    # 启动线程
    main.start()
    delete.start()

完整源码

Python
import sqlite3
import ctypes
import c4py
import signal
import ptvsd
import time
import threading
import schedule


# 系统信号
app_exit_signal = 0

# 系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 创建表  字段有(id, 时间 varchar(100),通道 varchar(100) 测点信息varchar(255))
def create_table(chal_list):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            device_name = device_info.dev_name.decode()
            sql = 'CREATE TABLE {} (ID INTEGER PRIMARY KEY AUTOINCREMENT, TIME VARCHAR(100), CHAL VARCHAR(100))'.format(device_name)
            cur.execute(sql)   # 根据设备名建表
            conn.commit()
            tag_count = c4py.get_tag_count(device_info.dev_name)
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                sql_tag = 'ALTER TABLE {} ADD {} VARCHAR(100)'.format(device_name, tag_name)
                cur.execute(sql_tag)
                conn.commit()

# 整理数据并插入
def db_insertdata():
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            timrstr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            col = ['TIME', 'CHAL']
            data = ["'{}'".format(timrstr), "'{}'".format(chl_name)]
            tag_count = c4py.get_tag_count(device_info.dev_name)
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                col.append(tag_name)
                data.append("'{}'".format(tag_value))
            col_num = ','.join(col)
            data_insert = ','.join(data)
            sql_insert = 'INSERT INTO {}({}) VALUES({})'.format(device_info.dev_name.decode(), col_num, data_insert)
            cur.execute(sql_insert)
            conn.commit()

# 删除历史数据
def delete_hisdata():
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            # 删除一天前的数据
            sql_delete = "DELETE FROM {} WHERE TIME < DATETIME('now', 'localtime', '-1 day')".format(device_info.dev_name.decode())
            cur.execute(sql_delete)
            conn.commit()

# 主线程 写入历史数据
def thread_main():
    # 数据写入
    while True:
        db_insertdata()
        time.sleep(60)

        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break

    conn.close()
    # 释放资源
    c4py.global_release()

# 副线程 定时删除历史数据
def thread_delete():
    schedule.every().day.at("00:30").do(delete_hisdata)

    while True:
        schedule.run_pending()
        time.sleep(5)

if __name__ == "__main__":
    # 注册信号处理函数
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGUSR1, handle_sigterm)
    #==========================================================
    #启动调试,等待调试器附加,正式运行时注释下述两行
    ptvsd.enable_attach(address=('0.0.0.0', 5678))
    ptvsd.wait_for_attach()
    #==========================================================
    #库初始化
    c4py.py_load_so()
    print("c4py version:" + c4py.C4PY_VERSION)
    c4py.global_init()
    #==========================================================
    # 创建数据库并连接
    route = r'/his/python/sqlite.db'
    conn = sqlite3.connect(route, check_same_thread=False)
    cur = conn.cursor()
    chal_list = ['chal_1', 'chal_2']
    # 建表
    create_table(chal_list)
    # 创建主线程
    main = threading.Thread(target=thread_main, name='thread_main')
    # 创建第二线程
    delete = threading.Thread(target=thread_delete, name='thread_delete')
    main.setDaemon(True)
    # 启动线程
    main.start()
    delete.start()