9.实现http转发
说明
举例现在有两个通道需要转发,协议为任意两个采集协议,转发为http转发。
接下来我们用python来实现数据的转发。
创建转发工程
提示
通过python转发将不再创建通道。但需在代码中配置IP,端口参数。
配置http转发IP,PORT参数
使用socket模块,定义连接服务器。
Python |
---|
| #==========================================================
HOST = '目标地址' # 服务器地址
PORT = 目标端口 # 服务器端口
BUF_SIZE = 2048
ADDR = (HOST, PORT) # 需组成元组
client = socket.socket()
# 连接
client.connect(ADDR)
|
加载工程参数
选择需转发的采集通道
创建一个列表,列表中输入需要转发的通道名称,要求字符串格式,并用逗号隔开。根据实际情况自行添加,示例为两个采集通道。如下述片段所示:
Python |
---|
| #==========================================================
chl_list = ['channel_1', 'channel_2'] # 创建需要转发的通道名称列表
|
取设备参数
首先使用get_dev_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
|
取测点参数
首先使用get_tag_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据设备取其下的测点信息
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
|
转发
构造转发报文
集合通道设备测点信息构造json格式消息体,具体构造可按需求更改。
Python |
---|
| # 构造转发数据 sn为网关编号,chl_list为需要转发的通道列表
def build_send_buff(sn, chl_list):
dev = []
# 遍历通道
for chl_name in chl_list:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历通道下的设备
for device_info in device_info_array:
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
tag = []
# 遍历设备下的测点
for tag_info in tag_info_array:
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
send_buff = {'tm': timestr,'sn': sn, 'chl_name': "python转发通道", 'dev': dev}
return send_buff
|
其它优化
1、原本读取到的质量为好时值为0,质量坏时值为100。现改成好为1,坏为0.
Python |
---|
| # 获取测点质量
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
|
2、注册系统信号处理函数,来接收程序退出消息,以释放资源。
Python |
---|
| #系统信号
app_exit_signal = 0
#系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
c4py.py_load_so()
c4py.global_init()
while True:
#do something
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
#释放资源
c4py.global_release()
|
完整源码
Python |
---|
| import ctypes
import c4py
import signal
import ptvsd
import socket
import time
import json
# 系统信号
app_exit_signal = 0
# 系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 构造转发数据
def build_send_buff(sn, chl_list):
dev = []
# 遍历通道
for chl_name in chl_list:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
# 遍历通道下的设备
for device_info in device_info_array:
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
tag = []
# 遍历设备下的测点
for tag_info in tag_info_array:
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
send_buff = {'tm': timestr,'sn': sn, 'chl_name': "python转发通道", 'dev': dev}
return send_buff
if __name__ == "__main__":
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
#==========================================================
#启动调试,等待调试器附加,正式运行时注释下述两行
ptvsd.enable_attach(address=('0.0.0.0', 5678))
ptvsd.wait_for_attach()
#==========================================================
#库初始化
c4py.py_load_so()
print("c4py version:" + c4py.C4PY_VERSION)
c4py.global_init()
#==========================================================
sn = "80C306D021964E03"
# 创建需要转发的通道
chl_list = ["channel_1", "channel_2"]
#==========================================================
# 创建套接字客户端
HOST = '目标地址' # 地址为字符串
PORT = 目标端口 # 端口
BUF_SIZE = 2048
ADDR = (HOST, PORT)
client = socket.socket()
# 连接
client.connect(ADDR)
while True:
data = json.dumps(build_send_buff(sn, chl_list), ensure_ascii=False) # 关闭ascii编码,防止出现乱码
if not data:
break
# 转发
client.send(data.encode('utf-8'))
time.sleep(2)
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
# 释放资源
client.close()
c4py.global_release()
|