7.实现sqlite历史数据存储
说明
举例现在有两个通道两个设备需存储数据,协议为任意两个采集协议。
接下来我们用python来实现数据存储。
构造数据存储脚本
创建sqlite数据库并连接
使用python自带的sqlite3模块,使用sqlite3.connect()
函数连接数据库。数据库名称根据需要自行修改。
提示
连接时一定要把check_same_thread
参数设置为False
。防止数据库执行操作时线程冲突
Python |
---|
| # 数据库路径 最后为数据库名称
route = r'/his/python/sqlite.db'
# 连接数据库
conn = sqlite3.connect(route, check_same_thread=False)
# 创建游标
cur = conn.cursor()
|
建表
连接数据库后创建表结构,示例中按设备名创建表,测点名为字段名。实际表结构根据实际需求创建,示例如下述片段所示:
Python |
---|
| # 创建表
def create_table(chal_list):
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历设备,获取设备名建表
for device_info in device_info_array:
device_name = device_info.dev_name.decode()
sql = 'CREATE TABLE {} (ID INTEGER PRIMARY KEY AUTOINCREMENT, TIME VARCHAR(100), CHAL VARCHAR(100))'.format(device_name)
cur.execute(sql) # 根据设备名建表
conn.commit()
# 创建一个长度为 tag_count 的数组
tag_count = c4py.get_tag_count(device_info.dev_name)
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
# 遍历测点,获取测点名创建字段
for tag_info in tag_info_array:
tag_name = tag_info.base.tag_name.decode()
sql_tag = 'ALTER TABLE {} ADD {} VARCHAR(100)'.format(device_name, tag_name)
cur.execute(sql_tag)
conn.commit()
|
建表后,表结构如下图所示:

获取转发测点参数
取设备参数
首先使用get_dev_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
|
取测点参数
首先使用get_tag_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据设备取其下的测点信息
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
|
数据操作
数据写入
获取测点字段名和测点的值,一一对应分别放入两个列表。保证测点名与测点值对应。
Python |
---|
| # 整理数据并插入
def db_insertdata():
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历设备
for device_info in device_info_array:
timrstr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
col = ['TIME', 'CHAL']
data = ["'{}'".format(timrstr), "'{}'".format(chl_name)]
tag_count = c4py.get_tag_count(device_info.dev_name)
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
# 遍历测点
for tag_info in tag_info_array:
tag_name = tag_info.base.tag_name.decode() # 测点名称
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value # 测点值
# 对应写入列表中。注意数据格式!!!
col.append(tag_name)
data.append("'{}'".format(tag_value))
# 整理数据为字符串
col_num = ','.join(col)
data_insert = ','.join(data)
# 插入数据
sql_insert = 'INSERT INTO {}({}) VALUES({})'.format(device_info.dev_name.decode(), col_num, data_insert)
cur.execute(sql_insert)
conn.commit()
|
存储成功后数据库表中数据如下图:

