From c6987f6f67b31f7b4f55ab4d080ee8ca6b1cdfc6 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 11 May 2022 12:03:15 -0400 Subject: [PATCH] #47 RTU luadoc, bugfixes --- rtu/modbus.lua | 56 +++++++++++---- rtu/rtu.lua | 183 ++++++++++++++++++++++++++++-------------------- rtu/startup.lua | 18 +++-- rtu/threads.lua | 54 +++++++------- 4 files changed, 192 insertions(+), 119 deletions(-) diff --git a/rtu/modbus.lua b/rtu/modbus.lua index b64910f..0e5e16c 100644 --- a/rtu/modbus.lua +++ b/rtu/modbus.lua @@ -7,14 +7,22 @@ local MODBUS_FCODE = types.MODBUS_FCODE local MODBUS_EXCODE = types.MODBUS_EXCODE -- new modbus comms handler object +---@param rtu_dev rtu RTU device +---@param use_parallel_read boolean whether or not to use parallel calls when reading modbus.new = function (rtu_dev, use_parallel_read) local self = { rtu = rtu_dev, use_parallel = use_parallel_read } + ---@class modbus + local public = {} + local insert = table.insert + ---@param c_addr_start integer + ---@param count integer + ---@return boolean ok, table readings local _1_read_coils = function (c_addr_start, count) local tasks = {} local readings = {} @@ -58,6 +66,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, readings end + ---@param di_addr_start integer + ---@param count integer + ---@return boolean ok, table readings local _2_read_discrete_inputs = function (di_addr_start, count) local tasks = {} local readings = {} @@ -101,6 +112,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, readings end + ---@param hr_addr_start integer + ---@param count integer + ---@return boolean ok, table readings local _3_read_multiple_holding_registers = function (hr_addr_start, count) local tasks = {} local readings = {} @@ -144,6 +158,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, readings end + ---@param ir_addr_start integer + ---@param count integer + ---@return boolean ok, table readings local _4_read_input_registers = function (ir_addr_start, count) local tasks = {} local readings = {} @@ -187,6 +204,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, readings end + ---@param c_addr integer + ---@param value any + ---@return boolean ok, MODBUS_EXCODE|nil local _5_write_single_coil = function (c_addr, value) local response = nil local _, coils, _, _ = self.rtu.io_count() @@ -206,6 +226,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, response end + ---@param hr_addr integer + ---@param value any + ---@return boolean ok, MODBUS_EXCODE|nil local _6_write_single_holding_register = function (hr_addr, value) local response = nil local _, _, _, hold_regs = self.rtu.io_count() @@ -222,9 +245,12 @@ modbus.new = function (rtu_dev, use_parallel_read) response = MODBUS_EXCODE.ILLEGAL_DATA_ADDR end - return return_ok + return return_ok, response end + ---@param c_addr_start integer + ---@param values any + ---@return boolean ok, MODBUS_EXCODE|nil local _15_write_multiple_coils = function (c_addr_start, values) local response = nil local _, coils, _, _ = self.rtu.io_count() @@ -249,6 +275,9 @@ modbus.new = function (rtu_dev, use_parallel_read) return return_ok, response end + ---@param hr_addr_start integer + ---@param values any + ---@return boolean ok, MODBUS_EXCODE|nil local _16_write_multiple_holding_registers = function (hr_addr_start, values) local response = nil local _, _, _, hold_regs = self.rtu.io_count() @@ -274,7 +303,9 @@ modbus.new = function (rtu_dev, use_parallel_read) end -- validate a request without actually executing it - local check_request = function (packet) + ---@param packet modbus_frame + ---@return boolean return_code, modbus_packet reply + public.check_request = function (packet) local return_code = true local response = { MODBUS_EXCODE.ACKNOWLEDGE } @@ -314,7 +345,9 @@ modbus.new = function (rtu_dev, use_parallel_read) end -- handle a MODBUS TCP packet and generate a reply - local handle_packet = function (packet) + ---@param packet modbus_frame + ---@return boolean return_code, modbus_packet reply + public.handle_packet = function (packet) local return_code = true local response = nil @@ -369,7 +402,8 @@ modbus.new = function (rtu_dev, use_parallel_read) end -- return a SERVER_DEVICE_BUSY error reply - local reply__srv_device_busy = function (packet) + ---@return modbus_packet reply + public.reply__srv_device_busy = function (packet) -- reply back with error flag and exception code local reply = comms.modbus_packet() local fcode = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG) @@ -379,7 +413,8 @@ modbus.new = function (rtu_dev, use_parallel_read) end -- return a NEG_ACKNOWLEDGE error reply - local reply__neg_ack = function (packet) + ---@return modbus_packet reply + public.reply__neg_ack = function (packet) -- reply back with error flag and exception code local reply = comms.modbus_packet() local fcode = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG) @@ -389,7 +424,8 @@ modbus.new = function (rtu_dev, use_parallel_read) end -- return a GATEWAY_PATH_UNAVAILABLE error reply - local reply__gw_unavailable = function (packet) + ---@return modbus_packet reply + public.reply__gw_unavailable = function (packet) -- reply back with error flag and exception code local reply = comms.modbus_packet() local fcode = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG) @@ -398,13 +434,7 @@ modbus.new = function (rtu_dev, use_parallel_read) return reply end - return { - check_request = check_request, - handle_packet = handle_packet, - reply__srv_device_busy = reply__srv_device_busy, - reply__neg_ack = reply__neg_ack, - reply__gw_unavailable = reply__gw_unavailable - } + return public end return modbus diff --git a/rtu/rtu.lua b/rtu/rtu.lua index f7b0432..ab47b0a 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -19,6 +19,7 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts +-- create a new RTU rtu.init_unit = function () local self = { discrete_inputs = {}, @@ -28,28 +29,36 @@ rtu.init_unit = function () io_count_cache = { 0, 0, 0, 0 } } + ---@class rtu + local public = {} + local insert = table.insert local _count_io = function () self.io_count_cache = { #self.discrete_inputs, #self.coils, #self.input_regs, #self.holding_regs } end - -- return : IO count table - local io_count = function () + -- return IO counts + ---@return integer discrete_inputs, integer coils, integer input_regs, integer holding_regs + public.io_count = function () return self.io_count_cache[0], self.io_count_cache[1], self.io_count_cache[2], self.io_count_cache[3] end -- discrete inputs: single bit read-only - -- return : count of discrete inputs - local connect_di = function (f) + -- connect discrete input + ---@param f function + ---@return integer count count of discrete inputs + public.connect_di = function (f) insert(self.discrete_inputs, f) _count_io() return #self.discrete_inputs end - -- return : value, access fault - local read_di = function (di_addr) + -- read discrete input + ---@param di_addr integer + ---@return any value, boolean access_fault + public.read_di = function (di_addr) ppm.clear_fault() local value = self.discrete_inputs[di_addr]() return value, ppm.is_faulted() @@ -57,22 +66,30 @@ rtu.init_unit = function () -- coils: single bit read-write - -- return : count of coils - local connect_coil = function (f_read, f_write) + -- connect coil + ---@param f_read function + ---@param f_write function + ---@return integer count count of coils + public.connect_coil = function (f_read, f_write) insert(self.coils, { read = f_read, write = f_write }) _count_io() return #self.coils end - -- return : value, access fault - local read_coil = function (coil_addr) + -- read coil + ---@param coil_addr integer + ---@return any value, boolean access_fault + public.read_coil = function (coil_addr) ppm.clear_fault() local value = self.coils[coil_addr].read() return value, ppm.is_faulted() end - -- return : access fault - local write_coil = function (coil_addr, value) + -- write coil + ---@param coil_addr integer + ---@param value any + ---@return boolean access_fault + public.write_coil = function (coil_addr, value) ppm.clear_fault() self.coils[coil_addr].write(value) return ppm.is_faulted() @@ -80,15 +97,19 @@ rtu.init_unit = function () -- input registers: multi-bit read-only - -- return : count of input registers - local connect_input_reg = function (f) + -- connect input register + ---@param f function + ---@return integer count count of input registers + public.connect_input_reg = function (f) insert(self.input_regs, f) _count_io() return #self.input_regs end - -- return : value, access fault - local read_input_reg = function (reg_addr) + -- read input register + ---@param reg_addr integer + ---@return any value, boolean access_fault + public.read_input_reg = function (reg_addr) ppm.clear_fault() local value = self.coils[reg_addr]() return value, ppm.is_faulted() @@ -96,42 +117,43 @@ rtu.init_unit = function () -- holding registers: multi-bit read-write - -- return : count of holding registers - local connect_holding_reg = function (f_read, f_write) + -- connect holding register + ---@param f_read function + ---@param f_write function + ---@return integer count count of holding registers + public.connect_holding_reg = function (f_read, f_write) insert(self.holding_regs, { read = f_read, write = f_write }) _count_io() return #self.holding_regs end - -- return : value, access fault - local read_holding_reg = function (reg_addr) + -- read holding register + ---@param reg_addr integer + ---@return any value, boolean access_fault + public.read_holding_reg = function (reg_addr) ppm.clear_fault() local value = self.coils[reg_addr].read() return value, ppm.is_faulted() end - -- return : access fault - local write_holding_reg = function (reg_addr, value) + -- write holding register + ---@param reg_addr integer + ---@param value any + ---@return boolean access_fault + public.write_holding_reg = function (reg_addr, value) ppm.clear_fault() self.coils[reg_addr].write(value) return ppm.is_faulted() end - return { - io_count = io_count, - connect_di = connect_di, - read_di = read_di, - connect_coil = connect_coil, - read_coil = read_coil, - write_coil = write_coil, - connect_input_reg = connect_input_reg, - read_input_reg = read_input_reg, - connect_holding_reg = connect_holding_reg, - read_holding_reg = read_holding_reg, - write_holding_reg = write_holding_reg - } + return public end +-- RTU Communications +---@param modem table +---@param local_port integer +---@param server_port integer +---@param conn_watchdog watchdog rtu.comms = function (modem, local_port, server_port, conn_watchdog) local self = { seq_num = 0, @@ -143,6 +165,9 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) conn_watchdog = conn_watchdog } + ---@class rtu_comms + local public = {} + local insert = table.insert -- open modem @@ -152,6 +177,9 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) -- PRIVATE FUNCTIONS -- + -- send a scada management packet + ---@param msg_type SCADA_MGMT_TYPES + ---@param msg any local _send = function (msg_type, msg) local s_pkt = comms.scada_packet() local m_pkt = comms.mgmt_packet() @@ -164,6 +192,7 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end -- keep alive ack + ---@param srv_time integer local _send_keep_alive_ack = function (srv_time) _send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() }) end @@ -171,7 +200,8 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) -- PUBLIC FUNCTIONS -- -- send a MODBUS TCP packet - local send_modbus = function (m_pkt) + ---@param m_pkt modbus_packet + public.send_modbus = function (m_pkt) local s_pkt = comms.scada_packet() s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) @@ -179,7 +209,9 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end -- reconnect a newly connected modem - local reconnect_modem = function (modem) + ---@param modem table +---@diagnostic disable-next-line: redefined-local + public.reconnect_modem = function (modem) self.modem = modem -- open modem @@ -189,42 +221,43 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end -- unlink from the server - local unlink = function (rtu_state) + ---@param rtu_state rtu_state + public.unlink = function (rtu_state) rtu_state.linked = false self.r_seq_num = nil end -- close the connection to the server - local close = function (rtu_state) + ---@param rtu_state rtu_state + public.close = function (rtu_state) self.conn_watchdog.cancel() - unlink(rtu_state) + public.unlink(rtu_state) _send(SCADA_MGMT_TYPES.CLOSE, {}) end -- send capability advertisement - local send_advertisement = function (units) + ---@param units table + public.send_advertisement = function (units) local advertisement = {} for i = 1, #units do - local unit = units[i] + local unit = units[i] ---@type rtu_unit_registry_entry local type = comms.rtu_t_to_advert_type(unit.type) if type ~= nil then + ---@class rtu_advertisement + local advert = { + type = type, ---@type integer + index = unit.index, ---@type integer + reactor = unit.reactor, ---@type integer + rsio = nil ---@type table|nil + } + if type == RTU_ADVERT_TYPES.REDSTONE then - insert(advertisement, { - type = type, - index = unit.index, - reactor = unit.for_reactor, - rsio = unit.device - }) - else - insert(advertisement, { - type = type, - index = unit.index, - reactor = unit.for_reactor, - rsio = nil - }) + advert.rsio = unit.device end + + insert(advertisement, advert) end end @@ -232,7 +265,13 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end -- parse a MODBUS/SCADA packet - local parse_packet = function(side, sender, reply_to, message, distance) + ---@param side string + ---@param sender integer + ---@param reply_to integer + ---@param message any + ---@param distance integer + ---@return modbus_frame|mgmt_frame|nil packet + public.parse_packet = function(side, sender, reply_to, message, distance) local pkt = nil local s_pkt = comms.scada_packet() @@ -261,10 +300,11 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, rtu_state) + ---@param packet modbus_frame|mgmt_frame + ---@param units table + ---@param rtu_state rtu_state + public.handle_packet = function(packet, units, rtu_state) if packet ~= nil then - local seq_ok = true - -- check sequence number if self.r_seq_num == nil then self.r_seq_num = packet.scada_frame.seq_num() @@ -281,26 +321,27 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) local protocol = packet.scada_frame.protocol() if protocol == PROTOCOLS.MODBUS_TCP then + local return_code = false local reply = modbus.reply__neg_ack(packet) -- handle MODBUS instruction if packet.unit_id <= #units then - local unit = units[packet.unit_id] + local unit = units[packet.unit_id] ---@type rtu_unit_registry_entry if unit.name == "redstone_io" then -- immediately execute redstone RTU requests - local return_code, reply = unit.modbus_io.handle_packet(packet) + return_code, reply = unit.modbus_io.handle_packet(packet) if not return_code then log.warning("requested MODBUS operation failed") end else -- check validity then pass off to unit comms thread - local return_code, reply = unit.modbus_io.check_request(packet) + return_code, reply = unit.modbus_io.check_request(packet) if return_code then -- check if an operation is already in progress for this unit if unit.modbus_busy then reply = unit.modbus_io.reply__srv_device_busy(packet) else - unit.pkt_queue.push(packet) + unit.pkt_queue.push_packet(packet) end else log.warning("cannot perform requested MODBUS operation") @@ -312,7 +353,7 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) log.error("MODBUS packet requesting non-existent unit") end - send_modbus(reply) + public.send_modbus(reply) elseif protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then @@ -334,7 +375,7 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) elseif packet.type == SCADA_MGMT_TYPES.CLOSE then -- close connection self.conn_watchdog.cancel() - unlink(rtu_state) + public.unlink(rtu_state) println_ts("server connection closed by remote host") log.warning("server connection closed by remote host") elseif packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then @@ -343,7 +384,7 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) self.r_seq_num = nil elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- request for capabilities again - send_advertisement(units) + public.send_advertisement(units) else -- not supported log.warning("RTU got unexpected SCADA message type " .. packet.type) @@ -355,15 +396,7 @@ rtu.comms = function (modem, local_port, server_port, conn_watchdog) end end - return { - send_modbus = send_modbus, - reconnect_modem = reconnect_modem, - parse_packet = parse_packet, - handle_packet = handle_packet, - send_advertisement = send_advertisement, - unlink = unlink, - close = close - } + return public end return rtu diff --git a/rtu/startup.lua b/rtu/startup.lua index ad31460..3b34670 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -22,7 +22,7 @@ local imatrix_rtu = require("rtu.dev.imatrix_rtu") local turbine_rtu = require("rtu.dev.turbine_rtu") local turbinev_rtu = require("rtu.dev.turbinev_rtu") -local RTU_VERSION = "alpha-v0.6.2" +local RTU_VERSION = "alpha-v0.6.3" local rtu_t = types.rtu_t @@ -45,8 +45,10 @@ println(">> RTU " .. RTU_VERSION .. " <<") -- mount connected devices ppm.mount_all() +---@class rtu_shared_memory local __shared_memory = { -- RTU system state flags + ---@class rtu_state rtu_state = { linked = false, shutdown = false @@ -59,9 +61,9 @@ local __shared_memory = { -- system objects rtu_sys = { - rtu_comms = nil, - conn_watchdog = nil, - units = {} + rtu_comms = nil, ---@type rtu_comms + conn_watchdog = nil, ---@type watchdog + units = {} ---@type table }, -- message queues @@ -140,7 +142,8 @@ for reactor_idx = 1, #rtu_redstone do end end - table.insert(units, { + ---@class rtu_unit_registry_entry + local unit = { name = "redstone_io", type = rtu_t.redstone, index = 1, @@ -151,7 +154,9 @@ for reactor_idx = 1, #rtu_redstone do modbus_busy = false, pkt_queue = nil, thread = nil - }) + } + + table.insert(units, unit) log.debug("init> initialized RTU unit #" .. #units .. ": redstone_io (redstone) [1] for reactor " .. rtu_redstone[reactor_idx].for_reactor) end @@ -201,6 +206,7 @@ for i = 1, #rtu_devices do end if rtu_iface ~= nil then + ---@class rtu_unit_registry_entry local rtu_unit = { name = rtu_devices[i].name, type = rtu_type, diff --git a/rtu/threads.lua b/rtu/threads.lua index 447178b..27f5c27 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -1,11 +1,9 @@ -local comms = require("scada-common.comms") local log = require("scada-common.log") local mqueue = require("scada-common.mqueue") local ppm = require("scada-common.ppm") local types = require("scada-common.types") local util = require("scada-common.util") -local redstone_rtu = require("rtu.dev.redstone_rtu") local boiler_rtu = require("rtu.dev.boiler_rtu") local boilerv_rtu = require("rtu.dev.boilerv_rtu") local energymachine_rtu = require("rtu.dev.energymachine_rtu") @@ -24,12 +22,11 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local psleep = util.psleep - local MAIN_CLOCK = 2 -- (2Hz, 40 ticks) local COMMS_SLEEP = 150 -- (150ms, 3 ticks) -- main thread +---@param smem rtu_shared_memory threads.thread__main = function (smem) -- execute thread local exec = function () @@ -42,7 +39,7 @@ threads.thread__main = function (smem) local rtu_state = smem.rtu_state local rtu_dev = smem.rtu_dev local rtu_comms = smem.rtu_sys.rtu_comms - local conn_watchdog = smem.rtu_sys.conn_watchdog ---@type watchdog + local conn_watchdog = smem.rtu_sys.conn_watchdog local units = smem.rtu_sys.units -- start clock @@ -116,7 +113,7 @@ threads.thread__main = function (smem) else -- relink lost peripheral to correct unit entry for i = 1, #units do - local unit = units[i] + local unit = units[i] ---@type rtu_unit_registry_entry -- find disconnected device to reconnect if unit.name == param1 then @@ -137,7 +134,7 @@ threads.thread__main = function (smem) unit.rtu = imatrix_rtu.new(device) end - unit.modbus_io = modbus.new(unit.rtu) + unit.modbus_io = modbus.new(unit.rtu, true) println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) end @@ -159,6 +156,7 @@ threads.thread__main = function (smem) end -- communications handler thread +---@param smem rtu_shared_memory threads.thread__comms = function (smem) -- execute thread local exec = function () @@ -179,14 +177,16 @@ threads.thread__comms = function (smem) while comms_queue.ready() and not rtu_state.shutdown do local msg = comms_queue.pop() - if msg.qtype == mqueue.TYPE.COMMAND then - -- received a command - elseif msg.qtype == mqueue.TYPE.DATA then - -- received data - elseif msg.qtype == mqueue.TYPE.PACKET then - -- received a packet - -- handle the packet (rtu_state passed to allow setting link flag) - rtu_comms.handle_packet(msg.message, units, rtu_state) + if msg ~= nil then + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (rtu_state passed to allow setting link flag) + rtu_comms.handle_packet(msg.message, units, rtu_state) + end end -- quick yield @@ -209,6 +209,8 @@ threads.thread__comms = function (smem) end -- per-unit communications handler thread +---@param smem rtu_shared_memory +---@param unit rtu_unit_registry_entry threads.thread__unit_comms = function (smem, unit) -- execute thread local exec = function () @@ -227,16 +229,18 @@ threads.thread__unit_comms = function (smem, unit) while packet_queue.ready() and not rtu_state.shutdown do local msg = packet_queue.pop() - if msg.qtype == mqueue.TYPE.COMMAND then - -- received a command - elseif msg.qtype == mqueue.TYPE.DATA then - -- received data - elseif msg.qtype == mqueue.TYPE.PACKET then - -- received a packet - unit.modbus_busy = true - local return_code, reply = unit.modbus_io.handle_packet(msg.message) - rtu_comms.send_modbus(reply) - unit.modbus_busy = false + if msg ~= nil then + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + unit.modbus_busy = true + local _, reply = unit.modbus_io.handle_packet(msg.message) + rtu_comms.send_modbus(reply) + unit.modbus_busy = false + end end -- quick yield