5.实现mqtt转发
说明
举例现在有两个通道需要转发,协议为任意两个采集协议,转发为mqtt转发,功能码为实时数据。
接下来我们用python来实现数据的转发。
创建转发工程
提示
通过python转发将不再创建通道。但需在代码中配置IP,端口参数。
配置mqtt转发IP,PORT参数
使用paho-mqtt模块,使用mqtt_connect
定义连接服务器参数。
提示
根据需要,自行按照mqttClient.tls_set()
函数添加证书。添加证书后必须执行mqttClient.tls_insecure_set(True)
语句。
Python |
---|
| #==========================================================
def mqtt_connect():
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
mqttClient = mqtt.Client(client_id, transport='tcp') #for paho-mqtt==1.6.1
# mqttClient = mqtt.Client(client_id=client_id,protocol=mqtt.MQTTv311,transport='tcp') #for paho-mqtt==2.1.0
mqttClient.on_connect = on_connect
host = "47.97.230.45"
port = 4020
mqttClient.username_pw_set("", "")
mqttClient.tls_set(ca_certs='/data/project/1008-ca.pem', certfile='/data/project/1008.pem', keyfile='/data/project/1008.key')
mqttClient.tls_insecure_set(True)
mqttClient.connect(host, port, 60)
mqttClient.loop_start()
return mqttClient
|
加载工程参数
选择需转发的采集通道
创建一个列表,列表中输入需要转发的通道名称,要求字符串格式,并用逗号隔开。根据实际情况自行添加,示例为两个采集通道。如下述片段所示:
Python |
---|
| #==========================================================
chl_list = ['chal_LQ', 'python采集通道'] # 创建需要转发的通道
|
取设备参数
首先使用get_dev_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_dev_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
|
取测点参数
首先使用get_tag_count
函数得到通道下的设备数量,然后创建相应数量的设备数组,再使用get_tag_array
函数取得设备参数。
Python |
---|
| #==========================================================
#根据设备取其下的测点信息
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
|
转发
构造转发报文
通过mqtt接口文档构造消息体。示例中需要的参数有"tp","sn","tm","chl_name","dev"。实际根据所用mqtt的数据格式自行修改。
Python |
---|
| # 构造mqtt发布数据
def build_send_buff(type, sn):
dev = []
chl_list = ['chal_LQ', 'python采集通道'] # 创建需要转发的通道
# 遍历通道
for chl_name in chl_list:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
#==========================================================
# 遍历通道下的设备
for device_info in device_info_array:
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
tag = []
# 遍历设备下的测点
for tag_info in tag_info_array:
# 获取测点值
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
# 获取测点质量
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
send_buff = {'tp': type,'sn': sn, 'tm': timestr, 'chl_name': "python转发通道", 'dev': dev}
return send_buff
|
用mqtt.fx连接服务器后订阅到的消息如下图:
其它优化
1、原本读取到的质量为好时值为0,质量坏时值为100。现改成好为1,坏为0.
Python |
---|
| # 获取测点质量
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
|
2、注册系统信号处理函数,来接收程序退出消息,以释放资源。
Python |
---|
| #系统信号
app_exit_signal = 0
#系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
c4py.py_load_so()
c4py.global_init()
while True:
#do something
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
#释放资源
c4py.global_release()
|
完整源码
Python |
---|
| import ctypes
import c4py
import signal
import ptvsd
import paho.mqtt.client as mqtt
import json
import time
# 系统信号
app_exit_signal = 0
# 系统信号处理函数
def handle_sigterm(signum, frame):
global app_exit_signal
app_exit_signal = signum
# 回调函数,获取mqtt连接状态
def on_connect(client, userdata, flags, rc):
rc_status = ["连接成功", "协议版本错误", "无效客户端", "服务器无法使用", "用户名密码错误", "无授权"]
result = "connet:" + rc_status[rc]
if rc == 1:
# 将通道状态设置为正常
print("connect:" + rc_status[rc])
c4py.set_chl_status(chl_name.encode(),c4py.RUN_STAT_NORMAL)
else:
print("connect:" + rc_status[rc])
# 连接服务器
def mqtt_connect():
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
mqttClient = mqtt.Client(client_id, transport='tcp') #for paho-mqtt==1.6.1
# mqttClient = mqtt.Client(client_id=client_id,protocol=mqtt.MQTTv311,transport='tcp') #for paho-mqtt==2.1.0
mqttClient.on_connect = on_connect
host = "47.97.230.45"
port = 4020
mqttClient.username_pw_set("", "")
# mqttClient.tls_insecure_set(True)
mqttClient.tls_set(ca_certs='/data/project/1008-ca.pem', certfile='/data/project/1008.pem', keyfile='/data/project/1008.key')
mqttClient.tls_insecure_set(True)
mqttClient.connect(host, port, 60)
mqttClient.loop_start()
return mqttClient
# 构造mqtt发布数据
def build_send_buff(type, sn):
dev = []
chl_list = ['chal_LQ', 'python采集通道'] # 创建需要转发的通道
# 遍历通道
for chl_name in chl_list:
timestr = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
#根据采集通道取其下的设备信息
dev_count = c4py.get_dev_count(chl_name.encode())
print("通道名称:"+ chl_name + "; dev count:" + str(dev_count))
# 创建一个长度为 dev_count 的数组
device_info_array = (c4py.DeviceInfo * dev_count)()
c4py.get_dev_array(chl_name.encode(), device_info_array)
#==========================================================
# 遍历通道下的设备
for device_info in device_info_array:
tag_count = c4py.get_tag_count(device_info.dev_name)
print("设备名称:"+ device_info.dev_name.decode() + ";tag count:"+ str(tag_count))
# 创建一个长度为 tag_count 的数组
tag_info_array = (c4py.TagInfo * tag_count)()
c4py.get_tag_array(device_info.dev_name, tag_info_array)
tag = []
# 遍历设备下的测点
for tag_info in tag_info_array:
# 获取测点值
tag_value = ctypes.c_float.from_address(ctypes.addressof(tag_info.p_tag_value.contents)).value
# 获取测点质量
tag_qos = ctypes.c_int.from_address(ctypes.addressof(tag_info.p_tag_quality.contents)).value
if tag_qos == 0:
q = 1
else:
q = 0
tag.append({'name': tag_info.base.tag_name.decode(), 'q': q, 'v': tag_value})
dev.append({'name': device_info.dev_name.decode(), 'tag': tag})
send_buff = {'tp': type,'sn': sn, 'tm': timestr, 'chl_name': "python转发通道", 'dev': dev}
return send_buff
if __name__ == "__main__":
# 注册信号处理函数
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGUSR1, handle_sigterm)
#==========================================================
#启动调试,等待调试器附加,正式运行时注释下述两行
ptvsd.enable_attach(address=('0.0.0.0', 5678))
ptvsd.wait_for_attach()
#==========================================================
#库初始化
c4py.py_load_so()
print("c4py version:" + c4py.C4PY_VERSION)
c4py.global_init()
#==========================================================
chl_name = "python采集通道"
# 设置参数 连接mqtt服务器
mqttClient = mqtt_connect()
c4py.set_chl_status(chl_name.encode(),c4py.RUN_STAT_NORMAL)
print("通道名称:" + chl_name + ";IP端口:" + "47.97.230.45:4020" + ";超时:" + str(5))
sn = '80C306D021964E03'
type = 2
while True:
send_buff = build_send_buff(type, sn)
print(json.dumps(send_buff))
# 发布
mqttClient.publish("/lqgyy/pub/{}".format(sn), json.dumps(send_buff), 2)
c4py.msleep(5000)
if app_exit_signal != 0:
print("app exit signal: [{}]".format(app_exit_signal))
break
# 关闭连接
mqttClient.loop_stop()
c4py.global_release()
|