diff --git a/reactor-plc/plc.lua b/reactor-plc/plc.lua index a6d260a..da71998 100644 --- a/reactor-plc/plc.lua +++ b/reactor-plc/plc.lua @@ -6,6 +6,11 @@ local PROTOCOLS = comms.PROTOCOLS local RPLC_TYPES = comms.RPLC_TYPES local RPLC_LINKING = comms.RPLC_LINKING +local print = util.print +local println = util.println +local print_ts = util.print_ts +local println_ts = util.println_ts + -- Internal Safety System -- identifies dangerous states and SCRAMs reactor if warranted -- autonomous from main SCADA supervisor/coordinator control @@ -167,6 +172,10 @@ function iss_init(reactor) return self.tripped, status, first_trip end + -- get the ISS status + local status = function () return self.cache end + local is_tripped = function () return self.tripped end + -- reset the ISS local reset = function () self.timed_out = false @@ -174,17 +183,13 @@ function iss_init(reactor) self.trip_cause = "" end - -- get the ISS status - local status = function () - return self.cache - end - return { reconnect_reactor = reconnect_reactor, trip_timeout = trip_timeout, check = check, - reset = reset, - status = status + status = status, + is_tripped = is_tripped, + reset = reset } end @@ -225,7 +230,6 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) -- variable reactor status information, excluding heating rate local _reactor_status = function () - ppm.clear_fault() return { self.reactor.getStatus(), self.reactor.getBurnRate(), @@ -249,20 +253,24 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) self.reactor.getHeatedCoolant()['amount'], self.reactor.getHeatedCoolantNeeded(), self.reactor.getHeatedCoolantFilledPercentage() - }, ppm.faulted() + }, self.reactor.__p_is_faulted() end local _update_status_cache = function () local status, faulted = _reactor_status() local changed = false - if not faulted then - for i = 1, #status do - if status[i] ~= self.status_cache[i] then - changed = true - break + if self.status_cache ~= nil then + if not faulted then + for i = 1, #status do + if status[i] ~= self.status_cache[i] then + changed = true + break + end end end + else + changed = true end if changed then @@ -285,8 +293,6 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) -- send structure properties (these should not change) -- (server will cache these) local _send_struct = function () - ppm.clear_fault() - local mek_data = { self.reactor.getHeatCapacity(), self.reactor.getFuelAssemblies(), @@ -298,7 +304,7 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) self.reactor.getMaxBurnRate() } - if not ppm.is_faulted() then + if not self.reactor.__p_is_faulted() then _send(RPLC_TYPES.MEK_STRUCT, mek_data) else log._error("failed to send structure: PPM fault") @@ -323,10 +329,48 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) _update_status_cache() end + -- attempt to establish link with supervisor + local send_link_req = function () + _send(RPLC_TYPES.LINK_REQ, { self.id }) + end + + -- send live status information + local send_status = function (degraded) + local mek_data = nil + + if _update_status_cache() then + mek_data = self.status_cache + end + + local sys_status = { + util.time(), + (not self.scrammed), + iss.is_tripped(), + degraded, + self.reactor.getHeatingRate(), + mek_data + } + + _send(RPLC_TYPES.STATUS, sys_status) + end + + local send_iss_status = function () + _send(RPLC_TYPES.ISS_STATUS, iss.status()) + end + + local send_iss_alarm = function (cause) + local iss_alarm = { + cause, + iss.status() + } + + _send(RPLC_TYPES.ISS_ALARM, iss_alarm) + end + -- parse an RPLC packet local parse_packet = function(side, sender, reply_to, message, distance) local pkt = nil - local s_pkt = scada_packet() + local s_pkt = comms.scada_packet() -- parse packet as generic SCADA packet s_pkt.receive(side, sender, reply_to, message, distance) @@ -358,7 +402,7 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) -- check sequence number if self.r_seq_num == nil then self.r_seq_num = packet.scada_frame.seq_num() - elseif self.r_seq_num >= packet.scada_frame.seq_num() then + elseif self.linked and self.r_seq_num >= packet.scada_frame.seq_num() then log._warning("sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) return else @@ -388,19 +432,19 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) if link_ack == RPLC_LINKING.ALLOW then _send_struct() - send_status() + send_status(plc_state.degraded) log._debug("re-sent initial status data") elseif link_ack == RPLC_LINKING.DENY then -- @todo: make sure this doesn't become a MITM security risk - print_ts("received unsolicited link denial, unlinking\n") - log._debug("unsolicited rplc link request denied") + println_ts("received unsolicited link denial, unlinking") + log._debug("unsolicited RPLC link request denied") elseif link_ack == RPLC_LINKING.COLLISION then -- @todo: make sure this doesn't become a MITM security risk - print_ts("received unsolicited link collision, unlinking\n") - log._warning("unsolicited rplc link request collision") + println_ts("received unsolicited link collision, unlinking") + log._warning("unsolicited RPLC link request collision") else - print_ts("invalid unsolicited link response\n") - log._error("unsolicited unknown rplc link request response") + println_ts("invalid unsolicited link response") + log._error("unsolicited unknown RPLC link request response") end self.linked = link_ack == RPLC_LINKING.ALLOW @@ -452,22 +496,25 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) local link_ack = packet.data[1] if link_ack == RPLC_LINKING.ALLOW then - print_ts("...linked!\n") - log._debug("rplc link request approved") + println_ts("linked!") + log._debug("RPLC link request approved") + + -- reset remote sequence number + self.r_seq_num = nil _send_struct() - send_status() + send_status(plc_state.degraded) log._debug("sent initial status data") elseif link_ack == RPLC_LINKING.DENY then - print_ts("...denied, retrying...\n") - log._debug("rplc link request denied") + println_ts("link request denied, retrying...") + log._debug("RPLC link request denied") elseif link_ack == RPLC_LINKING.COLLISION then - print_ts("reactor PLC ID collision (check config), retrying...\n") - log._warning("rplc link request collision") + println_ts("reactor PLC ID collision (check config), retrying...") + log._warning("RPLC link request collision") else - print_ts("invalid link response, bad channel? retrying...\n") - log._error("unknown rplc link request response") + println_ts("invalid link response, bad channel? retrying...") + log._error("unknown RPLC link request response") end self.linked = link_ack == RPLC_LINKING.ALLOW @@ -480,46 +527,6 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) end end - -- attempt to establish link with supervisor - local send_link_req = function () - _send(RPLC_TYPES.LINK_REQ, {}) - end - - -- send live status information - -- overridden : if ISS force disabled reactor - -- degraded : if PLC status is degraded - local send_status = function (overridden, degraded) - local mek_data = nil - - if _update_status_cache() then - mek_data = self.status_cache - end - - local sys_status = { - util.time(), - (not self.scrammed), - overridden, - degraded, - self.reactor.getHeatingRate(), - mek_data - } - - _send(RPLC_TYPES.STATUS, sys_status) - end - - local send_iss_status = function () - _send(RPLC_TYPES.ISS_STATUS, iss.status()) - end - - local send_iss_alarm = function (cause) - local iss_alarm = { - cause, - iss.status() - } - - _send(RPLC_TYPES.ISS_ALARM, iss_alarm) - end - local is_scrammed = function () return self.scrammed end local is_linked = function () return self.linked end local unlink = function () self.linked = false end diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index f37280d..a4ac6a8 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -11,7 +11,7 @@ os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.3.1" +local R_PLC_VERSION = "alpha-v0.3.3" local print = util.print local println = util.println diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index 201abe1..e9f0d3d 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -10,8 +10,8 @@ local println_ts = util.println_ts local async_wait = util.async_wait -local MAIN_CLOCK = 0.5 -- (2Hz, 10 ticks) -local ISS_CLOCK = 0.5 -- (2Hz, 10 ticks) +local MAIN_CLOCK = 0.5 -- (2Hz, 10 ticks) +local ISS_CLOCK = 0.25 -- (4Hz, 5 ticks) however this is AFTER all the ISS checks, so it is a pause between calls, not start-to-start local ISS_EVENT = { SCRAM = 1, @@ -28,7 +28,7 @@ function thread__main(shared_memory, init) local LINK_TICKS = 4 local loop_clock = nil - local ticks_to_update = LINK_TICKS -- start by linking + local ticks_to_update = 0 -- load in from shared memory local networked = shared_memory.networked @@ -40,7 +40,7 @@ function thread__main(shared_memory, init) local conn_watchdog = shared_memory.system.conn_watchdog -- debug - -- local last_update = util.time() + local last_update = util.time() -- event loop while true do @@ -55,26 +55,25 @@ function thread__main(shared_memory, init) -- send updated data if not plc_state.no_modem then - if plc_comms.is_linked() then async_wait(function () plc_comms.send_status(iss_tripped, plc_state.degraded) plc_comms.send_iss_status() end) else - ticks_to_update = ticks_to_update - 1 - - if ticks_to_update <= 0 then + if ticks_to_update == 0 then plc_comms.send_link_req() ticks_to_update = LINK_TICKS + else + ticks_to_update = ticks_to_update - 1 end end end -- debug - -- print(util.time() - last_update) - -- println("ms") - -- last_update = util.time() + print(util.time() - last_update) + println("ms") + last_update = util.time() end elseif event == "modem_message" and networked and not plc_state.no_modem then -- got a packet @@ -82,8 +81,10 @@ function thread__main(shared_memory, init) conn_watchdog.feed() -- handle the packet (plc_state passed to allow clearing SCRAM flag) - local packet = plc_comms.parse_packet(p1, p2, p3, p4, p5) - async_wait(function () plc_comms.handle_packet(packet, plc_state) end) + async_wait(function () + local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) + plc_comms.handle_packet(packet, plc_state) + end) elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor plc_comms.unlink() @@ -169,7 +170,7 @@ function thread__main(shared_memory, init) elseif event == "clock_start" then -- start loop clock loop_clock = os.startTimer(MAIN_CLOCK) - log._debug("loop clock started") + log._debug("main thread started") end -- check for termination request @@ -208,9 +209,6 @@ function thread__iss(shared_memory) local reactor = shared_memory.plc_devices.reactor if event == "timer" and param1 == loop_clock then - -- start next clock timer - loop_clock = os.startTimer(ISS_CLOCK) - -- ISS checks if plc_state.init_ok then -- if we tried to SCRAM but failed, keep trying @@ -244,6 +242,10 @@ function thread__iss(shared_memory) end) end + -- start next clock timer after all the long operations + -- otherwise we will never get around to other events + loop_clock = os.startTimer(ISS_CLOCK) + -- debug -- print(util.time() - last_update) -- println("ms") @@ -270,13 +272,13 @@ function thread__iss(shared_memory) -- watchdog tripped plc_state.scram = true iss.trip_timeout() - println_ts("server timeout, reactor disabled") - log._warning("server timeout, reactor disabled") + println_ts("server timeout") + log._warning("server timeout") end elseif event == "clock_start" then -- start loop clock loop_clock = os.startTimer(ISS_CLOCK) - log._debug("loop clock started") + log._debug("iss thread started") end -- check for termination request diff --git a/rtu/startup.lua b/rtu/startup.lua index 4626df2..2d4c58f 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -17,7 +17,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.3.1" +local RTU_VERSION = "alpha-v0.3.2" local print = util.print local println = util.println @@ -233,7 +233,7 @@ while true do elseif event == "modem_message" then -- got a packet local link_ref = { linked = linked } - local packet = rtu_comms.parse_packet(p1, p2, p3, p4, p5) + local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) async_wait(function () rtu_comms.handle_packet(packet, units, link_ref) end) diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 972f768..bca5682 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -74,12 +74,14 @@ function scada_packet() self.raw = self.modem_msg_in.msg - if #self.raw >= 3 then - self.valid = true - self.seq_num = self.raw[1] - self.protocol = self.raw[2] - self.length = #self.raw[3] - self.payload = self.raw[3] + if type(self.raw) == "table" then + if #self.raw >= 3 then + self.valid = true + self.seq_num = self.raw[1] + self.protocol = self.raw[2] + self.length = #self.raw[3] + self.payload = self.raw[3] + end end return self.valid @@ -90,12 +92,12 @@ function scada_packet() local modem_event = function () return self.modem_msg_in end local raw_sendable = function () return self.raw end - local sender = function () return self.s_port end - local receiver = function () return self.r_port end + local local_port = function () return self.modem_msg_in.s_port end + local remote_port = function () return self.modem_msg_in.r_port end local is_valid = function () return self.valid end - local seq_num = function () return self.seq_num end + local seq_num = function () return self.seq_num end local protocol = function () return self.protocol end local length = function () return self.length end local data = function () return self.payload end @@ -107,9 +109,9 @@ function scada_packet() -- raw access modem_event = modem_event, raw_sendable = raw_sendable, - -- sender/receiver - sender = sender, - receiver = receiver, + -- ports + local_port = local_port, + remote_port = remote_port, -- well-formed is_valid = is_valid, -- packet properties diff --git a/supervisor/mqueue.lua b/supervisor/mqueue.lua index 1aa5869..f79e686 100644 --- a/supervisor/mqueue.lua +++ b/supervisor/mqueue.lua @@ -23,11 +23,11 @@ function new() end local push_packet = function (message) - push(TYPE.PACKET, message) + _push(TYPE.PACKET, message) end local push_command = function (message) - push(TYPE.COMMAND, message) + _push(TYPE.COMMAND, message) end local pop = function () diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index edc2ea2..e02619c 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -70,6 +70,7 @@ function get_reactor_session(reactor) end function establish_plc_session(local_port, remote_port, for_reactor) + util.println(remote_port) if get_reactor_session(for_reactor) == nil then local plc_s = { open = true, @@ -81,9 +82,12 @@ function establish_plc_session(local_port, remote_port, for_reactor) instance = nil } - plc_s.instance = plc.new_session(next_plc_id, plc_s.in_queue, plc_s.out_queue) + plc_s.instance = plc.new_session(self.next_plc_id, for_reactor, plc_s.in_queue, plc_s.out_queue) table.insert(self.plc_sessions, plc_s) - next_plc_id = next_plc_id + 1 + + log._debug("established new PLC session to " .. remote_port .. " with ID " .. self.next_plc_id) + + self.next_plc_id = self.next_plc_id + 1 -- success return plc_s.instance.get_id() @@ -129,7 +133,7 @@ local function _iterate(sessions) while not session.out_queue.empty() do local msg = session.out_queue.pop() if msg.qtype == mqueue.TYPE.PACKET then - self.modem.transmit(self.r_port, self.l_port, msg.message.raw_sendable()) + self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) end end else @@ -163,7 +167,7 @@ local function _free_closed(sessions) end move_to = move_to + 1 else - log._debug("free'ing closing session " .. session.instance.get_id() .. " on remote port " .. session.r_port) + log._debug("free'ing closed session " .. session.instance.get_id() .. " on remote port " .. session.r_port) sessions[i] = nil end end diff --git a/supervisor/startup.lua b/supervisor/startup.lua index fa4de82..27a9db6 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -18,7 +18,7 @@ os.loadAPI("session/svsessions.lua") os.loadAPI("supervisor.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.4" +local SUPERVISOR_VERSION = "alpha-v0.1.5" local print = util.print local println = util.println @@ -93,7 +93,7 @@ while true do svsessions.check_all_watchdogs(param1) elseif event == "modem_message" then -- got a packet - local packet = superv_comms.parse_packet(p1, p2, p3, p4, p5) + local packet = superv_comms.parse_packet(param1, param2, param3, param4, param5) superv_comms.handle_packet(packet) end diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index 31b032e..571ab49 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -5,6 +5,7 @@ local PROTOCOLS = comms.PROTOCOLS local RPLC_TYPES = comms.RPLC_TYPES +local RPLC_LINKING = comms.RPLC_LINKING local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local RTU_ADVERT_TYPES = comms.RTU_ADVERT_TYPES @@ -64,7 +65,7 @@ function superv_comms(num_reactors, modem, dev_listen, coord_listen) -- parse a packet local parse_packet = function(side, sender, reply_to, message, distance) local pkt = nil - local s_pkt = scada_packet() + local s_pkt = comms.scada_packet() -- parse packet as generic SCADA packet s_pkt.receive(side, sender, reply_to, message, distance) @@ -104,21 +105,22 @@ function superv_comms(num_reactors, modem, dev_listen, coord_listen) local handle_packet = function(packet) if packet ~= nil then - local sender = packet.scada_frame.sender() - local receiver = packet.scada_frame.receiver() + local l_port = packet.scada_frame.local_port() + local r_port = packet.scada_frame.remote_port() local protocol = packet.scada_frame.protocol() -- device (RTU/PLC) listening channel - if receiver == self.dev_listen then + if l_port == self.dev_listen then if protocol == PROTOCOLS.MODBUS_TCP then -- MODBUS response elseif protocol == PROTOCOLS.RPLC then -- reactor PLC packet - local session = svsessions.find_session(SESSION_TYPE.PLC_SESSION, sender) + local session = svsessions.find_session(SESSION_TYPE.PLC_SESSION, r_port) if session then if packet.type == RPLC_TYPES.LINK_REQ then -- new device on this port? that's a collision - _send_plc_linking(sender, { RPLC_LINKING.COLLISION }) + log._debug("PLC_LNK: request from existing connection received on " .. r_port .. ", responding with collision") + _send_plc_linking(r_port, { RPLC_LINKING.COLLISION }) else -- pass the packet onto the session handler session.in_queue.push_packet(packet) @@ -126,18 +128,25 @@ function superv_comms(num_reactors, modem, dev_listen, coord_listen) else -- unknown session, is this a linking request? if packet.type == RPLC_TYPES.LINK_REQ then - -- this is a linking request - local plc_id = svsessions.establish_plc_session(sender) - if plc_id == false then - -- reactor already has a PLC assigned - _send_plc_linking(sender, { RPLC_LINKING.COLLISION }) + if packet.length == 1 then + -- this is a linking request + local plc_id = svsessions.establish_plc_session(l_port, r_port, packet.data[1]) + if plc_id == false then + -- reactor already has a PLC assigned + log._debug("PLC_LNK: assignment collision with reactor " .. packet.data[1]) + _send_plc_linking(r_port, { RPLC_LINKING.COLLISION }) + else + -- got an ID; assigned to a reactor successfully + log._debug("PLC_LNK: allowed for device at " .. r_port) + _send_plc_linking(r_port, { RPLC_LINKING.ALLOW }) + end else - -- got an ID; assigned to a reactor successfully - _send_plc_linking(sender, { RPLC_LINKING.ALLOW }) + log._debug("PLC_LNK: new linking packet length mismatch") end else -- force a re-link - _send_plc_linking(sender, { RPLC_LINKING.DENY }) + log._debug("PLC_LNK: no session but not a link, force relink") + _send_plc_linking(r_port, { RPLC_LINKING.DENY }) end end elseif protocol == PROTOCOLS.SCADA_MGMT then @@ -146,7 +155,7 @@ function superv_comms(num_reactors, modem, dev_listen, coord_listen) log._debug("illegal packet type " .. protocol .. " on device listening channel") end -- coordinator listening channel - elseif receiver == self.coord_listen then + elseif l_port == self.coord_listen then if protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet elseif protocol == PROTOCOLS.COORD_DATA then @@ -155,7 +164,7 @@ function superv_comms(num_reactors, modem, dev_listen, coord_listen) log._debug("illegal packet type " .. protocol .. " on coordinator listening channel") end else - log._error("received packet on unused channel " .. receiver, true) + log._error("received packet on unused channel " .. l_port, true) end end end