跳转至

8.实现redis转发

说明

举例现在有两个通道两个设备需转发,协议为任意两个采集协议。

接下来我们用python来实现redis数据存储。

获取转发测点参数

取设备参数

首先使用get_dev_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据采集通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
    # 创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)

取测点参数

首先使用get_tag_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据设备取其下的测点信息
    tag_count = c4py.get_tag_count(device_info.dev_name)
    print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
    # 创建一个长度为 tag_count 的数组
    tag_info_array = (c4py.TagInfo * tag_count)()
    c4py.get_tag_array(device_info.dev_name, tag_info_array)

构造转发数据

连接redis数据库

使用python的redis模块,使用redis.Redis()函数连接数据库。数据库名称根据需要自行修改。

Python
1
2
3
    # 创建连接池
    redis_pool_db0 = redis.ConnectionPool(host='172.20.1.161', port= 6379, db=0)
    conn_db0 = redis.Redis(connection_pool=redis_pool_db0)

构造转发数据

提示

redis是非关系型数据库,由key,value组成一个数据表。转发数据用hash格式写入。

设备名为表的key值:

Python
# 构造存储数据并写入redis 按照hash格式 键值对的方式写入数据
def now_data(chal_list, conn):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            device_name = device_info.dev_name.decode()
            tag_count = c4py.get_tag_count(device_info.dev_name)
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                # 写入hash格式的数据
                conn.hset('{}'.format(device_name),'{}'.format(tag_name), tag_value)
            # 休息一个帧间隔
            c4py.msleep(50)

hash格式表结构如下图所示:

数据操作

历史数据写入

获取测点名和测点值,整理数据后用字符串形式写入:

Python
# 构造存储数据并写入redis
def biuld_data(chal_list, conn):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            tag_count = c4py.get_tag_count(device_info.dev_name)
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            data = [timestr]
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                data.append("{}".format(tag_name) + ':' + "{}".format(str(tag_value)))
            insert_data = ' '.join(data) + '\n'
            conn.append('{}'.format(device_info.dev_name.decode()), insert_data)
            # 查看数据
            # print('{}'.format(device_info.dev_name.decode()) + ':' + conn.get('{}'.format(device_info.dev_name.decode())).decode())
            c4py.msleep(50)

存储成功后数据库表中数据如下图:

其它优化

系统信号处理

注册系统信号处理函数,来接收程序退出消息,以释放资源。

Python
#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)

c4py.py_load_so()
c4py.global_init()
while True:
    #do something
    if app_exit_signal != 0:
        print("app exit signal: [{}]".format(app_exit_signal))
        break
#释放资源
c4py.global_release()

完整源码

Python
import redis
import ctypes
import c4py
import signal
import ptvsd
import time


# 系统信号
app_exit_signal = 0

# 系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 构造存储数据并写入redis
def biuld_data(chal_list, conn):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            tag_count = c4py.get_tag_count(device_info.dev_name)
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            data = [timestr]
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                data.append("{}".format(tag_name) + ':' + "{}".format(str(tag_value)))
            insert_data = ' '.join(data) + '\n'
            conn.append('{}'.format(device_info.dev_name.decode()), insert_data)
            # 查看数据
            # print('{}'.format(device_info.dev_name.decode()) + ':' + conn.get('{}'.format(device_info.dev_name.decode())).decode())
            c4py.msleep(50)

# 构造存储数据并写入redis 按照hash格式 键值对的方式写入数据
def now_data(chal_list, conn):
    for chl_name in chal_list:
        dev_count = c4py.get_dev_count(chl_name.encode())
        # 创建一个长度为 dev_count 的数组
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)
        for device_info in device_info_array:
            device_name = device_info.dev_name.decode()
            tag_count = c4py.get_tag_count(device_info.dev_name)
            # 创建一个长度为 tag_count 的数组
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            for tag_info in tag_info_array:
                tag_name = tag_info.base.tag_name.decode()
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                conn.hset('{}'.format(device_name),'{}'.format(tag_name), tag_value)
            c4py.msleep(50)


if __name__ == "__main__":
    # 注册信号处理函数
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGUSR1, handle_sigterm)
    #==========================================================
    #启动调试,等待调试器附加,正式运行时注释下述两行
    ptvsd.enable_attach(address=('0.0.0.0', 5678))
    ptvsd.wait_for_attach()
    #==========================================================
    #库初始化
    c4py.py_load_so()
    print("c4py version:" + c4py.C4PY_VERSION)
    c4py.global_init()
    #==========================================================
    chal_list = ['chal_1', 'chal_2']

    # 连接redis数据库
    redis_pool_db0 = redis.ConnectionPool(host='172.20.1.161', port= 6379, db=0)
    redis_pool_db1 = redis.ConnectionPool(host='172.20.1.161', port= 6379, db=1)
    # 写入数据
    while True:
        conn_db0 = redis.Redis(connection_pool=redis_pool_db0)
        biuld_data(chal_list,conn_db0)
        conn_db1 = redis.Redis(connection_pool=redis_pool_db1)
        now_data(chal_list, conn_db1)
        # 交互频率10秒
        c4py.msleep(10000)
        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break

    # 释放资源
    c4py.global_release()