python 实现Modebus 通信(pyModbusTCP )
admin 阅读: 2024-03-24
后台-插件-广告管理-内容页头部广告(手机) |
尽管python 并不适合实时控制,但是它编写程序实在是太方便了。至少在程序快速成型和验证过程中可以使用Python 来实现,另一方面,Python 强大的数值计算和图形显示能力也能够实现modbus 设备的远程监控,数据采集,甚至AI 训练。Python 在算法开法,仿真等场景是高效率的编程工具。
这里我们介绍Python 实现ModbusTCP的例子:
客户端(Client)
- from pyModbusTCP.client import ModbusClient # Modbus TCP Client
- import time
- from pyModbusTCP import utils
- import numpy as np
- import matplotlib.pyplot as plt
- # TCP auto connect on modbus request, close after it
- ModbusBMS = ModbusClient(host="localhost", port=502, unit_id=1, auto_open=True, auto_close=False)
- #time.sleep(5)
- vaw=1
- x = np.arange(0,1000,1,dtype=np.int16)
- y=np.arange(-10,10,0.02,dtype=np.float32)
- if __name__ == '__main__':
- while True:
- reg_l=ModbusBMS.read_input_registers(0,2)
- val=utils.word_list_to_long(reg_l)
- print(utils.decode_ieee(val[0],False))
- y=np.append(y,utils.decode_ieee(val[0],False))
- y=np.delete(y, 0, axis=0)
- plt.clf()
- plt.plot(x, y, ls="-", lw=2, label="plot figure")
- plt.legend()
- plt.show()
- plt.pause(0.01)
注意的是 auto_open和auto_close两个标记,它指明每次请求modbusTCP 时是否会自动打开和关闭TCP/IP 连接。如果auto_close=True 表示每次都会自动关闭连接,经测试,它会引起2秒中的延时。
上述例子中使用plt 显示实时数据,在这里是一sin 曲线。是np.append 和np.delete 维持一个实时数据队列。
ModbusBMS具有下列几种读取modbus 的方法:
- ModbusBMS.read_holding_registers
- ModbusBMS.read_input_registers
- ModbusBMS.read_coils
- ModbusBMS.read_discrete_inputs
- ModbusBMS.write_single_register
- ModbusBMS.write_single_coil
- ModbusBMS.write_multiple_registers
- ModbusBMS.write_multiple_coils
服务器端(Server)
- import argparse
- from pyModbusTCP.server import ModbusServer, DataBank
- from pyModbusTCP import utils
- from datetime import datetime
- import numpy as np
- Fs = 8000
- f = 50
- x=0
- coil_state=True
- class MyDataBank(DataBank):
- """A custom ModbusServerDataBank for override get_holding_registers method."""
- def __init__(self):
- # turn off allocation of memory for standard modbus object types
- # only "holding registers" space will be replaced by dynamic build values.
- super().__init__(virtual_mode=True)
- def get_coils(self, address, number=1, srv_info=None):
- global coil_state
- coil_state=not coil_state
- return coil_state
- def get_holding_registers(self, address, number=1, srv_info=None):
- """Get virtual holding registers."""
- # populate virtual registers dict with current datetime values
- now = datetime.now()
- return now.second
- def get_input_registers(self, address, number=1, srv_info=None):
- global x
- wave=np.sin(2 * np.pi * f * x / Fs)*10
- x=x+1
- b32_l=[utils.encode_ieee(wave,False)]
- b16_l = utils.long_list_to_word(b32_l)
- print(b16_l)
- return b16_l
- if __name__ == '__main__':
- # parse args
- parser = argparse.ArgumentParser()
- parser.add_argument('-H', '--host', type=str, default='localhost', help='Host (default: localhost)')
- parser.add_argument('-p', '--port', type=int, default=502, help='TCP port (default: 502)')
- args = parser.parse_args()
- # init modbus server and start it
- server = ModbusServer(host=args.host, port=args.port, data_bank=MyDataBank())
- server.start()
上面的程序都经过测试。
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |