6.实现SQL server转发
说明
举例现在有两个通道需要转发,协议为任意两个采集协议,转发为SQL server转发。
接下来我们用python来实现数据的转发。
构造转发工程
提示
通过python转发将不再创建通道。但需在代码中配置服务器,端口,数据库,用户名以及密码参数。
配置SQL server转发参数
使用pymssql模块,使用pymssql.connect()
函数连接服务器参数。服务器名称和数据库名称根据实际情况自行修改。
Python |
---|
| # 连接数据库
server = '172.20.1.161\MSSQLHAH1' # 服务器名称
database = 'MSSQL_SLAVE_HIS' # 数据库名称
conn = pymssql.connect(server=server,
port="1433", # 端口号
user="sa", password="123456", # sql server验证 用户名密码
database=database,
charset="utf8") # 编码方式
|
建表
连接数据库后创建表结构,实际表结构根据实际需求创建,示例如下述片段所示:
Python |
---|
| # 建表
def mssql_createTable():
sql_createTable = 'CREATE TABLE MSSQL_SLAVE (Time VARCHAR(100), SN VARCHAR(100), Dev VARCHAR(100))'
cur.execute(sql_createTable)
for i in range(1, 1001):
columns = 'Reg' + str(i)
cur.execute('ALTER TABLE MSSQL_SLAVE ADD {} VARCHAR(100)'.format(columns))
conn.commit()
|
建表后,表结构如下图所示:
获取转发测点参数
取设备参数
首先使用get_dev_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
|
取测点参数
首先使用get_tag_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据设备取其下的测点信息
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
|
转发
整理数据写入表中
根据创建表时的字段名获取相应的参数,并用sql语句插入数据。
Python |
---|
| # 整理数据写入数据库
def mssql_insertData(SN):
# 遍历通道
for chl_name in chl_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历设备
for device_info in device_info_array:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
num = 0
# 字段值
col= ['Time', 'SN', 'Dev']
# value值
data = ["'{}'".format(timestr), "'{}'".format(SN), "'{}'".format(device_info.dev_name.decode())]
# 遍历测点
for tag_info in tag_info_array:
num += 1
col.append('Reg' + str(num))
# 数据处理 去掉中括号
columns_num = ','.join(col)
data.append("'{}'".format(str(ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value)))
# 数据处理 去掉中括号
insertdata = ','.join(data)
# sql语句写入数据
cur.execute("INSERT INTO MSSQL_SLAVE({}) VALUES({})".format(columns_num,insertdata))
conn.commit()
|
转发成功后数据库表中数据如下图:
其它优化
1、注册系统信号处理函数,来接收程序退出消息,以释放资源。
Python |
---|
| #系统信号
app_exit_signal = 0
#系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
c4py.py_load_so()
c4py.global_init()
while True:
#do something
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
#释放资源
c4py.global_release()
|
2、可以根据需要自行添加定时任务,如定时删除数据等。只需增加一个线程即可。
完整源码
Python |
---|
| import pymssql
import ctypes
import c4py
import signal
import ptvsd
import time
# 系统信号
app_exit_signal = 0
# 系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 建表
def mssql_createTable():
sql_createTable = 'CREATE TABLE MSSQL_SLAVE (Time VARCHAR(100), SN VARCHAR(100), Dev VARCHAR(100))'
cur.execute(sql_createTable)
for i in range(1, 1001): # 每个设备支持1000个测点
columns = 'Reg' + str(i)
cur.execute('ALTER TABLE MSSQL_SLAVE ADD {} VARCHAR(100)'.format(columns))
conn.commit() # 也可以在配置连接时配置自动commit,如果没配置一定要加这一句
# 整理数据写入数据库
def mssql_insertData(SN):
for chl_name in chl_list:
dev_count = c4py.get_dev_count(chl_name.encode())
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
for device_info in device_info_array:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
num = 0
# 字段值
col= ['Time', 'SN', 'Dev']
# value值
data = ["'{}'".format(timestr), "'{}'".format(SN), "'{}'".format(device_info.dev_name.decode())]
for tag_info in tag_info_array:
num += 1
col.append('Reg' + str(num))
# 数据处理 去掉中括号
columns_num = ','.join(col)
data.append("'{}'".format(str(ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value)))
# 数据处理 去掉中括号
insertdata = ','.join(data)
# print(insertdata)
# print(columns_num)
cur.execute("INSERT INTO MSSQL_SLAVE({}) VALUES({})".format(columns_num,insertdata))
conn.commit()
if __name__ == "__main__":
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
#==========================================================
#启动调试,等待调试器附加,正式运行时注释下述两行
ptvsd.enable_attach(address=('0.0.0.0', 5678))
ptvsd.wait_for_attach()
#==========================================================
#库初始化
c4py.py_load_so()
print("c4py version:" + c4py.C4PY_VERSION)
c4py.global_init()
#==========================================================
# 连接数据库
server = '172.20.1.161\MSSQLHAH1' # 服务器名称
database = 'MSSQL_SLAVE_HIS' # 数据库名称
conn = pymssql.connect(server=server,
port="1433",
user="sa", password="123456",
database=database,
charset="utf8")
# 创建游标
cur = conn.cursor()
# 建表
mssql_createTable()
# 选择需要转发的通道
chl_list = ['chal_1', 'chal_2']
SN = '80C306D021964E03'
# 数据写入
while True:
mssql_insertData(SN)
c4py.msleep(5000)
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
# 释放资源
conn.close()
c4py.global_release()
|