跳转至

9.实现http转发

说明

举例现在有两个通道需要转发,协议为任意两个采集协议,转发为http转发。

接下来我们用python来实现数据的转发。

创建转发工程

提示

通过python转发将不再创建通道。但需在代码中配置IP,端口参数。

配置http转发IP,PORT参数

使用socket模块,定义连接服务器。

Python
1
2
3
4
5
6
7
8
    #==========================================================
    HOST = '目标地址'  # 服务器地址
    PORT = 目标端口   # 服务器端口
    BUF_SIZE = 2048
    ADDR = (HOST, PORT)  # 需组成元组
    client = socket.socket()
    # 连接
    client.connect(ADDR)

加载工程参数

选择需转发的采集通道

创建一个列表,列表中输入需要转发的通道名称,要求字符串格式,并用逗号隔开。根据实际情况自行添加,示例为两个采集通道。如下述片段所示:

Python
    #==========================================================
    chl_list = ['channel_1', 'channel_2']   # 创建需要转发的通道名称列表

取设备参数

首先使用get_dev_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据采集通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
    # 创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)

取测点参数

首先使用get_tag_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据设备取其下的测点信息
    tag_count = c4py.get_tag_count(device_info.dev_name)
    print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
    # 创建一个长度为 tag_count 的数组
    tag_info_array = (c4py.TagInfo * tag_count)()
    c4py.get_tag_array(device_info.dev_name, tag_info_array)

转发

构造转发报文

集合通道设备测点信息构造json格式消息体,具体构造可按需求更改。

Python
# 构造转发数据  sn为网关编号,chl_list为需要转发的通道列表
def build_send_buff(sn, chl_list): 
    dev = []
    # 遍历通道
    for chl_name in chl_list:
        timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        dev_count = c4py.get_dev_count(chl_name.encode())
        print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)        
        # 遍历通道下的设备
        for device_info in device_info_array:
            tag_count = c4py.get_tag_count(device_info.dev_name)
            print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            tag = []
            # 遍历设备下的测点
            for tag_info in tag_info_array:
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
                if tag_qos == 0:
                    q = 1
                else:
                    q = 0
                tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
            dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
    send_buff = {'tm': timestr,'sn': sn, 'chl_name': "python转发通道", 'dev': dev}
    return send_buff

其它优化

1、原本读取到的质量为好时值为0,质量坏时值为100。现改成好为1,坏为0.

Python
1
2
3
4
5
6
7
    # 获取测点质量
    tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
    if tag_qos == 0:
        q = 1
    else:
        q = 0
    tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})

2、注册系统信号处理函数,来接收程序退出消息,以释放资源。

Python
#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)

c4py.py_load_so()
c4py.global_init()
while True:
    #do something
    if app_exit_signal != 0:
        print("app exit signal: [{}]".format(app_exit_signal))
        break
#释放资源
c4py.global_release()

完整源码

Python
import ctypes
import c4py
import signal
import ptvsd
import socket
import time
import json


# 系统信号
app_exit_signal = 0

# 系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 构造转发数据
def build_send_buff(sn, chl_list): 
    dev = []
    # 遍历通道
    for chl_name in chl_list:
        timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        dev_count = c4py.get_dev_count(chl_name.encode())
        print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
        device_info_array = (c4py.DeviceInfo * dev_count)()
        c4py.get_dev_array(chl_name.encode(), device_info_array)        
        # 遍历通道下的设备
        for device_info in device_info_array:
            tag_count = c4py.get_tag_count(device_info.dev_name)
            print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
            tag_info_array = (c4py.TagInfo * tag_count)()
            c4py.get_tag_array(device_info.dev_name, tag_info_array)
            tag = []
            # 遍历设备下的测点
            for tag_info in tag_info_array:
                tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
                tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
                if tag_qos == 0:
                    q = 1
                else:
                    q = 0
                tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
            dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
    send_buff = {'tm': timestr,'sn': sn, 'chl_name': "python转发通道", 'dev': dev}
    return send_buff


if __name__ == "__main__":
    # 注册信号处理函数
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGUSR1, handle_sigterm)
    #==========================================================
    #启动调试,等待调试器附加,正式运行时注释下述两行
    ptvsd.enable_attach(address=('0.0.0.0', 5678))
    ptvsd.wait_for_attach()
    #==========================================================
    #库初始化
    c4py.py_load_so()
    print("c4py version:" + c4py.C4PY_VERSION)
    c4py.global_init()
    #==========================================================
    sn = "80C306D021964E03"
    # 创建需要转发的通道
    chl_list = ["channel_1", "channel_2"]
    #==========================================================
    # 创建套接字客户端
    HOST = '目标地址'   # 地址为字符串
    PORT = 目标端口     # 端口
    BUF_SIZE = 2048
    ADDR = (HOST, PORT)
    client = socket.socket()
    # 连接
    client.connect(ADDR)
    while True:
        data = json.dumps(build_send_buff(sn, chl_list), ensure_ascii=False)  # 关闭ascii编码,防止出现乱码
        if not data:
            break
        # 转发
        client.send(data.encode('utf-8'))
        time.sleep(2)

        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break

    # 释放资源
    client.close()
    c4py.global_release()