跳转至

4.实现modbustcp采集

说明

举例现在有三个仪表需要采集,协议为modbustcp协议,地址分别为1、2、3,功能码03,寄存器地址从0开始,数据类型为16位无符号;

接下来我们用python来实现数据的采集。

创建采集工程

  • 新建通道

首先我们需要创建一个采集通道,驱动选择python采集,通道类型选择tcp/ip,并配置相应的ip参数。

  • 新建设备

在刚刚的采集通道下,新建一个设备,设备名称自定义,通讯地址填1(对应仪表的通讯地址)。

  • 新建测点

在刚刚的采集设备下,新建5个采集测点(按需配置测点数量),寄存器地址分别为3、5、7...,步长为2递增;

采集测点新建完后,我们将采集设备复制出来2份,通讯地址配置为2和3。

加载工程参数

取通道参数

使用get_chl_param函数,通过通道名称来获取通道的主参数,并打开目标端口,如下述片段所示:

提示

通过get_chl_param函数获取到参数,建议根据实际情况将IP_PORT参数写死。

Python
    #==========================================================
    #读取目标通道的主参数,并且根据参数打开端口
    chl_name = "python采集通道"
    uart_info = c4py.UartInfo()
    c4py.get_chl_param(chl_name.encode(),1,ctypes.pointer(uart_info))   # 根据需要添加语句
    # 创建套接字并连接
    IP_PORT = ("172.20.1.161", 502)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(3)
    client.connect(IP_PORT)
    print("通道名称:" + chl_name + ";IP端口:" + "172.20.2.137" + ";超時:" + str(3))
    #将通道状态设置为正常
    c4py.set_chl_status(chl_name.encode(),c4py.RUN_STAT_NORMAL)

取设备参数

首先使用get_dev_count函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array函数取得设备参数。

Python
1
2
3
4
5
6
7
    #==========================================================
    #根据通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("dev count:" + str(dev_count))
    #创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)

采集与解析

构造请求报文

通过设备的tcp序号,通讯地址、功能码、寄存器起始地址、测点个数,来构造请求报文,可以根据实际情况精简传入的参数;

然后通过ser.write函数写到目标串口去。

Python
# 构造请求buff
def build_send_buff(tcp_ser_high, tcp_ser_low, dev_comm_addr, func_code, reg_addr, reg_len):
    send_buff = (ctypes.c_ubyte * 256)()
    idx = 0
    send_buff[idx] = tcp_ser_high    # tcp序号高位
    idx += 1
    send_buff[idx] = tcp_ser_low    # tcp序号低位
    idx += 1
    send_buff[idx] = 0
    idx += 3
    send_buff[idx] = 6
    idx += 1
    send_buff[idx] = ctypes.c_ubyte(int(dev_comm_addr))  # 从站地址
    # print(send_buff[idx])
    idx += 1
    send_buff[idx] = func_code  # 功能码
    idx += 1
    send_buff[idx] = reg_addr >> 8  # 寄存器地址高位
    idx += 1
    send_buff[idx] = reg_addr & 0xFF  # 寄存器地址低位
    idx += 1
    send_buff[idx] = reg_len >> 8  # 要读取的长度高位
    idx += 1
    send_buff[idx] = reg_len & 0xFF  # 要读取的长度低位
    idx += 1
    # print(send_buff[:idx])
    return send_buff[:idx]  # 返回有效长度为 12 的数组

读取报文并解析

使用ser.read函数读取仪表返回到串口上的报文,然后对报文的合法性进行校验,例如长度校验、从站地址校验、CRC校验等,按需精简或者加入更多校验;

报文合法性校验通过以后,即可对报文中的数据解析,并置入目标测点。

