From 68011d6734fa8526f6dc851d506333d8e77007d8 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:21:10 -0400 Subject: [PATCH 1/8] #32 new threaded PLC code --- reactor-plc/startup.lua | 31 ++- reactor-plc/threads.lua | 273 ++++++++++++++---------- {supervisor => scada-common}/mqueue.lua | 23 +- scada-common/util.lua | 7 +- supervisor/session/plc.lua | 2 +- supervisor/session/svsessions.lua | 2 +- supervisor/startup.lua | 4 +- 7 files changed, 210 insertions(+), 132 deletions(-) rename {supervisor => scada-common}/mqueue.lua (78%) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index a4ac6a8..e448e55 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -6,12 +6,13 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.3.3" +local R_PLC_VERSION = "alpha-v0.4.0" local print = util.print local println = util.println @@ -28,30 +29,42 @@ ppm.mount_all() -- shared memory across threads local __shared_memory = { + -- networked setting networked = config.NETWORKED, + -- PLC system state flags plc_state = { init_ok = true, + shutdown = false, scram = true, degraded = false, no_reactor = false, no_modem = false }, - plc_devices = { + -- core PLC devices + plc_dev = { reactor = ppm.get_fission_reactor(), modem = ppm.get_wireless_modem() }, - system = { + -- system control objects + plc_sys = { iss = nil, plc_comms = nil, conn_watchdog = nil + }, + + -- message queues + q = { + mq_main = mqueue.new(), + mq_iss = mqueue.new(), + mq_comms = mqeuue.new() } } -local smem_dev = __shared_memory.plc_devices -local smem_sys = __shared_memory.system +local smem_dev = __shared_memory.plc_dev +local smem_sys = __shared_memory.plc_sys local plc_state = __shared_memory.plc_state @@ -112,12 +125,12 @@ end init() -- init threads -local main_thread = threads.thread__main(__shared_memory, init) -local iss_thread = threads.thread__iss(__shared_memory) --- local comms_thread = plc.thread__comms(__shared_memory) +local main_thread = threads.thread__main(__shared_memory, init) +local iss_thread = threads.thread__iss(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) -- run threads -parallel.waitForAll(main_thread.exec, iss_thread.exec) +parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_SHUTDOWN) ? println_ts("exited") diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index e9f0d3d..eb5468d 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -8,36 +8,37 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait +local MAIN_CLOCK = 1 -- (1Hz, 20 ticks) +local ISS_CLOCK = 0.5 -- (2Hz, 10 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) -local MAIN_CLOCK = 0.5 -- (2Hz, 10 ticks) -local ISS_CLOCK = 0.25 -- (4Hz, 5 ticks) however this is AFTER all the ISS checks, so it is a pause between calls, not start-to-start - -local ISS_EVENT = { +local MQ__ISS_CMD = { SCRAM = 1, DEGRADED_SCRAM = 2, TRIP_TIMEOUT = 3 } +local MQ__COMM_CMD = { + SEND_STATUS = 1 +} + -- main thread -function thread__main(shared_memory, init) +function thread__main(smem, init) -- execute thread local exec = function () -- send status updates at 2Hz (every 10 server ticks) (every loop tick) -- send link requests at 0.5Hz (every 40 server ticks) (every 4 loop ticks) local LINK_TICKS = 4 - - local loop_clock = nil local ticks_to_update = 0 + local loop_clock = nil -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices - - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms - local conn_watchdog = shared_memory.system.conn_watchdog + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms + local conn_watchdog = smem.plc_sys.conn_watchdog -- debug local last_update = util.time() @@ -56,10 +57,7 @@ function thread__main(shared_memory, init) -- send updated data if not plc_state.no_modem then if plc_comms.is_linked() then - async_wait(function () - plc_comms.send_status(iss_tripped, plc_state.degraded) - plc_comms.send_iss_status() - end) + smem.q.mq_comms.push_command(MQ__COMM_CMD.SEND_STATUS) else if ticks_to_update == 0 then plc_comms.send_link_req() @@ -80,15 +78,15 @@ function thread__main(shared_memory, init) -- feed the watchdog first so it doesn't uhh...eat our packets conn_watchdog.feed() - -- handle the packet (plc_state passed to allow clearing SCRAM flag) - async_wait(function () - local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) - plc_comms.handle_packet(packet, plc_state) - end) + -- handle the packet + local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.puch_packet(packet) + end elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor plc_comms.unlink() - os.queueEvent("iss_command", ISS_EVENT.TRIP_TIMEOUT) + smem.q.mq_iss.push_command(MQ__ISS_CMD.TRIP_TIMEOUT) elseif event == "peripheral_detach" then -- peripheral disconnect local device = ppm.handle_unmount(param1) @@ -108,7 +106,7 @@ function thread__main(shared_memory, init) if plc_state.init_ok then -- try to scram reactor if it is still connected - os.queueEvent("iss_command", ISS_EVENT.DEGRADED_SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.DEGRADED_SCRAM) end plc_state.degraded = true @@ -122,18 +120,18 @@ function thread__main(shared_memory, init) if type == "fissionReactor" then -- reconnected reactor - plc_devices.reactor = device + plc_dev.reactor = device - os.queueEvent("iss_command", ISS_EVENT.SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.SCRAM) println_ts("reactor reconnected.") log._info("reactor reconnected.") plc_state.no_reactor = false if plc_state.init_ok then - iss.reconnect_reactor(plc_devices.reactor) + iss.reconnect_reactor(plc_dev.reactor) if networked then - plc_comms.reconnect_reactor(plc_devices.reactor) + plc_comms.reconnect_reactor(plc_dev.reactor) end end @@ -144,10 +142,10 @@ function thread__main(shared_memory, init) elseif networked and type == "modem" then if device.isWireless() then -- reconnected modem - plc_devices.modem = device + plc_dev.modem = device if plc_state.init_ok then - plc_comms.reconnect_modem(plc_devices.modem) + plc_comms.reconnect_modem(plc_dev.modem) end println_ts("wireless modem reconnected.") @@ -176,6 +174,7 @@ function thread__main(shared_memory, init) -- check for termination request if event == "terminate" or ppm.should_terminate() then -- iss handles reactor shutdown + plc_state.shutdown = true log._warning("terminate requested, main thread exiting") break end @@ -186,80 +185,66 @@ function thread__main(shared_memory, init) end -- ISS monitor thread -function thread__iss(shared_memory) +function thread__iss(smem) -- execute thread local exec = function () - local loop_clock = nil - -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms + local iss_queue = smem.q.mq_iss - -- debug - -- local last_update = util.time() + local last_update = util.time() - -- event loop + -- thread loop while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + local reactor = smem.plc_dev.reactor - local reactor = shared_memory.plc_devices.reactor - - if event == "timer" and param1 == loop_clock then - -- ISS checks - if plc_state.init_ok then - -- if we tried to SCRAM but failed, keep trying - -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) - async_wait(function () - if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then - reactor.scram() - end - end) - - -- if we are in standalone mode, continuously reset ISS - -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable - if not networked then - plc_state.scram = false - iss.reset() - end - - -- check safety (SCRAM occurs if tripped) - async_wait(function () - if not plc_state.degraded then - local iss_tripped, iss_status_string, iss_first = iss.check() - plc_state.scram = plc_state.scram or iss_tripped - - if iss_first then - println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) - if networked then - plc_comms.send_iss_alarm(iss_status_string) - end - end - end - end) + -- ISS checks + if plc_state.init_ok then + -- if we tried to SCRAM but failed, keep trying + -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) + if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then + reactor.scram() end - -- start next clock timer after all the long operations - -- otherwise we will never get around to other events - loop_clock = os.startTimer(ISS_CLOCK) + -- if we are in standalone mode, continuously reset ISS + -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable + if not networked then + plc_state.scram = false + iss.reset() + end - -- debug - -- print(util.time() - last_update) - -- println("ms") - -- last_update = util.time() - elseif event == "iss_command" then - -- handle ISS commands - if param1 == ISS_EVENT.SCRAM then - -- basic SCRAM - plc_state.scram = true - async_wait(reactor.scram) - elseif param1 == ISS_EVENT.DEGRADED_SCRAM then - -- SCRAM with print - plc_state.scram = true - async_wait(function () + -- check safety (SCRAM occurs if tripped) + if not plc_state.degraded then + local iss_tripped, iss_status_string, iss_first = iss.check() + plc_state.scram = plc_state.scram or iss_tripped + + if iss_first then + println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) + if networked then + plc_comms.send_iss_alarm(iss_status_string) + end + end + end + end + + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__ISS_CMD.SCRAM then + -- basic SCRAM + plc_state.scram = true + reactor.scram() + elseif msg.message == MQ__ISS_CMD.DEGRADED_SCRAM then + -- SCRAM with print + plc_state.scram = true if reactor.scram() then println_ts("successful reactor SCRAM") log._error("successful reactor SCRAM") @@ -267,38 +252,106 @@ function thread__iss(shared_memory) println_ts("failed reactor SCRAM") log._error("failed reactor SCRAM") end - end) - elseif param1 == ISS_EVENT.TRIP_TIMEOUT then - -- watchdog tripped - plc_state.scram = true - iss.trip_timeout() - println_ts("server timeout") - log._warning("server timeout") + elseif msg.message == MQ__ISS_CMD.TRIP_TIMEOUT then + -- watchdog tripped + plc_state.scram = true + iss.trip_timeout() + println_ts("server timeout") + log._warning("server timeout") + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet end - elseif event == "clock_start" then - -- start loop clock - loop_clock = os.startTimer(ISS_CLOCK) - log._debug("iss thread started") + + -- quick yield + if iss_queue.ready() then util.nop() end end -- check for termination request - if event == "terminate" or ppm.should_terminate() then + if plc_state.shutdown then -- safe exit - log._warning("terminate requested, iss thread shutdown") + log._warning("iss thread shutdown initiated") if plc_state.init_ok then plc_state.scram = true - async_wait(reactor.scram) + reactor.scram() if reactor.__p_is_ok() then println_ts("reactor disabled") + log._info("iss thread reactor SCRAM OK") else -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_LOST_CONTROL) ? println_ts("exiting, reactor failed to disable") + log._error("iss thread failed to SCRAM reactor on exit") end end - break + log._warning("iss thread exiting") + return + end + + -- debug + -- print(util.time() - last_update) + -- println("ms") + -- last_update = util.time() + + -- delay before next check + local sleep_for = ISS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) end end end return { exec = exec } end + +function thread__comms(smem) + -- execute thread + local exec = function () + -- load in from shared memory + local plc_state = smem.plc_state + local plc_comms = smem.plc_sys.plc_comms + + local comms_queue = smem.q.mq_comms + + -- thread loop + while true do + local last_update = util.time() + + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__COMM_CMD.SEND_STATUS then + -- send PLC/ISS status + plc_comms.send_status(plc_state.degraded) + plc_comms.send_iss_status() + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (plc_state passed to allow clearing SCRAM flag) + plc_comms.handle_packet(msg.message, plc_state) + end + + -- quick yield + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if plc_state.shutdown then + log._warning("comms thread exiting") + return + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) + end + end + end +end diff --git a/supervisor/mqueue.lua b/scada-common/mqueue.lua similarity index 78% rename from supervisor/mqueue.lua rename to scada-common/mqueue.lua index f79e686..3881d02 100644 --- a/supervisor/mqueue.lua +++ b/scada-common/mqueue.lua @@ -4,7 +4,8 @@ TYPE = { COMMAND = 0, - PACKET = 1 + DATA = 1, + PACKET = 2 } function new() @@ -17,19 +18,27 @@ function new() local empty = function () return #queue == 0 end + + local ready = function () + return #queue > 0 + end local _push = function (qtype, message) table.insert(queue, { qtype = qtype, message = message }) end - local push_packet = function (message) - _push(TYPE.PACKET, message) - end - local push_command = function (message) _push(TYPE.COMMAND, message) end - + + local push_data = function (message) + _push(TYPE.DATA, message) + end + + local push_packet = function (message) + _push(TYPE.PACKET, message) + end + local pop = function () if #queue > 0 then return table.remove(queue) @@ -41,7 +50,9 @@ function new() return { length = length, empty = empty, + ready = ready, push_packet = push_packet, + push_data = push_data, push_command = push_command, pop = pop } diff --git a/scada-common/util.lua b/scada-common/util.lua index 97ce601..11b258e 100644 --- a/scada-common/util.lua +++ b/scada-common/util.lua @@ -39,9 +39,10 @@ end -- PARALLELIZATION -- --- block waiting for parallel call -function async_wait(f) - parallel.waitForAll(f) +-- no-op to provide a brief pause (and a yield) +-- EVENT_CONSUMER: this function consumes events +function nop() + sleep(0.05) end -- WATCHDOG -- diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index 695b34b..3a51666 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -330,7 +330,7 @@ function new_session(id, for_reactor, in_queue, out_queue) -- handle queue -- ------------------ - if not self.in_q.empty() then + if self.in_q.ready() then -- get a new message to process local message = self.in_q.pop() diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index e02619c..9c02ce0 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -130,7 +130,7 @@ local function _iterate(sessions) if ok then -- send packets in out queue -- @todo handle commands if that's being used too - while not session.out_queue.empty() do + while session.out_queue.ready() do local msg = session.out_queue.pop() if msg.qtype == mqueue.TYPE.PACKET then self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 27a9db6..138cfd8 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -7,9 +7,9 @@ os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") os.loadAPI("scada-common/modbus.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") -os.loadAPI("mqueue.lua") os.loadAPI("session/rtu.lua") os.loadAPI("session/plc.lua") @@ -18,7 +18,7 @@ os.loadAPI("session/svsessions.lua") os.loadAPI("supervisor.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.5" +local SUPERVISOR_VERSION = "alpha-v0.1.6" local print = util.print local println = util.println From 1ba5c7f8283406613ab8645b54604a1d45835e21 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:27:15 -0400 Subject: [PATCH 2/8] fixed PLC mqueue typo and removed unused mq_main --- reactor-plc/startup.lua | 3 +-- reactor-plc/threads.lua | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index e448e55..56bb072 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -57,9 +57,8 @@ local __shared_memory = { -- message queues q = { - mq_main = mqueue.new(), mq_iss = mqueue.new(), - mq_comms = mqeuue.new() + mq_comms = mqeueu.new() } } diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index eb5468d..060bbe5 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -1,6 +1,5 @@ -- #REQUIRES comms.lua -- #REQUIRES ppm.lua --- #REQUIRES plc.lua -- #REQUIRES util.lua local print = util.print From ccf06956f9c71b6a27e9a44b7277fae48ab8aaf7 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:37:28 -0400 Subject: [PATCH 3/8] fixed another typo --- reactor-plc/startup.lua | 2 +- reactor-plc/threads.lua | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 56bb072..025c274 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -48,7 +48,7 @@ local __shared_memory = { modem = ppm.get_wireless_modem() }, - -- system control objects + -- system objects plc_sys = { iss = nil, plc_comms = nil, diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index 060bbe5..ec0d267 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -1,4 +1,5 @@ -- #REQUIRES comms.lua +-- #REQUIRES log.lua -- #REQUIRES ppm.lua -- #REQUIRES util.lua @@ -80,7 +81,7 @@ function thread__main(smem, init) -- handle the packet local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) if packet ~= nil then - smem.q.mq_comms.puch_packet(packet) + smem.q.mq_comms.push_packet(packet) end elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor From 71be6aca1a09400688e218f5973a71bede67143b Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:43:32 -0400 Subject: [PATCH 4/8] cleanup and last_update bugfix for comms thread --- reactor-plc/threads.lua | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index ec0d267..f4375c9 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -286,7 +286,7 @@ function thread__iss(smem) end end log._warning("iss thread exiting") - return + break end -- debug @@ -305,6 +305,7 @@ function thread__iss(smem) return { exec = exec } end +-- communications handler thread function thread__comms(smem) -- execute thread local exec = function () @@ -314,10 +315,10 @@ function thread__comms(smem) local comms_queue = smem.q.mq_comms + local last_update = util.time() + -- thread loop while true do - local last_update = util.time() - -- check for messages in the message queue while comms_queue.ready() do local msg = comms_queue.pop() @@ -344,7 +345,7 @@ function thread__comms(smem) -- check for termination request if plc_state.shutdown then log._warning("comms thread exiting") - return + break end -- delay before next check From 8c4598e7a6d6565a8be8944ddb5758f3be94b763 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:46:04 -0400 Subject: [PATCH 5/8] #32 new threaded RTU code --- rtu/rtu.lua | 4 +- rtu/startup.lua | 143 ++++++++++++++++------------------------------ rtu/threads.lua | 147 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 95 deletions(-) create mode 100644 rtu/threads.lua diff --git a/rtu/rtu.lua b/rtu/rtu.lua index ccf6e44..54bf780 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -183,7 +183,7 @@ function rtu_comms(modem, local_port, server_port) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, ref) + local handle_packet = function(packet, units, rtu_state) if packet ~= nil then local protocol = packet.scada_frame.protocol() @@ -209,7 +209,7 @@ function rtu_comms(modem, local_port, server_port) -- SCADA management packet if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then -- acknowledgement - ref.linked = true + rtu_state.linked = true elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- request for capabilities again send_advertisement(units) diff --git a/rtu/startup.lua b/rtu/startup.lua index 2d4c58f..990819f 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -6,26 +6,26 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("scada-common/modbus.lua") os.loadAPI("scada-common/rsio.lua") os.loadAPI("config.lua") os.loadAPI("rtu.lua") +os.loadAPI("threads.lua") os.loadAPI("dev/redstone_rtu.lua") os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.3.2" +local RTU_VERSION = "alpha-v0.4.0" local print = util.print local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait - log._info("========================================") log._info("BOOTING rtu.startup " .. RTU_VERSION) log._info("========================================") @@ -35,15 +35,37 @@ println(">> RTU " .. RTU_VERSION .. " <<") -- startup ---------------------------------------- -local units = {} -local linked = false - -- mount connected devices ppm.mount_all() +local __shared_memory = { + -- RTU system state flags + rtu_state = { + linked = false, + shutdown = false + }, + + -- core RTU devices + rtu_dev = { + modem = ppm.get_wireless_modem() + }, + + -- system objects + rtu_sys = { + rtu_comms = nil, + units = {} + }, + + -- message queues + q = { + mq_comms = mqeueu.new() + } +} + +local smem_dev = __shared_memory.rtu_dev + -- get modem -local modem = ppm.get_wireless_modem() -if modem == nil then +if smem_dev.modem == nil then println("boot> wireless modem not found") log._warning("no wireless modem on startup") return @@ -52,9 +74,11 @@ end local rtu_comms = rtu.rtu_comms(modem, config.LISTEN_PORT, config.SERVER_PORT) ---------------------------------------- --- determine configuration +-- interpret config and init units ---------------------------------------- +local units = __shared_memory.rtu_sys.units + local rtu_redstone = config.RTU_REDSTONE local rtu_devices = config.RTU_DEVICES @@ -69,12 +93,12 @@ for reactor_idx = 1, #rtu_redstone do for i = 1, #io_table do local valid = false - local config = io_table[i] + local conf = io_table[i] -- verify configuration - if rsio.is_valid_channel(config.channel) and rsio.is_valid_side(config.side) then - if config.bundled_color then - valid = rsio.is_color(config.bundled_color) + if rsio.is_valid_channel(conf.channel) and rsio.is_valid_side(conf.side) then + if conf.bundled_color then + valid = rsio.is_color(conf.bundled_color) else valid = true end @@ -87,24 +111,24 @@ for reactor_idx = 1, #rtu_redstone do log._warning(message) else -- link redstone in RTU - local mode = rsio.get_io_mode(config.channel) + local mode = rsio.get_io_mode(conf.channel) if mode == rsio.IO_MODE.DIGITAL_IN then - rs_rtu.link_di(config.channel, config.side, config.bundled_color) + rs_rtu.link_di(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.DIGITAL_OUT then - rs_rtu.link_do(config.channel, config.side, config.bundled_color) + rs_rtu.link_do(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.ANALOG_IN then - rs_rtu.link_ai(config.channel, config.side) + rs_rtu.link_ai(conf.channel, conf.side) elseif mode == rsio.IO_MODE.ANALOG_OUT then - rs_rtu.link_ao(config.channel, config.side) + rs_rtu.link_ao(conf.channel, conf.side) else -- should be unreachable code, we already validated channels log._error("init> fell through if chain attempting to identify IO mode", true) break end - table.insert(capabilities, config.channel) + table.insert(capabilities, conf.channel) - log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(config.channel) .. " (" .. config.side .. + log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(conf.channel) .. " (" .. conf.side .. ") for reactor " .. rtu_redstone[reactor_idx].for_reactor) end end @@ -171,82 +195,15 @@ for i = 1, #rtu_devices do end ---------------------------------------- --- main loop +-- start system ---------------------------------------- --- advertisement/heartbeat clock (every 2 seconds) -local loop_clock = os.startTimer(2) +-- init threads +local main_thread = threads.thread__main(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) --- event loop -while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() - - if event == "peripheral_detach" then - -- handle loss of a device - local device = ppm.handle_unmount(param1) - - for i = 1, #units do - -- find disconnected device - if units[i].device == device.dev then - -- we are going to let the PPM prevent crashes - -- return fault flags/codes to MODBUS queries - local unit = units[i] - println_ts("lost the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "peripheral" then - -- relink lost peripheral to correct unit entry - local type, device = ppm.mount(param1) - - for i = 1, #units do - local unit = units[i] - - -- find disconnected device to reconnect - if unit.name == param1 then - -- found, re-link - unit.device = device - - if unit.type == "boiler" then - unit.rtu = boiler_rtu.new(device) - elseif unit.type == "turbine" then - unit.rtu = turbine_rtu.new(device) - elseif unit.type == "imatrix" then - unit.rtu = imatrix_rtu.new(device) - end - - unit.modbus_io = modbus.new(unit.rtu) - - println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "timer" and param1 == loop_clock then - -- start next clock timer - loop_clock = os.startTimer(2) - - -- period tick, if we are linked send heartbeat, if not send advertisement - if linked then - rtu_comms.send_heartbeat() - else - -- advertise units - rtu_comms.send_advertisement(units) - end - elseif event == "modem_message" then - -- got a packet - local link_ref = { linked = linked } - local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) - - async_wait(function () rtu_comms.handle_packet(packet, units, link_ref) end) - - -- if linked, stop sending advertisements - linked = link_ref.linked - end - - -- check for termination request - if event == "terminate" or ppm.should_terminate() then - log._warning("terminate requested, exiting...") - break - end -end +-- run threads +parallel.waitForAll(main_thread.exec, comms_thread.exec) println_ts("exited") log._info("exited") diff --git a/rtu/threads.lua b/rtu/threads.lua new file mode 100644 index 0000000..bc96d3b --- /dev/null +++ b/rtu/threads.lua @@ -0,0 +1,147 @@ +-- #REQUIRES comms.lua +-- #REQUIRES log.lua +-- #REQUIRES ppm.lua +-- #REQUIRES util.lua + +local print = util.print +local println = util.println +local print_ts = util.print_ts +local println_ts = util.println_ts + +local MAIN_CLOCK = 2 -- (2Hz, 40 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) + +-- main thread +function thread__main(smem) + -- execute thread + local exec = function () + -- advertisement/heartbeat clock + local loop_clock = os.startTimer(MAIN_CLOCK) + + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_dev = smem.rtu_dev + local rtu_comms = smem.rtu_sys.rtu_comms + + -- event loop + while true do + local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + + if event == "peripheral_detach" then + -- handle loss of a device + local device = ppm.handle_unmount(param1) + + for i = 1, #units do + -- find disconnected device + if units[i].device == device.dev then + -- we are going to let the PPM prevent crashes + -- return fault flags/codes to MODBUS queries + local unit = units[i] + println_ts("lost the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "peripheral" then + -- relink lost peripheral to correct unit entry + local type, device = ppm.mount(param1) + + for i = 1, #units do + local unit = units[i] + + -- find disconnected device to reconnect + if unit.name == param1 then + -- found, re-link + unit.device = device + + if unit.type == "boiler" then + unit.rtu = boiler_rtu.new(device) + elseif unit.type == "turbine" then + unit.rtu = turbine_rtu.new(device) + elseif unit.type == "imatrix" then + unit.rtu = imatrix_rtu.new(device) + end + + unit.modbus_io = modbus.new(unit.rtu) + + println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "timer" and param1 == loop_clock then + -- start next clock timer + loop_clock = os.startTimer(MAIN_CLOCK) + + -- period tick, if we are linked send heartbeat, if not send advertisement + if rtu_state.linked then + rtu_comms.send_heartbeat() + else + -- advertise units + rtu_comms.send_advertisement(units) + end + elseif event == "modem_message" then + -- got a packet + local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.push_packet(packet) + end + + rtu_comms.handle_packet(packet, units, link_ref) + end + + -- check for termination request + if event == "terminate" or ppm.should_terminate() then + rtu_state.shutdown = true + log._warning("terminate requested, main thread exiting") + break + end + end + end + + return { exec = exec } +end + +-- communications handler thread +function thread__comms(smem) + -- execute thread + local exec = function () + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_comms = smem.rtu_sys.rtu_comms + local units = smem.rtu_sys.units + + local comms_queue = smem.q.mq_comms + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (rtu_state passed to allow setting link flag) + rtu_comms.handle_packet(msg.message, units, rtu_state) + end + + -- quick yield + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if rtu_state.shutdown then + log._warning("comms thread exiting") + break + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) + end + end + end +end From 14377e734853e5ab5a43186eea57a2ecd5373d1a Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 15:01:10 -0400 Subject: [PATCH 6/8] don't run PLC comms thread if not networked --- reactor-plc/startup.lua | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 025c274..23ca80a 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -129,7 +129,11 @@ local iss_thread = threads.thread__iss(__shared_memory) local comms_thread = threads.thread__comms(__shared_memory) -- run threads -parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) +if __shared_memory.networked then + parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) +else + parallel.waitForAll(main_thread.exec, iss_thread.exec) +end -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_SHUTDOWN) ? println_ts("exited") From 67a93016c0f837fa12a4738c2c56a5e88d6689fe Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 15:52:34 -0400 Subject: [PATCH 7/8] threaded RTU/PLC bugfixes --- reactor-plc/startup.lua | 4 ++-- reactor-plc/threads.lua | 28 +++++++++++++++++++++------- rtu/startup.lua | 7 ++++--- rtu/threads.lua | 14 ++++++++++++-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 23ca80a..df117f5 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -12,7 +12,7 @@ os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.4.0" +local R_PLC_VERSION = "alpha-v0.4.1" local print = util.print local println = util.println @@ -58,7 +58,7 @@ local __shared_memory = { -- message queues q = { mq_iss = mqueue.new(), - mq_comms = mqeueu.new() + mq_comms = mqueue.new() } } diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index f4375c9..17ab22e 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -26,6 +26,8 @@ local MQ__COMM_CMD = { function thread__main(smem, init) -- execute thread local exec = function () + log._debug("main thread init, clock inactive") + -- send status updates at 2Hz (every 10 server ticks) (every loop tick) -- send link requests at 0.5Hz (every 40 server ticks) (every 4 loop ticks) local LINK_TICKS = 4 @@ -168,7 +170,7 @@ function thread__main(smem, init) elseif event == "clock_start" then -- start loop clock loop_clock = os.startTimer(MAIN_CLOCK) - log._debug("main thread started") + log._debug("main thread clock started") end -- check for termination request @@ -188,6 +190,8 @@ end function thread__iss(smem) -- execute thread local exec = function () + log._debug("iss thread start") + -- load in from shared memory local networked = smem.networked local plc_state = smem.plc_state @@ -233,8 +237,8 @@ function thread__iss(smem) end -- check for messages in the message queue - while comms_queue.ready() do - local msg = comms_queue.pop() + while iss_queue.ready() do + local msg = iss_queue.pop() if msg.qtype == mqueue.TYPE.COMMAND then -- received a command @@ -265,7 +269,7 @@ function thread__iss(smem) -- received a packet end - -- quick yield + -- quick yield if we are looping right back if iss_queue.ready() then util.nop() end end @@ -296,8 +300,11 @@ function thread__iss(smem) -- delay before next check local sleep_for = ISS_CLOCK - (util.time() - last_update) - if sleep_for > 0.05 then + last_update = util.time() + if sleep_for > 0 then sleep(sleep_for) + else + sleep(0.05) end end end @@ -309,6 +316,8 @@ end function thread__comms(smem) -- execute thread local exec = function () + log._debug("comms thread start") + -- load in from shared memory local plc_state = smem.plc_state local plc_comms = smem.plc_sys.plc_comms @@ -338,7 +347,7 @@ function thread__comms(smem) plc_comms.handle_packet(msg.message, plc_state) end - -- quick yield + -- quick yield if we are looping right back if comms_queue.ready() then util.nop() end end @@ -350,9 +359,14 @@ function thread__comms(smem) -- delay before next check local sleep_for = COMMS_CLOCK - (util.time() - last_update) - if sleep_for > 0.05 then + last_update = util.time() + if sleep_for > 0 then sleep(sleep_for) + else + sleep(0.05) end end end + + return { exec = exec } end diff --git a/rtu/startup.lua b/rtu/startup.lua index 990819f..fed7dd0 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -19,7 +19,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.4.0" +local RTU_VERSION = "alpha-v0.4.1" local print = util.print local println = util.println @@ -58,11 +58,12 @@ local __shared_memory = { -- message queues q = { - mq_comms = mqeueu.new() + mq_comms = mqueue.new() } } local smem_dev = __shared_memory.rtu_dev +local smem_sys = __shared_memory.rtu_sys -- get modem if smem_dev.modem == nil then @@ -71,7 +72,7 @@ if smem_dev.modem == nil then return end -local rtu_comms = rtu.rtu_comms(modem, config.LISTEN_PORT, config.SERVER_PORT) +smem_sys.rtu_comms = rtu.rtu_comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT) ---------------------------------------- -- interpret config and init units diff --git a/rtu/threads.lua b/rtu/threads.lua index bc96d3b..3f021a6 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -15,6 +15,8 @@ local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) function thread__main(smem) -- execute thread local exec = function () + log._debug("main thread start") + -- advertisement/heartbeat clock local loop_clock = os.startTimer(MAIN_CLOCK) @@ -22,6 +24,7 @@ function thread__main(smem) local rtu_state = smem.rtu_state local rtu_dev = smem.rtu_dev local rtu_comms = smem.rtu_sys.rtu_comms + local units = smem.rtu_sys.units -- event loop while true do @@ -102,6 +105,8 @@ end function thread__comms(smem) -- execute thread local exec = function () + log._debug("comms thread start") + -- load in from shared memory local rtu_state = smem.rtu_state local rtu_comms = smem.rtu_sys.rtu_comms @@ -127,7 +132,7 @@ function thread__comms(smem) rtu_comms.handle_packet(msg.message, units, rtu_state) end - -- quick yield + -- quick yield if we are looping right back if comms_queue.ready() then util.nop() end end @@ -139,9 +144,14 @@ function thread__comms(smem) -- delay before next check local sleep_for = COMMS_CLOCK - (util.time() - last_update) - if sleep_for > 0.05 then + last_update = util.time() + if sleep_for > 0 then sleep(sleep_for) + else + sleep(0.05) end end end + + return { exec = exec } end From 146e0bf5693e09dc7478047102025560bc56627e Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 15:56:55 -0400 Subject: [PATCH 8/8] protected sleep call --- reactor-plc/startup.lua | 2 +- reactor-plc/threads.lua | 10 ++++++---- rtu/startup.lua | 2 +- rtu/threads.lua | 6 ++++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index df117f5..bca0027 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -12,7 +12,7 @@ os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.4.1" +local R_PLC_VERSION = "alpha-v0.4.2" local print = util.print local println = util.println diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index 17ab22e..ce17609 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -8,6 +8,8 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts +local psleep = util.psleep + local MAIN_CLOCK = 1 -- (1Hz, 20 ticks) local ISS_CLOCK = 0.5 -- (2Hz, 10 ticks) local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) @@ -302,9 +304,9 @@ function thread__iss(smem) local sleep_for = ISS_CLOCK - (util.time() - last_update) last_update = util.time() if sleep_for > 0 then - sleep(sleep_for) + psleep(sleep_for) else - sleep(0.05) + psleep(0.05) end end end @@ -361,9 +363,9 @@ function thread__comms(smem) local sleep_for = COMMS_CLOCK - (util.time() - last_update) last_update = util.time() if sleep_for > 0 then - sleep(sleep_for) + psleep(sleep_for) else - sleep(0.05) + psleep(0.05) end end end diff --git a/rtu/startup.lua b/rtu/startup.lua index fed7dd0..c04a902 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -19,7 +19,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.4.1" +local RTU_VERSION = "alpha-v0.4.2" local print = util.print local println = util.println diff --git a/rtu/threads.lua b/rtu/threads.lua index 3f021a6..692697d 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -8,6 +8,8 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts +local psleep = util.psleep + local MAIN_CLOCK = 2 -- (2Hz, 40 ticks) local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) @@ -146,9 +148,9 @@ function thread__comms(smem) local sleep_for = COMMS_CLOCK - (util.time() - last_update) last_update = util.time() if sleep_for > 0 then - sleep(sleep_for) + psleep(sleep_for) else - sleep(0.05) + psleep(0.05) end end end