数据删除
清除过期的历史数据。sql_delete
参数中,DATETIME()
函数可以写入参数,'-1 day'
为一天前,可以根据需要自行修改删除几天前的数据。
Python |
---|
| # 删除历史数据
def delete_hisdata():
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历设备删除各个表的过期数据
for device_info in device_info_array:
# 删除一天前的数据
sql_delete = "DELETE FROM {} WHERE TIME < DATETIME('now', 'localtime', '-1 day')".format(device_info.dev_name.decode())
cur.execute(sql_delete)
conn.commit()
|
其它优化
系统信号处理
注册系统信号处理函数,来接收程序退出消息,以释放资源。
Python |
---|
| #系统信号
app_exit_signal = 0
#系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
c4py.py_load_so()
c4py.global_init()
while True:
#do something
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
#释放资源
c4py.global_release()
|
定时任务
示例中定每天00:30开始清理过期数据。运用schedule
模块完成定时。具体代码如下:
Python |
---|
| # 定时执行删除
schedule.every().day.at("00:30").do(delete_hisdata)
# 开启定时
while True:
schedule.run_pending()
time.sleep(5)
|
多线程
数据存储和删除为两个线程。所以用threading
模块定义两个线程。
主线程:
Python |
---|
| # 定义主线程方法 写入历史数据
def thread_main():
# 数据写入
while True:
db_insertdata()
time.sleep(60)
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
conn.close()
# 释放资源
c4py.global_release()
|
第二线程:
Python |
---|
| # 定义副线程方法 定时删除历史数据
def thread_delete():
schedule.every().day.at("00:30").do(delete_hisdata)
while True:
schedule.run_pending()
time.sleep(5)
|
创建线程
Python |
---|
| # 创建主线程 线程执行为thread_main()函数
main = threading.Thread(target=thread_main, name='thread_main')
# 创建第二线程 线程执行为thread_delete()函数
delete = threading.Thread(target=thread_delete, name='thread_delete')
# 主线程开启线程守护
main.setDaemon(True)
# 启动线程
main.start()
delete.start()
|
完整源码
Python |
---|
| import sqlite3
import ctypes
import c4py
import signal
import ptvsd
import time
import threading
import schedule
# 系统信号
app_exit_signal = 0
# 系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 创建表 字段有(id, 时间 varchar(100),通道 varchar(100) 测点信息varchar(255))
def create_table(chal_list):
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
for device_info in device_info_array:
device_name = device_info.dev_name.decode()
sql = 'CREATE TABLE {} (ID INTEGER PRIMARY KEY AUTOINCREMENT, TIME VARCHAR(100), CHAL VARCHAR(100))'.format(device_name)
cur.execute(sql) # 根据设备名建表
conn.commit()
tag_count = c4py.get_tag_count(device_info.dev_name)
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
for tag_info in tag_info_array:
tag_name = tag_info.base.tag_name.decode()
sql_tag = 'ALTER TABLE {} ADD {} VARCHAR(100)'.format(device_name, tag_name)
cur.execute(sql_tag)
conn.commit()
# 整理数据并插入
def db_insertdata():
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
for device_info in device_info_array:
timrstr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
col = ['TIME', 'CHAL']
data = ["'{}'".format(timrstr), "'{}'".format(chl_name)]
tag_count = c4py.get_tag_count(device_info.dev_name)
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
for tag_info in tag_info_array:
tag_name = tag_info.base.tag_name.decode()
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
col.append(tag_name)
data.append("'{}'".format(tag_value))
col_num = ','.join(col)
data_insert = ','.join(data)
sql_insert = 'INSERT INTO {}({}) VALUES({})'.format(device_info.dev_name.decode(), col_num, data_insert)
cur.execute(sql_insert)
conn.commit()
# 删除历史数据
def delete_hisdata():
for chl_name in chal_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
for device_info in device_info_array:
# 删除一天前的数据
sql_delete = "DELETE FROM {} WHERE TIME < DATETIME('now', 'localtime', '-1 day')".format(device_info.dev_name.decode())
cur.execute(sql_delete)
conn.commit()
# 主线程 写入历史数据
def thread_main():
# 数据写入
while True:
db_insertdata()
time.sleep(60)
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
conn.close()
# 释放资源
c4py.global_release()
# 副线程 定时删除历史数据
def thread_delete():
schedule.every().day.at("00:30").do(delete_hisdata)
while True:
schedule.run_pending()
time.sleep(5)
if __name__ == "__main__":
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
#==========================================================
#启动调试,等待调试器附加,正式运行时注释下述两行
ptvsd.enable_attach(address=('0.0.0.0', 5678))
ptvsd.wait_for_attach()
#==========================================================
#库初始化
c4py.py_load_so()
print("c4py version:" + c4py.C4PY_VERSION)
c4py.global_init()
#==========================================================
# 创建数据库并连接
route = r'/his/python/sqlite.db'
conn = sqlite3.connect(route, check_same_thread=False)
cur = conn.cursor()
chal_list = ['chal_1', 'chal_2']
# 建表
create_table(chal_list)
# 创建主线程
main = threading.Thread(target=thread_main, name='thread_main')
# 创建第二线程
delete = threading.Thread(target=thread_delete, name='thread_delete')
main.setDaemon(True)
# 启动线程
main.start()
delete.start()
|