Python
#解析读到的buff
def parse_read_buff(device_info,read_buff,need_len):
    #打印recv_buff
    # print('接收<< ' + ' '.join(['%02X' % i for i in recv_buff])) 
    #长度校验
    if len(read_buff) != need_len:
        print("读取的长度和预期的长度不相等")
        return 0
    #地址校验
    if int(read_buff[6]) != int(device_info.dev_comm_addr):
        print("从站地址错误")
        return 0
    #可以再进行其它合法性校验,例如功能码是否匹配等

    #首字节索引为0,从索引9字节开始解析,总长为need_len,步长step为2
    step = 2
    for i in range(9, need_len, step):
        byte_pair = read_buff[i:i+step]
        #解析为大端,无符号,整型
        int_value = int.from_bytes(byte_pair, byteorder='big', signed=False)
        #解析为小端,无符号,整型
        int_value = int.from_bytes(byte_pair, byteorder='little', signed=False)

        #强转为double
        double_value = ctypes.c_double(int_value).value
        #设置到对应的测点
        c4py.dset_tag_value2(device_info.dev_name,str(i-6).encode(),double_value)
        # print('解析:: ' + ' '.join(['%02X' % i for i in byte_pair])) #打印byte_pair
    return 1

打印出来的报文如下图:

其它优化

1、可以根据parse_read_buff函数的返回值,来设置设备的通讯状态。

Python
1
2
3
4
5
6
    #解析数据
    ret = parse_read_buff(device_info,recv_buff,need_len)
    if (ret == 1):
        c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_NORMAL)#设备状态正常
    else:
        c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_FAULT)#设备状态异常

2、出现异常时,可以设置测点的质量戳,也可以选择是否清零测点的实时值。

Python
1
2
3
4
5
6
7
    #解析数据
    ret = parse_read_buff(device_info,recv_buff,need_len)
    if (ret == 1):
        c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_NORMAL)#设备状态正常
    else:
        c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_FAULT)#设备状态异常
        c4py.dset_tag_qos2(device_info.dev_name,b"",c4py.TAG_QUALITY_BAD,1)#设置质量戳bad,且清零测点的值

3、注册系统信号处理函数,来接收程序退出消息,以释放资源。

Python
#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum

# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)

c4py.py_load_so()
c4py.global_init()
while True:
    #do something
    if app_exit_signal != 0:
        print("app exit signal: [{}]".format(app_exit_signal))
        break
#释放资源
c4py.global_release()

tcp序号高低位参数说明

Python
# tcp序号高低位
low = 0
high = 0
# tcp低位序号[1,255]
def tcp_low():
    global low, high
    low += 1
    if low == 255:
        high += 1
    elif low == 256:
        low = 0
    return low
# tcp高位序号[0,255]
def tcp_high():
    global high
    if high == 256:
        high = 0
    return high

完整源码

Python
import socket
import ctypes
import c4py
import signal
import ptvsd


#系统信号
app_exit_signal = 0

#系统信号处理函数
def handle_sigterm(signum, frame):
    global app_exit_signal
    app_exit_signal = signum


# tcp序号高低位
low = 0
high = 0


def tcp_low():
    global low, high
    low += 1
    if low == 255:
        high += 1
    elif low == 256:
        low = 0
    return low


def tcp_high():
    global high
    if high == 256:
        high = 0
    return high
# 构造请求buff
def build_send_buff(tcp_ser_high, tcp_ser_low, dev_comm_addr, func_code, reg_addr, reg_len):
    send_buff = (ctypes.c_ubyte * 256)()
    idx = 0
    send_buff[idx] = tcp_ser_high    # tcp序号高位
    idx += 1
    send_buff[idx] = tcp_ser_low    # tcp序号低位
    idx += 1
    send_buff[idx] = 0
    idx += 3
    send_buff[idx] = 6
    idx += 1
    send_buff[idx] = ctypes.c_ubyte(int(dev_comm_addr))  # 从站地址
    # print(send_buff[idx])
    idx += 1
    send_buff[idx] = func_code  # 功能码
    idx += 1
    send_buff[idx] = reg_addr >> 8  # 寄存器地址高位
    idx += 1
    send_buff[idx] = reg_addr & 0xFF  # 寄存器地址低位
    idx += 1
    send_buff[idx] = reg_len >> 8  # 要读取的长度高位
    idx += 1
    send_buff[idx] = reg_len & 0xFF  # 要读取的长度低位
    idx += 1
    # print('发送>> ' + ' '.join(['%02X' % i for i in send_buff[:idx]])) #打印send_buff
    return send_buff[:idx]  # 返回有效长度为 12 的数组

