跳转至

6.实现SQL server转发

说明

举例现在有两个通道需要转发,协议为任意两个采集协议,转发为SQL server转发。

接下来我们用python来实现数据的转发。

构造转发工程

提示

通过python转发将不再创建通道。但需在代码中配置服务器,端口,数据库,用户名以及密码参数。

配置SQL server转发参数

使用pymssql模块,使用pymssql.connect()函数连接服务器参数。服务器名称和数据库名称根据实际情况自行修改。

Python
1
2
3
4
5
6
7
8
    # 连接数据库
    server = '172.20.1.161\MSSQLHAH1'    # 服务器名称
    database = 'MSSQL_SLAVE_HIS'         # 数据库名称
    conn = pymssql.connect(server=server,
                           port="1433",   # 端口号
                           user="sa", password="123456",  # sql server验证 用户名密码
                           database=database,
                           charset="utf8")   # 编码方式

建表

连接数据库后创建表结构,实际表结构根据实际需求创建,示例如下述片段所示:

Python
1
2
3
4
5
6
7
8
    # 建表
    def mssql_createTable():
        sql_createTable = 'CREATE TABLE MSSQL_SLAVE (Time VARCHAR(100), SN VARCHAR(100), Dev VARCHAR(100))'
        cur.execute(sql_createTable) 
        for i in range(1, 1001):
            columns = 'Reg' + str(i)
            cur.execute('ALTER TABLE MSSQL_SLAVE ADD {} VARCHAR(100)'.format(columns))
        conn.commit()

建表后,表结构如下图所示:

获取转发测点参数

取设备参数

首先使用get_dev_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据采集通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
    # 创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)

取测点参数

首先使用get_tag_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据设备取其下的测点信息
    tag_count = c4py.get_tag_count(device_info.dev_name)
    print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
    # 创建一个长度为 tag_count 的数组
    tag_info_array = (c4py.TagInfo * tag_count)()
    c4py.get_tag_array(device_info.dev_name, tag_info_array)

转发

整理数据写入表中

根据创建表时的字段名获取相应的参数,并用sql语句插入数据。

Python
# 整理数据写入数据库
def mssql_insertData(SN): 
    # 遍历通道   
    for chl_name in chl_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        # 遍历设备
        for device_info in device_info_array:
            timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            tag_count = c4py.get_tag_count(device_info.dev_name)
            print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            num = 0
            # 字段值
            col= ['Time', 'SN', 'Dev']
            # value值
            data = ["'{}'".format(timestr), "'{}'".format(SN), "'{}'".format(device_info.dev_name.decode())]
            # 遍历测点
            for tag_info in tag_info_array: 
                num += 1
                col.append('Reg' + str(num))
                # 数据处理 去掉中括号
                columns_num = ','.join(col)
                data.append("'{}'".format(str(ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value)))
                # 数据处理 去掉中括号
                insertdata = ','.join(data)
            # sql语句写入数据
            cur.execute("INSERT INTO MSSQL_SLAVE({}) VALUES({})".format(columns_num,insertdata))
            conn.commit()

转发成功后数据库表中数据如下图:

其它优化

1、注册系统信号处理函数,来接收程序退出消息,以释放资源。

Python
#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)

c4py.py_load_so()
c4py.global_init()
while True:
    #do something
    if app_exit_signal != 0:
        print("app exit signal: [{}]".format(app_exit_signal))
        break
#释放资源
c4py.global_release()

2、可以根据需要自行添加定时任务,如定时删除数据等。只需增加一个线程即可。

完整源码

Python
import pymssql
import ctypes
import c4py
import signal
import ptvsd
import time

# 系统信号
app_exit_signal = 0

# 系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 建表
def mssql_createTable():
    sql_createTable = 'CREATE TABLE MSSQL_SLAVE (Time VARCHAR(100), SN VARCHAR(100), Dev VARCHAR(100))'
    cur.execute(sql_createTable) 
    for i in range(1, 1001):    # 每个设备支持1000个测点
        columns = 'Reg' + str(i)
        cur.execute('ALTER TABLE MSSQL_SLAVE ADD {} VARCHAR(100)'.format(columns))
    conn.commit()  # 也可以在配置连接时配置自动commit,如果没配置一定要加这一句

# 整理数据写入数据库
def mssql_insertData(SN):    
    for chl_name in chl_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            tag_count = c4py.get_tag_count(device_info.dev_name)
            print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            num = 0
            # 字段值
            col= ['Time', 'SN', 'Dev']
            # value值
            data = ["'{}'".format(timestr), "'{}'".format(SN), "'{}'".format(device_info.dev_name.decode())]
            for tag_info in tag_info_array: 
                num += 1
                col.append('Reg' + str(num))
                # 数据处理 去掉中括号
                columns_num = ','.join(col)
                data.append("'{}'".format(str(ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value)))
                # 数据处理 去掉中括号
                insertdata = ','.join(data)
                # print(insertdata)
                # print(columns_num)
            cur.execute("INSERT INTO MSSQL_SLAVE({}) VALUES({})".format(columns_num,insertdata))
            conn.commit()


if __name__ == "__main__":
    # 注册信号处理函数
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGUSR1, handle_sigterm)
    #==========================================================
    #启动调试,等待调试器附加,正式运行时注释下述两行
    ptvsd.enable_attach(address=('0.0.0.0', 5678))
    ptvsd.wait_for_attach()
    #==========================================================
    #库初始化
    c4py.py_load_so()
    print("c4py version:" + c4py.C4PY_VERSION)
    c4py.global_init()
    #==========================================================
    # 连接数据库
    server = '172.20.1.161\MSSQLHAH1'    # 服务器名称
    database = 'MSSQL_SLAVE_HIS'         # 数据库名称
    conn = pymssql.connect(server=server,
                           port="1433",
                           user="sa", password="123456",
                           database=database,
                           charset="utf8")
    # 创建游标
    cur = conn.cursor()
    # 建表
    mssql_createTable()
    # 选择需要转发的通道
    chl_list = ['chal_1', 'chal_2']
    SN = '80C306D021964E03'
    # 数据写入
    while True:
        mssql_insertData(SN)
        c4py.msleep(5000)

        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break
    # 释放资源
    conn.close()
    c4py.global_release()