diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index a4ac6a8..bca0027 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -6,12 +6,13 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.3.3" +local R_PLC_VERSION = "alpha-v0.4.2" local print = util.print local println = util.println @@ -28,30 +29,41 @@ ppm.mount_all() -- shared memory across threads local __shared_memory = { + -- networked setting networked = config.NETWORKED, + -- PLC system state flags plc_state = { init_ok = true, + shutdown = false, scram = true, degraded = false, no_reactor = false, no_modem = false }, - plc_devices = { + -- core PLC devices + plc_dev = { reactor = ppm.get_fission_reactor(), modem = ppm.get_wireless_modem() }, - system = { + -- system objects + plc_sys = { iss = nil, plc_comms = nil, conn_watchdog = nil + }, + + -- message queues + q = { + mq_iss = mqueue.new(), + mq_comms = mqueue.new() } } -local smem_dev = __shared_memory.plc_devices -local smem_sys = __shared_memory.system +local smem_dev = __shared_memory.plc_dev +local smem_sys = __shared_memory.plc_sys local plc_state = __shared_memory.plc_state @@ -112,12 +124,16 @@ end init() -- init threads -local main_thread = threads.thread__main(__shared_memory, init) -local iss_thread = threads.thread__iss(__shared_memory) --- local comms_thread = plc.thread__comms(__shared_memory) +local main_thread = threads.thread__main(__shared_memory, init) +local iss_thread = threads.thread__iss(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) -- run threads -parallel.waitForAll(main_thread.exec, iss_thread.exec) +if __shared_memory.networked then + parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) +else + parallel.waitForAll(main_thread.exec, iss_thread.exec) +end -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_SHUTDOWN) ? println_ts("exited") diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index e9f0d3d..ce17609 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -1,6 +1,6 @@ -- #REQUIRES comms.lua +-- #REQUIRES log.lua -- #REQUIRES ppm.lua --- #REQUIRES plc.lua -- #REQUIRES util.lua local print = util.print @@ -8,36 +8,41 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait +local psleep = util.psleep -local MAIN_CLOCK = 0.5 -- (2Hz, 10 ticks) -local ISS_CLOCK = 0.25 -- (4Hz, 5 ticks) however this is AFTER all the ISS checks, so it is a pause between calls, not start-to-start +local MAIN_CLOCK = 1 -- (1Hz, 20 ticks) +local ISS_CLOCK = 0.5 -- (2Hz, 10 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) -local ISS_EVENT = { +local MQ__ISS_CMD = { SCRAM = 1, DEGRADED_SCRAM = 2, TRIP_TIMEOUT = 3 } +local MQ__COMM_CMD = { + SEND_STATUS = 1 +} + -- main thread -function thread__main(shared_memory, init) +function thread__main(smem, init) -- execute thread local exec = function () + log._debug("main thread init, clock inactive") + -- send status updates at 2Hz (every 10 server ticks) (every loop tick) -- send link requests at 0.5Hz (every 40 server ticks) (every 4 loop ticks) local LINK_TICKS = 4 - - local loop_clock = nil local ticks_to_update = 0 + local loop_clock = nil -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices - - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms - local conn_watchdog = shared_memory.system.conn_watchdog + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms + local conn_watchdog = smem.plc_sys.conn_watchdog -- debug local last_update = util.time() @@ -56,10 +61,7 @@ function thread__main(shared_memory, init) -- send updated data if not plc_state.no_modem then if plc_comms.is_linked() then - async_wait(function () - plc_comms.send_status(iss_tripped, plc_state.degraded) - plc_comms.send_iss_status() - end) + smem.q.mq_comms.push_command(MQ__COMM_CMD.SEND_STATUS) else if ticks_to_update == 0 then plc_comms.send_link_req() @@ -80,15 +82,15 @@ function thread__main(shared_memory, init) -- feed the watchdog first so it doesn't uhh...eat our packets conn_watchdog.feed() - -- handle the packet (plc_state passed to allow clearing SCRAM flag) - async_wait(function () - local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) - plc_comms.handle_packet(packet, plc_state) - end) + -- handle the packet + local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.push_packet(packet) + end elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor plc_comms.unlink() - os.queueEvent("iss_command", ISS_EVENT.TRIP_TIMEOUT) + smem.q.mq_iss.push_command(MQ__ISS_CMD.TRIP_TIMEOUT) elseif event == "peripheral_detach" then -- peripheral disconnect local device = ppm.handle_unmount(param1) @@ -108,7 +110,7 @@ function thread__main(shared_memory, init) if plc_state.init_ok then -- try to scram reactor if it is still connected - os.queueEvent("iss_command", ISS_EVENT.DEGRADED_SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.DEGRADED_SCRAM) end plc_state.degraded = true @@ -122,18 +124,18 @@ function thread__main(shared_memory, init) if type == "fissionReactor" then -- reconnected reactor - plc_devices.reactor = device + plc_dev.reactor = device - os.queueEvent("iss_command", ISS_EVENT.SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.SCRAM) println_ts("reactor reconnected.") log._info("reactor reconnected.") plc_state.no_reactor = false if plc_state.init_ok then - iss.reconnect_reactor(plc_devices.reactor) + iss.reconnect_reactor(plc_dev.reactor) if networked then - plc_comms.reconnect_reactor(plc_devices.reactor) + plc_comms.reconnect_reactor(plc_dev.reactor) end end @@ -144,10 +146,10 @@ function thread__main(shared_memory, init) elseif networked and type == "modem" then if device.isWireless() then -- reconnected modem - plc_devices.modem = device + plc_dev.modem = device if plc_state.init_ok then - plc_comms.reconnect_modem(plc_devices.modem) + plc_comms.reconnect_modem(plc_dev.modem) end println_ts("wireless modem reconnected.") @@ -170,12 +172,13 @@ function thread__main(shared_memory, init) elseif event == "clock_start" then -- start loop clock loop_clock = os.startTimer(MAIN_CLOCK) - log._debug("main thread started") + log._debug("main thread clock started") end -- check for termination request if event == "terminate" or ppm.should_terminate() then -- iss handles reactor shutdown + plc_state.shutdown = true log._warning("terminate requested, main thread exiting") break end @@ -186,80 +189,68 @@ function thread__main(shared_memory, init) end -- ISS monitor thread -function thread__iss(shared_memory) +function thread__iss(smem) -- execute thread local exec = function () - local loop_clock = nil + log._debug("iss thread start") -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms + local iss_queue = smem.q.mq_iss - -- debug - -- local last_update = util.time() + local last_update = util.time() - -- event loop + -- thread loop while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + local reactor = smem.plc_dev.reactor - local reactor = shared_memory.plc_devices.reactor - - if event == "timer" and param1 == loop_clock then - -- ISS checks - if plc_state.init_ok then - -- if we tried to SCRAM but failed, keep trying - -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) - async_wait(function () - if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then - reactor.scram() - end - end) - - -- if we are in standalone mode, continuously reset ISS - -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable - if not networked then - plc_state.scram = false - iss.reset() - end - - -- check safety (SCRAM occurs if tripped) - async_wait(function () - if not plc_state.degraded then - local iss_tripped, iss_status_string, iss_first = iss.check() - plc_state.scram = plc_state.scram or iss_tripped - - if iss_first then - println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) - if networked then - plc_comms.send_iss_alarm(iss_status_string) - end - end - end - end) + -- ISS checks + if plc_state.init_ok then + -- if we tried to SCRAM but failed, keep trying + -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) + if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then + reactor.scram() end - -- start next clock timer after all the long operations - -- otherwise we will never get around to other events - loop_clock = os.startTimer(ISS_CLOCK) + -- if we are in standalone mode, continuously reset ISS + -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable + if not networked then + plc_state.scram = false + iss.reset() + end - -- debug - -- print(util.time() - last_update) - -- println("ms") - -- last_update = util.time() - elseif event == "iss_command" then - -- handle ISS commands - if param1 == ISS_EVENT.SCRAM then - -- basic SCRAM - plc_state.scram = true - async_wait(reactor.scram) - elseif param1 == ISS_EVENT.DEGRADED_SCRAM then - -- SCRAM with print - plc_state.scram = true - async_wait(function () + -- check safety (SCRAM occurs if tripped) + if not plc_state.degraded then + local iss_tripped, iss_status_string, iss_first = iss.check() + plc_state.scram = plc_state.scram or iss_tripped + + if iss_first then + println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) + if networked then + plc_comms.send_iss_alarm(iss_status_string) + end + end + end + end + + -- check for messages in the message queue + while iss_queue.ready() do + local msg = iss_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__ISS_CMD.SCRAM then + -- basic SCRAM + plc_state.scram = true + reactor.scram() + elseif msg.message == MQ__ISS_CMD.DEGRADED_SCRAM then + -- SCRAM with print + plc_state.scram = true if reactor.scram() then println_ts("successful reactor SCRAM") log._error("successful reactor SCRAM") @@ -267,36 +258,115 @@ function thread__iss(shared_memory) println_ts("failed reactor SCRAM") log._error("failed reactor SCRAM") end - end) - elseif param1 == ISS_EVENT.TRIP_TIMEOUT then - -- watchdog tripped - plc_state.scram = true - iss.trip_timeout() - println_ts("server timeout") - log._warning("server timeout") + elseif msg.message == MQ__ISS_CMD.TRIP_TIMEOUT then + -- watchdog tripped + plc_state.scram = true + iss.trip_timeout() + println_ts("server timeout") + log._warning("server timeout") + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet end - elseif event == "clock_start" then - -- start loop clock - loop_clock = os.startTimer(ISS_CLOCK) - log._debug("iss thread started") + + -- quick yield if we are looping right back + if iss_queue.ready() then util.nop() end end -- check for termination request - if event == "terminate" or ppm.should_terminate() then + if plc_state.shutdown then -- safe exit - log._warning("terminate requested, iss thread shutdown") + log._warning("iss thread shutdown initiated") if plc_state.init_ok then plc_state.scram = true - async_wait(reactor.scram) + reactor.scram() if reactor.__p_is_ok() then println_ts("reactor disabled") + log._info("iss thread reactor SCRAM OK") else -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_LOST_CONTROL) ? println_ts("exiting, reactor failed to disable") + log._error("iss thread failed to SCRAM reactor on exit") end end + log._warning("iss thread exiting") break end + + -- debug + -- print(util.time() - last_update) + -- println("ms") + -- last_update = util.time() + + -- delay before next check + local sleep_for = ISS_CLOCK - (util.time() - last_update) + last_update = util.time() + if sleep_for > 0 then + psleep(sleep_for) + else + psleep(0.05) + end + end + end + + return { exec = exec } +end + +-- communications handler thread +function thread__comms(smem) + -- execute thread + local exec = function () + log._debug("comms thread start") + + -- load in from shared memory + local plc_state = smem.plc_state + local plc_comms = smem.plc_sys.plc_comms + + local comms_queue = smem.q.mq_comms + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__COMM_CMD.SEND_STATUS then + -- send PLC/ISS status + plc_comms.send_status(plc_state.degraded) + plc_comms.send_iss_status() + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (plc_state passed to allow clearing SCRAM flag) + plc_comms.handle_packet(msg.message, plc_state) + end + + -- quick yield if we are looping right back + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if plc_state.shutdown then + log._warning("comms thread exiting") + break + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + last_update = util.time() + if sleep_for > 0 then + psleep(sleep_for) + else + psleep(0.05) + end end end diff --git a/rtu/rtu.lua b/rtu/rtu.lua index ccf6e44..54bf780 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -183,7 +183,7 @@ function rtu_comms(modem, local_port, server_port) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, ref) + local handle_packet = function(packet, units, rtu_state) if packet ~= nil then local protocol = packet.scada_frame.protocol() @@ -209,7 +209,7 @@ function rtu_comms(modem, local_port, server_port) -- SCADA management packet if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then -- acknowledgement - ref.linked = true + rtu_state.linked = true elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- request for capabilities again send_advertisement(units) diff --git a/rtu/startup.lua b/rtu/startup.lua index 2d4c58f..c04a902 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -6,26 +6,26 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("scada-common/modbus.lua") os.loadAPI("scada-common/rsio.lua") os.loadAPI("config.lua") os.loadAPI("rtu.lua") +os.loadAPI("threads.lua") os.loadAPI("dev/redstone_rtu.lua") os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.3.2" +local RTU_VERSION = "alpha-v0.4.2" local print = util.print local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait - log._info("========================================") log._info("BOOTING rtu.startup " .. RTU_VERSION) log._info("========================================") @@ -35,26 +35,51 @@ println(">> RTU " .. RTU_VERSION .. " <<") -- startup ---------------------------------------- -local units = {} -local linked = false - -- mount connected devices ppm.mount_all() +local __shared_memory = { + -- RTU system state flags + rtu_state = { + linked = false, + shutdown = false + }, + + -- core RTU devices + rtu_dev = { + modem = ppm.get_wireless_modem() + }, + + -- system objects + rtu_sys = { + rtu_comms = nil, + units = {} + }, + + -- message queues + q = { + mq_comms = mqueue.new() + } +} + +local smem_dev = __shared_memory.rtu_dev +local smem_sys = __shared_memory.rtu_sys + -- get modem -local modem = ppm.get_wireless_modem() -if modem == nil then +if smem_dev.modem == nil then println("boot> wireless modem not found") log._warning("no wireless modem on startup") return end -local rtu_comms = rtu.rtu_comms(modem, config.LISTEN_PORT, config.SERVER_PORT) +smem_sys.rtu_comms = rtu.rtu_comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT) ---------------------------------------- --- determine configuration +-- interpret config and init units ---------------------------------------- +local units = __shared_memory.rtu_sys.units + local rtu_redstone = config.RTU_REDSTONE local rtu_devices = config.RTU_DEVICES @@ -69,12 +94,12 @@ for reactor_idx = 1, #rtu_redstone do for i = 1, #io_table do local valid = false - local config = io_table[i] + local conf = io_table[i] -- verify configuration - if rsio.is_valid_channel(config.channel) and rsio.is_valid_side(config.side) then - if config.bundled_color then - valid = rsio.is_color(config.bundled_color) + if rsio.is_valid_channel(conf.channel) and rsio.is_valid_side(conf.side) then + if conf.bundled_color then + valid = rsio.is_color(conf.bundled_color) else valid = true end @@ -87,24 +112,24 @@ for reactor_idx = 1, #rtu_redstone do log._warning(message) else -- link redstone in RTU - local mode = rsio.get_io_mode(config.channel) + local mode = rsio.get_io_mode(conf.channel) if mode == rsio.IO_MODE.DIGITAL_IN then - rs_rtu.link_di(config.channel, config.side, config.bundled_color) + rs_rtu.link_di(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.DIGITAL_OUT then - rs_rtu.link_do(config.channel, config.side, config.bundled_color) + rs_rtu.link_do(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.ANALOG_IN then - rs_rtu.link_ai(config.channel, config.side) + rs_rtu.link_ai(conf.channel, conf.side) elseif mode == rsio.IO_MODE.ANALOG_OUT then - rs_rtu.link_ao(config.channel, config.side) + rs_rtu.link_ao(conf.channel, conf.side) else -- should be unreachable code, we already validated channels log._error("init> fell through if chain attempting to identify IO mode", true) break end - table.insert(capabilities, config.channel) + table.insert(capabilities, conf.channel) - log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(config.channel) .. " (" .. config.side .. + log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(conf.channel) .. " (" .. conf.side .. ") for reactor " .. rtu_redstone[reactor_idx].for_reactor) end end @@ -171,82 +196,15 @@ for i = 1, #rtu_devices do end ---------------------------------------- --- main loop +-- start system ---------------------------------------- --- advertisement/heartbeat clock (every 2 seconds) -local loop_clock = os.startTimer(2) +-- init threads +local main_thread = threads.thread__main(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) --- event loop -while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() - - if event == "peripheral_detach" then - -- handle loss of a device - local device = ppm.handle_unmount(param1) - - for i = 1, #units do - -- find disconnected device - if units[i].device == device.dev then - -- we are going to let the PPM prevent crashes - -- return fault flags/codes to MODBUS queries - local unit = units[i] - println_ts("lost the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "peripheral" then - -- relink lost peripheral to correct unit entry - local type, device = ppm.mount(param1) - - for i = 1, #units do - local unit = units[i] - - -- find disconnected device to reconnect - if unit.name == param1 then - -- found, re-link - unit.device = device - - if unit.type == "boiler" then - unit.rtu = boiler_rtu.new(device) - elseif unit.type == "turbine" then - unit.rtu = turbine_rtu.new(device) - elseif unit.type == "imatrix" then - unit.rtu = imatrix_rtu.new(device) - end - - unit.modbus_io = modbus.new(unit.rtu) - - println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "timer" and param1 == loop_clock then - -- start next clock timer - loop_clock = os.startTimer(2) - - -- period tick, if we are linked send heartbeat, if not send advertisement - if linked then - rtu_comms.send_heartbeat() - else - -- advertise units - rtu_comms.send_advertisement(units) - end - elseif event == "modem_message" then - -- got a packet - local link_ref = { linked = linked } - local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) - - async_wait(function () rtu_comms.handle_packet(packet, units, link_ref) end) - - -- if linked, stop sending advertisements - linked = link_ref.linked - end - - -- check for termination request - if event == "terminate" or ppm.should_terminate() then - log._warning("terminate requested, exiting...") - break - end -end +-- run threads +parallel.waitForAll(main_thread.exec, comms_thread.exec) println_ts("exited") log._info("exited") diff --git a/rtu/threads.lua b/rtu/threads.lua new file mode 100644 index 0000000..692697d --- /dev/null +++ b/rtu/threads.lua @@ -0,0 +1,159 @@ +-- #REQUIRES comms.lua +-- #REQUIRES log.lua +-- #REQUIRES ppm.lua +-- #REQUIRES util.lua + +local print = util.print +local println = util.println +local print_ts = util.print_ts +local println_ts = util.println_ts + +local psleep = util.psleep + +local MAIN_CLOCK = 2 -- (2Hz, 40 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) + +-- main thread +function thread__main(smem) + -- execute thread + local exec = function () + log._debug("main thread start") + + -- advertisement/heartbeat clock + local loop_clock = os.startTimer(MAIN_CLOCK) + + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_dev = smem.rtu_dev + local rtu_comms = smem.rtu_sys.rtu_comms + local units = smem.rtu_sys.units + + -- event loop + while true do + local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + + if event == "peripheral_detach" then + -- handle loss of a device + local device = ppm.handle_unmount(param1) + + for i = 1, #units do + -- find disconnected device + if units[i].device == device.dev then + -- we are going to let the PPM prevent crashes + -- return fault flags/codes to MODBUS queries + local unit = units[i] + println_ts("lost the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "peripheral" then + -- relink lost peripheral to correct unit entry + local type, device = ppm.mount(param1) + + for i = 1, #units do + local unit = units[i] + + -- find disconnected device to reconnect + if unit.name == param1 then + -- found, re-link + unit.device = device + + if unit.type == "boiler" then + unit.rtu = boiler_rtu.new(device) + elseif unit.type == "turbine" then + unit.rtu = turbine_rtu.new(device) + elseif unit.type == "imatrix" then + unit.rtu = imatrix_rtu.new(device) + end + + unit.modbus_io = modbus.new(unit.rtu) + + println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "timer" and param1 == loop_clock then + -- start next clock timer + loop_clock = os.startTimer(MAIN_CLOCK) + + -- period tick, if we are linked send heartbeat, if not send advertisement + if rtu_state.linked then + rtu_comms.send_heartbeat() + else + -- advertise units + rtu_comms.send_advertisement(units) + end + elseif event == "modem_message" then + -- got a packet + local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.push_packet(packet) + end + + rtu_comms.handle_packet(packet, units, link_ref) + end + + -- check for termination request + if event == "terminate" or ppm.should_terminate() then + rtu_state.shutdown = true + log._warning("terminate requested, main thread exiting") + break + end + end + end + + return { exec = exec } +end + +-- communications handler thread +function thread__comms(smem) + -- execute thread + local exec = function () + log._debug("comms thread start") + + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_comms = smem.rtu_sys.rtu_comms + local units = smem.rtu_sys.units + + local comms_queue = smem.q.mq_comms + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (rtu_state passed to allow setting link flag) + rtu_comms.handle_packet(msg.message, units, rtu_state) + end + + -- quick yield if we are looping right back + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if rtu_state.shutdown then + log._warning("comms thread exiting") + break + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + last_update = util.time() + if sleep_for > 0 then + psleep(sleep_for) + else + psleep(0.05) + end + end + end + + return { exec = exec } +end diff --git a/supervisor/mqueue.lua b/scada-common/mqueue.lua similarity index 78% rename from supervisor/mqueue.lua rename to scada-common/mqueue.lua index f79e686..3881d02 100644 --- a/supervisor/mqueue.lua +++ b/scada-common/mqueue.lua @@ -4,7 +4,8 @@ TYPE = { COMMAND = 0, - PACKET = 1 + DATA = 1, + PACKET = 2 } function new() @@ -17,19 +18,27 @@ function new() local empty = function () return #queue == 0 end + + local ready = function () + return #queue > 0 + end local _push = function (qtype, message) table.insert(queue, { qtype = qtype, message = message }) end - local push_packet = function (message) - _push(TYPE.PACKET, message) - end - local push_command = function (message) _push(TYPE.COMMAND, message) end - + + local push_data = function (message) + _push(TYPE.DATA, message) + end + + local push_packet = function (message) + _push(TYPE.PACKET, message) + end + local pop = function () if #queue > 0 then return table.remove(queue) @@ -41,7 +50,9 @@ function new() return { length = length, empty = empty, + ready = ready, push_packet = push_packet, + push_data = push_data, push_command = push_command, pop = pop } diff --git a/scada-common/util.lua b/scada-common/util.lua index 97ce601..11b258e 100644 --- a/scada-common/util.lua +++ b/scada-common/util.lua @@ -39,9 +39,10 @@ end -- PARALLELIZATION -- --- block waiting for parallel call -function async_wait(f) - parallel.waitForAll(f) +-- no-op to provide a brief pause (and a yield) +-- EVENT_CONSUMER: this function consumes events +function nop() + sleep(0.05) end -- WATCHDOG -- diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index 695b34b..3a51666 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -330,7 +330,7 @@ function new_session(id, for_reactor, in_queue, out_queue) -- handle queue -- ------------------ - if not self.in_q.empty() then + if self.in_q.ready() then -- get a new message to process local message = self.in_q.pop() diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index e02619c..9c02ce0 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -130,7 +130,7 @@ local function _iterate(sessions) if ok then -- send packets in out queue -- @todo handle commands if that's being used too - while not session.out_queue.empty() do + while session.out_queue.ready() do local msg = session.out_queue.pop() if msg.qtype == mqueue.TYPE.PACKET then self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 27a9db6..138cfd8 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -7,9 +7,9 @@ os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") os.loadAPI("scada-common/modbus.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") -os.loadAPI("mqueue.lua") os.loadAPI("session/rtu.lua") os.loadAPI("session/plc.lua") @@ -18,7 +18,7 @@ os.loadAPI("session/svsessions.lua") os.loadAPI("supervisor.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.5" +local SUPERVISOR_VERSION = "alpha-v0.1.6" local print = util.print local println = util.println