#解析读到的buff
def parse_read_buff(device_info,read_buff,need_len):
    #打印recv_buff
    # print('接收<< ' + ' '.join(['%02X' % i for i in recv_buff])) 
    #长度校验
    if len(read_buff) != need_len:
        print("读取的长度和预期的长度不相等")
        return 0
    #地址校验
    if int(read_buff[6]) != int(device_info.dev_comm_addr):
        print("从站地址错误")
        return 0
    #可以再进行其它合法性校验,例如功能码是否匹配等

    #首字节索引为0,从索引9字节开始解析,总长为need_len,步长step为2
    step = 2
    for i in range(9, need_len, step):
        byte_pair = read_buff[i:i+step]
        #解析为大端,无符号,整型
        int_value = int.from_bytes(byte_pair, byteorder='big', signed=False)
        #解析为小端,无符号,整型
        # int_value = int.from_bytes(byte_pair, byteorder='little', signed=False)

        #强转为double
        double_value = ctypes.c_double(int_value).value
        # print(double_value)
        #设置到对应的测点
        c4py.dset_tag_value2(device_info.dev_name,str(i-6).encode(),double_value)
        print('解析:: ' + ' '.join(['%02X' % i for i in byte_pair])) #打印byte_pair
    return 1

if __name__ == "__main__":
    # 注册信号处理函数
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGUSR1, handle_sigterm)
    #==========================================================
    #启动调试,等待调试器附加,正式运行时注释下述两行
    ptvsd.enable_attach(address=('0.0.0.0', 5678))
    ptvsd.wait_for_attach()
    #==========================================================
    #库初始化
    c4py.py_load_so()
    print("c4py version:" + c4py.C4PY_VERSION)
    c4py.global_init()
    #==========================================================
    #设置通道名称和对端IP
    chl_name = "python采集通道"
    uart_info = c4py.UartInfo()
    c4py.get_chl_param(chl_name.encode(),1,ctypes.pointer(uart_info))
    IP_PORT = ("172.20.1.161", 502)
    # 创建套接字并连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.settimeout(3)
    client.connect(IP_PORT)
    print("通道名称:" + chl_name + ";IP端口:" + "172.20.2.137" + ";超時:" + str(3))
    #将通道状态设置为正常
    c4py.set_chl_status(chl_name.encode(),c4py.RUN_STAT_NORMAL)
    #==========================================================
    #根据通道取其下的设备信息
    dev_count = c4py.get_dev_count(chl_name.encode())
    print("dev count:" + str(dev_count))
    #创建一个长度为 dev_count 的数组
    device_info_array = (c4py.DeviceInfo * dev_count)()
    c4py.get_dev_array(chl_name.encode(), device_info_array)
    #==========================================================
    #遍历设备,开始通信
    while True:
        for device_info in device_info_array:
            print("设备名称:" + device_info.dev_name.decode() + ";通讯地址:" + device_info.dev_comm_addr.decode() + ".")
            reg_addr = 0 #起始寄存器地址
            reg_len = 5 #要读取的寄存器个数
            need_len = reg_len * 2 + 9 #期望返回的长度
            #构造请求报文
            send_buff = build_send_buff(tcp_high(),tcp_low() ,device_info.dev_comm_addr, 0x03, reg_addr, reg_len)
            #发送数据
            client.send(bytes(send_buff))
            print("发送 >>" + ' '.join(['%02X' % i for i in send_buff]))
            # 接收数据
            client.settimeout(3)
            try:
                recv_buff = client.recv(need_len)
                print('接收<< ' + ' '.join(['%02X' % i for i in recv_buff]))
            except socket.timeout:
                recv_buff = b''
            #解析数据
            ret = parse_read_buff(device_info,recv_buff,need_len)
            if (ret == 1):
                c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_NORMAL)#设备状态正常
            else:
                c4py.set_dev_status(device_info.dev_name,c4py.RUN_STAT_FAULT)#设备状态异常
                c4py.dset_tag_qos2(device_info.dev_name,b"",c4py.TAG_QUALITY_BAD,1)#设置质量戳bad,且清零测点的值
            #休息一个帧间隔
            c4py.msleep(50)

        #休息一个交互频率
        c4py.msleep(1000)
        if app_exit_signal != 0:
            print("app exit signal: [{}]".format(app_exit_signal))
            break

    # 关闭连接
    client.close
    c4py.global_release()