diff --git a/reactor-plc/plc.lua b/reactor-plc/plc.lua index 4ed8cb7..290ca64 100644 --- a/reactor-plc/plc.lua +++ b/reactor-plc/plc.lua @@ -269,7 +269,7 @@ plc.rps_init = function (reactor) end -- reactor PLC communications -plc.comms = function (id, modem, local_port, server_port, reactor, rps) +plc.comms = function (id, modem, local_port, server_port, reactor, rps, conn_watchdog) local self = { id = id, seq_num = 0, @@ -279,6 +279,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) l_port = local_port, reactor = reactor, rps = rps, + conn_watchdog = conn_watchdog, scrammed = false, linked = false, status_cache = nil, @@ -398,7 +399,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) -- keep alive ack local _send_keep_alive_ack = function (srv_time) - _send(RPLC_TYPES.KEEP_ALIVE, { srv_time, util.time() }) + _send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() }) end -- general ack @@ -456,8 +457,8 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) end -- close the connection to the server - local close = function (conn_watchdog) - conn_watchdog.cancel() + local close = function () + self.conn_watchdog.cancel() unlink() _send_mgmt(SCADA_MGMT_TYPES.CLOSE, {}) end @@ -478,7 +479,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) local sys_status = { util.time(), -- timestamp - (not self.scrammed), -- enabled + (not self.scrammed), -- requested control state rps.is_tripped(), -- overridden degraded, -- degraded self.reactor.getHeatingRate(), -- heating rate @@ -542,7 +543,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) end -- handle an RPLC packet - local handle_packet = function (packet, plc_state, setpoints, conn_watchdog) + local handle_packet = function (packet, plc_state, setpoints) if packet ~= nil then -- check sequence number if self.r_seq_num == nil then @@ -554,29 +555,13 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) self.r_seq_num = packet.scada_frame.seq_num() end - -- feed the watchdog first so it doesn't uhh...eat our packets - conn_watchdog.feed() + -- feed the watchdog first so it doesn't uhh...eat our packets :) + self.conn_watchdog.feed() -- handle packet if packet.scada_frame.protocol() == PROTOCOLS.RPLC then if self.linked then - if packet.type == RPLC_TYPES.KEEP_ALIVE then - -- keep alive request received, echo back - if packet.length == 1 then - local timestamp = packet.data[1] - local trip_time = util.time() - timestamp - - if trip_time > 500 then - log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. ")") - end - - -- log.debug("RPLC RTT = ".. trip_time .. "ms") - - _send_keep_alive_ack(timestamp) - else - log.debug("RPLC keep alive packet length mismatch") - end - elseif packet.type == RPLC_TYPES.LINK_REQ then + if packet.type == RPLC_TYPES.LINK_REQ then -- link request confirmation if packet.length == 1 then log.debug("received unsolicited link request response") @@ -694,15 +679,34 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps) log.debug("discarding non-link packet before linked") end elseif packet.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then - -- handle session close - if packet.type == SCADA_MGMT_TYPES.CLOSE then - conn_watchdog.cancel() + if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("RPLC RTT = ".. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA keep alive packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.CLOSE then + -- handle session close + self.conn_watchdog.cancel() unlink() println_ts("server connection closed by remote host") log.warning("server connection closed by remote host") else log.warning("received unknown SCADA_MGMT packet type " .. packet.type) end + else + -- should be unreachable assuming packet is from parse_packet() + log.error("illegal packet type " .. protocol, true) end end end diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index e1b07ab..1f3604f 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -11,7 +11,7 @@ local config = require("config") local plc = require("plc") local threads = require("threads") -local R_PLC_VERSION = "alpha-v0.6.4" +local R_PLC_VERSION = "alpha-v0.6.5" local print = util.print local println = util.println @@ -102,30 +102,35 @@ function init() -- init reactor protection system smem_sys.rps = plc.rps_init(smem_dev.reactor) - log.debug("rps init") + log.debug("init> rps init") if __shared_memory.networked then - -- start comms - smem_sys.plc_comms = plc.comms(config.REACTOR_ID, smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_dev.reactor, smem_sys.rps) - log.debug("comms init") - -- comms watchdog, 3 second timeout smem_sys.conn_watchdog = util.new_watchdog(3) - log.debug("conn watchdog started") + log.debug("init> conn watchdog started") + + -- start comms + smem_sys.plc_comms = plc.comms(config.REACTOR_ID, smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_dev.reactor, smem_sys.rps, smem_sys.conn_watchdog) + log.debug("init> comms init") else println("boot> starting in offline mode"); - log.debug("running without networking") + log.debug("init> running without networking") end os.queueEvent("clock_start") println("boot> completed"); + log.debug("init> boot completed") else println("boot> system in degraded state, awaiting devices...") - log.warning("booted in a degraded state, awaiting peripheral connections...") + log.warning("init> booted in a degraded state, awaiting peripheral connections...") end end +---------------------------------------- +-- start system +---------------------------------------- + -- initialize PLC init() diff --git a/rtu/rtu.lua b/rtu/rtu.lua index 3048e42..83359d7 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -125,14 +125,15 @@ rtu.init_unit = function () } end -rtu.comms = function (modem, local_port, server_port) +rtu.comms = function (modem, local_port, server_port, conn_watchdog) local self = { seq_num = 0, r_seq_num = nil, txn_id = 0, modem = modem, s_port = server_port, - l_port = local_port + l_port = local_port, + conn_watchdog = conn_watchdog } -- open modem @@ -153,8 +154,21 @@ rtu.comms = function (modem, local_port, server_port) self.seq_num = self.seq_num + 1 end + -- keep alive ack + local _send_keep_alive_ack = function (srv_time) + _send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() }) + end + -- PUBLIC FUNCTIONS -- + -- send a MODBUS TCP packet + local send_modbus = function (m_pkt) + local s_pkt = comms.scada_packet() + s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) + self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) + self.seq_num = self.seq_num + 1 + end + -- reconnect a newly connected modem local reconnect_modem = function (modem) self.modem = modem @@ -165,12 +179,47 @@ rtu.comms = function (modem, local_port, server_port) end end - -- send a MODBUS TCP packet - local send_modbus = function (m_pkt) - local s_pkt = comms.scada_packet() - s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) - self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) - self.seq_num = self.seq_num + 1 + -- unlink from the server + local unlink = function (rtu_state) + rtu_state.linked = false + self.r_seq_num = nil + end + + -- close the connection to the server + local close = function (rtu_state) + self.conn_watchdog.cancel() + unlink(rtu_state) + _send(SCADA_MGMT_TYPES.CLOSE, {}) + end + + -- send capability advertisement + local send_advertisement = function (units) + local advertisement = {} + + for i = 1, #units do + local unit = units[i] + local type = comms.rtu_t_to_advert_type(unit.type) + + if type ~= nil then + if type == RTU_ADVERT_TYPES.REDSTONE then + insert(advertisement, { + type = type, + index = unit.index, + reactor = unit.for_reactor, + rsio = unit.device + }) + else + insert(advertisement, { + type = type, + index = unit.index, + reactor = unit.for_reactor, + rsio = nil + }) + end + end + end + + _send(SCADA_MGMT_TYPES.RTU_ADVERT, advertisement) end -- parse a MODBUS/SCADA packet @@ -203,7 +252,7 @@ rtu.comms = function (modem, local_port, server_port) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, rtu_state, conn_watchdog) + local handle_packet = function(packet, units, rtu_state) if packet ~= nil then local seq_ok = true @@ -218,7 +267,7 @@ rtu.comms = function (modem, local_port, server_port) end -- feed watchdog on valid sequence number - conn_watchdog.feed() + self.conn_watchdog.feed() local protocol = packet.scada_frame.protocol() @@ -257,10 +306,28 @@ rtu.comms = function (modem, local_port, server_port) send_modbus(reply) elseif protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet - if packet.type == SCADA_MGMT_TYPES.CLOSE then + if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("RTU KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("RTU RTT = ".. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA keep alive packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.CLOSE then -- close connection - conn_watchdog.cancel() + self.conn_watchdog.cancel() unlink(rtu_state) + println_ts("server connection closed by remote host") + log.warning("server connection closed by remote host") elseif packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then -- acknowledgement rtu_state.linked = true @@ -279,50 +346,6 @@ rtu.comms = function (modem, local_port, server_port) end end - -- send capability advertisement - local send_advertisement = function (units) - local advertisement = {} - - for i = 1, #units do - local unit = units[i] - local type = comms.rtu_t_to_advert_type(unit.type) - - if type ~= nil then - if type == RTU_ADVERT_TYPES.REDSTONE then - insert(advertisement, { - type = type, - index = unit.index, - reactor = unit.for_reactor, - rsio = unit.device - }) - else - insert(advertisement, { - type = type, - index = unit.index, - reactor = unit.for_reactor, - rsio = nil - }) - end - end - end - - _send(SCADA_MGMT_TYPES.RTU_ADVERT, advertisement) - end - - local send_heartbeat = function () - _send(SCADA_MGMT_TYPES.RTU_HEARTBEAT, {}) - end - - local unlink = function (rtu_state) - rtu_state.linked = false - self.r_seq_num = nil - end - - local close = function (rtu_state) - unlink(rtu_state) - _send(SCADA_MGMT_TYPES.CLOSE, {}) - end - return { send_modbus = send_modbus, reconnect_modem = reconnect_modem, diff --git a/rtu/startup.lua b/rtu/startup.lua index 4edcbc9..5c4a6f4 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -22,7 +22,7 @@ local imatrix_rtu = require("dev.imatrix_rtu") local turbine_rtu = require("dev.turbine_rtu") local turbinev_rtu = require("dev.turbinev_rtu") -local RTU_VERSION = "alpha-v0.6.0" +local RTU_VERSION = "alpha-v0.6.1" local rtu_t = types.rtu_t @@ -80,8 +80,6 @@ if smem_dev.modem == nil then return end -smem_sys.rtu_comms = rtu.comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT) - ---------------------------------------- -- interpret config and init units ---------------------------------------- @@ -230,14 +228,18 @@ end -- start system ---------------------------------------- +-- start connection watchdog +smem_sys.conn_watchdog = util.new_watchdog(5) +log.debug("boot> conn watchdog started") + +-- setup comms +smem_sys.rtu_comms = rtu.comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_sys.conn_watchdog) +log.debug("boot> comms init") + -- init threads local main_thread = threads.thread__main(__shared_memory) local comms_thread = threads.thread__comms(__shared_memory) --- start connection watchdog -smem_sys.conn_watchdog = util.new_watchdog(5) -log.debug("init> conn watchdog started") - -- assemble thread list local _threads = { main_thread.exec, comms_thread.exec } for i = 1, #units do diff --git a/rtu/threads.lua b/rtu/threads.lua index 12f90f7..db27c61 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -180,8 +180,7 @@ threads.thread__comms = function (smem) elseif msg.qtype == mqueue.TYPE.PACKET then -- received a packet -- handle the packet (rtu_state passed to allow setting link flag) - -- (conn_watchdog passed to allow feeding watchdog) - rtu_comms.handle_packet(msg.message, units, rtu_state, conn_watchdog) + rtu_comms.handle_packet(msg.message, units, rtu_state) end -- quick yield @@ -211,7 +210,6 @@ threads.thread__unit_comms = function (smem, unit) -- load in from shared memory local rtu_state = smem.rtu_state - local packet_queue = unit.pkt_queue local last_update = util.time() diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 7a41ff7..8685390 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -18,16 +18,15 @@ local PROTOCOLS = { } local RPLC_TYPES = { - KEEP_ALIVE = 0, -- keep alive packets - LINK_REQ = 1, -- linking requests - STATUS = 2, -- reactor/system status - MEK_STRUCT = 3, -- mekanism build structure - MEK_BURN_RATE = 4, -- set burn rate - RPS_ENABLE = 5, -- enable reactor - RPS_SCRAM = 6, -- SCRAM reactor - RPS_STATUS = 7, -- RPS status - RPS_ALARM = 8, -- RPS alarm broadcast - RPS_RESET = 9 -- clear RPS trip (if in bad state, will trip immediately) + LINK_REQ = 0, -- linking requests + STATUS = 1, -- reactor/system status + MEK_STRUCT = 2, -- mekanism build structure + MEK_BURN_RATE = 3, -- set burn rate + RPS_ENABLE = 4, -- enable reactor + RPS_SCRAM = 5, -- SCRAM reactor + RPS_STATUS = 6, -- RPS status + RPS_ALARM = 7, -- RPS alarm broadcast + RPS_RESET = 8 -- clear RPS trip (if in bad state, will trip immediately) } local RPLC_LINKING = { @@ -37,11 +36,10 @@ local RPLC_LINKING = { } local SCADA_MGMT_TYPES = { - PING = 0, -- generic ping + KEEP_ALIVE = 0, -- keep alive packet w/ RTT CLOSE = 1, -- close a connection - REMOTE_LINKED = 2, -- remote device linked - RTU_ADVERT = 3, -- RTU capability advertisement - RTU_HEARTBEAT = 4 -- RTU heartbeat + RTU_ADVERT = 2, -- RTU capability advertisement + REMOTE_LINKED = 3 -- remote device linked } local RTU_ADVERT_TYPES = { diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index 1206ef6..22726d3 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -243,24 +243,7 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue) self.plc_conn_watchdog.feed() -- handle packet by type - if pkt.type == RPLC_TYPES.KEEP_ALIVE then - -- keep alive reply - if pkt.length == 2 then - local srv_start = pkt.data[1] - local plc_send = pkt.data[2] - local srv_now = util.time() - self.last_rtt = srv_now - srv_start - - if self.last_rtt > 500 then - log.warning(log_header .. "PLC KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. ")") - end - - -- log.debug(log_header .. "RPLC RTT = ".. self.last_rtt .. "ms") - -- log.debug(log_header .. "RPLC TT = ".. (srv_now - plc_send) .. "ms") - else - log.debug(log_header .. "RPLC keep alive packet length mismatch") - end - elseif pkt.type == RPLC_TYPES.STATUS then + if pkt.type == RPLC_TYPES.STATUS then -- status packet received, update data if pkt.length >= 5 then self.sDB.last_status_update = pkt.data[1] @@ -366,7 +349,24 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue) log.debug(log_header .. "handler received unsupported RPLC packet type " .. pkt.type) end elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then - if pkt.type == SCADA_MGMT_TYPES.CLOSE then + if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive reply + if pkt.length == 2 then + local srv_start = pkt.data[1] + local plc_send = pkt.data[2] + local srv_now = util.time() + self.last_rtt = srv_now - srv_start + + if self.last_rtt > 500 then + log.warning(log_header .. "PLC KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)") + end + + -- log.debug(log_header .. "PLC RTT = ".. self.last_rtt .. "ms") + -- log.debug(log_header .. "PLC TT = ".. (srv_now - plc_send) .. "ms") + else + log.debug(log_header .. "SCADA keep alive packet length mismatch") + end + elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then -- close the session self.connected = false else @@ -497,7 +497,7 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue) periodics.keep_alive = periodics.keep_alive + elapsed if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then - _send(RPLC_TYPES.KEEP_ALIVE, { util.time() }) + _send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() }) periodics.keep_alive = 0 end diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua index 1cf4e76..90f6e95 100644 --- a/supervisor/session/rtu.lua +++ b/supervisor/session/rtu.lua @@ -35,7 +35,7 @@ rtu.new_session = function (id, in_queue, out_queue) } -- send a MODBUS TCP packet - local send_modbus = function (m_pkt) + local _send_modbus = function (m_pkt) local s_pkt = comms.scada_packet() s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) @@ -66,16 +66,31 @@ rtu.new_session = function (id, in_queue, out_queue) self.r_seq_num = pkt.scada_frame.seq_num() end + -- feed watchdog + self.rtu_conn_watchdog.feed() + -- process packet if pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then - -- feed watchdog - self.rtu_conn_watchdog.feed() - elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then - -- feed watchdog - self.rtu_conn_watchdog.feed() - if pkt.type == SCADA_MGMT_TYPES.CLOSE then + if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive reply + if pkt.length == 2 then + local srv_start = pkt.data[1] + local rtu_send = pkt.data[2] + local srv_now = util.time() + self.last_rtt = srv_now - srv_start + + if self.last_rtt > 500 then + log.warning(log_header .. "RTU KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)") + end + + -- log.debug(log_header .. "RTU RTT = ".. self.last_rtt .. "ms") + -- log.debug(log_header .. "RTU TT = ".. (srv_now - rtu_send) .. "ms") + else + log.debug(log_header .. "SCADA keep alive packet length mismatch") + end + elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then -- close the session self.connected = false elseif pkt.type == SCADA_MGMT_TYPES.RTU_ADVERT then @@ -84,8 +99,6 @@ rtu.new_session = function (id, in_queue, out_queue) local unit = packet.data[i] unit end - elseif pkt.type == SCADA_MGMT_TYPES.RTU_HEARTBEAT then - -- periodic RTU heartbeat else log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type) end @@ -162,7 +175,7 @@ rtu.new_session = function (id, in_queue, out_queue) periodics.keep_alive = periodics.keep_alive + elapsed if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then - -- _send(RPLC_TYPES.KEEP_ALIVE, { util.time() }) + _send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() }) periodics.keep_alive = 0 end diff --git a/supervisor/startup.lua b/supervisor/startup.lua index a7ebcfe..825e30c 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -14,7 +14,7 @@ local svsessions = require("session.svsessions") local config = require("config") local supervisor = require("supervisor") -local SUPERVISOR_VERSION = "alpha-v0.3.3" +local SUPERVISOR_VERSION = "alpha-v0.3.4" local print = util.print local println = util.println