From b48c9563540636f411dd88313f7ccc7ff42e3c46 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Tue, 18 Apr 2023 13:55:18 -0400 Subject: [PATCH] #201 coordinator apisessions for pocket access --- coordinator/apisessions.lua | 22 --- coordinator/config.lua | 3 +- coordinator/coordinator.lua | 80 ++++++++- coordinator/iocontrol.lua | 35 ++-- coordinator/session/api.lua | 247 ++++++++++++++++++++++++++++ coordinator/session/apisessions.lua | 174 ++++++++++++++++++++ coordinator/startup.lua | 10 +- 7 files changed, 525 insertions(+), 46 deletions(-) delete mode 100644 coordinator/apisessions.lua create mode 100644 coordinator/session/api.lua create mode 100644 coordinator/session/apisessions.lua diff --git a/coordinator/apisessions.lua b/coordinator/apisessions.lua deleted file mode 100644 index 8646837..0000000 --- a/coordinator/apisessions.lua +++ /dev/null @@ -1,22 +0,0 @@ -local apisessions = {} - ----@param packet capi_frame ----@diagnostic disable-next-line: unused-local -function apisessions.handle_packet(packet) -end - --- attempt to identify which session's watchdog timer fired ----@param timer_event number ----@diagnostic disable-next-line: unused-local -function apisessions.check_all_watchdogs(timer_event) -end - --- delete all closed sessions -function apisessions.free_all_closed() -end - --- close all open connections -function apisessions.close_all() -end - -return apisessions diff --git a/coordinator/config.lua b/coordinator/config.lua index 052bba4..b3eab5f 100644 --- a/coordinator/config.lua +++ b/coordinator/config.lua @@ -9,7 +9,8 @@ config.SCADA_API_LISTEN = 16200 -- max trusted modem message distance (0 to disable check) config.TRUSTED_RANGE = 0 -- time in seconds (>= 2) before assuming a remote device is no longer active -config.COMMS_TIMEOUT = 5 +config.SV_TIMEOUT = 5 +config.API_TIMEOUT = 5 -- expected number of reactor units, used only to require that number of unit monitors config.NUM_UNITS = 4 diff --git a/coordinator/coordinator.lua b/coordinator/coordinator.lua index d8d43a2..c1777a9 100644 --- a/coordinator/coordinator.lua +++ b/coordinator/coordinator.lua @@ -3,10 +3,11 @@ local log = require("scada-common.log") local ppm = require("scada-common.ppm") local util = require("scada-common.util") -local apisessions = require("coordinator.apisessions") local iocontrol = require("coordinator.iocontrol") local process = require("coordinator.process") +local apisessions = require("coordinator.session.apisessions") + local dialog = require("coordinator.ui.dialog") local print = util.print @@ -224,7 +225,8 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range sv_r_seq_num = nil, sv_config_err = false, connected = false, - last_est_ack = ESTABLISH_ACK.ALLOW + last_est_ack = ESTABLISH_ACK.ALLOW, + last_api_est_acks = {} } comms.set_trusted_range(range) @@ -262,6 +264,19 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range self.sv_seq_num = self.sv_seq_num + 1 end + -- send an API establish request response + ---@param dest integer + ---@param msg table + local function _send_api_establish_ack(seq_id, dest, msg) + local s_pkt = comms.scada_packet() + local m_pkt = comms.mgmt_packet() + + m_pkt.make(SCADA_MGMT_TYPE.ESTABLISH, msg) + s_pkt.make(seq_id, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) + + modem.transmit(dest, api_listen, s_pkt.raw_sendable()) + end + -- attempt connection establishment local function _send_establish() _send_sv(PROTOCOL.SCADA_MGMT, SCADA_MGMT_TYPE.ESTABLISH, { comms.version, version, DEVICE_TYPE.CRDN }) @@ -416,13 +431,70 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range ---@param packet mgmt_frame|crdn_frame|capi_frame|nil function public.handle_packet(packet) if packet ~= nil then - local protocol = packet.scada_frame.protocol() local l_port = packet.scada_frame.local_port() + local r_port = packet.scada_frame.remote_port() + local protocol = packet.scada_frame.protocol() if l_port == api_listen then if protocol == PROTOCOL.COORD_API then ---@cast packet capi_frame - apisessions.handle_packet(packet) + -- look for an associated session + local session = apisessions.find_session(r_port) + + -- API packet + if session ~= nil then + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + else + -- any other packet should be session related, discard it + log.debug("discarding COORD_API packet without a known session") + end + elseif protocol == PROTOCOL.SCADA_MGMT then + ---@cast packet mgmt_frame + -- look for an associated session + local session = apisessions.find_session(r_port) + + -- SCADA management packet + if session ~= nil then + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + elseif packet.type == SCADA_MGMT_TYPE.ESTABLISH then + -- establish a new session + local next_seq_id = packet.scada_frame.seq_num() + 1 + + -- validate packet and continue + if packet.length == 3 and type(packet.data[1]) == "string" and type(packet.data[2]) == "string" then + local comms_v = packet.data[1] + local firmware_v = packet.data[2] + local dev_type = packet.data[3] + + if comms_v ~= comms.version then + if self.last_api_est_acks[r_port] ~= ESTABLISH_ACK.BAD_VERSION then + log.info(util.c("dropping API establish packet with incorrect comms version v", comms_v, " (expected v", comms.version, ")")) + self.last_api_est_acks[r_port] = ESTABLISH_ACK.BAD_VERSION + end + + _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.BAD_VERSION }) + elseif dev_type == DEVICE_TYPE.PKT then + -- pocket linking request + local id = apisessions.establish_session(l_port, r_port, firmware_v) + println(util.c("API: pocket (", firmware_v, ") [:", r_port, "] connected with session ID ", id)) + coordinator.log_comms(util.c("API: pocket (", firmware_v, ") [:", r_port, "] connected with session ID ", id)) + + _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.ALLOW }) + self.last_api_est_acks[r_port] = ESTABLISH_ACK.ALLOW + else + log.debug(util.c("illegal establish packet for device ", dev_type, " on API listening channel")) + _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end + else + log.debug("invalid establish packet (on API listening channel)") + _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end + else + -- any other packet should be session related, discard it + log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session")) + end else log.debug("illegal packet type " .. protocol .. " on api listening channel", true) end diff --git a/coordinator/iocontrol.lua b/coordinator/iocontrol.lua index ac45a2f..e26546d 100644 --- a/coordinator/iocontrol.lua +++ b/coordinator/iocontrol.lua @@ -18,6 +18,11 @@ local iocontrol = {} ---@class ioctl local io = {} +-- placeholder acknowledge function for type hinting +---@param success boolean +---@diagnostic disable-next-line: unused-local +local function __generic_ack(success) end + -- initialize the coordinator IO controller ---@param conf facility_conf configuration ---@param comms coord_comms comms reference @@ -45,11 +50,11 @@ function iocontrol.init(conf, comms) radiation = types.new_zero_radiation_reading(), - save_cfg_ack = function (success) end, ---@param success boolean - start_ack = function (success) end, ---@param success boolean - stop_ack = function (success) end, ---@param success boolean - scram_ack = function (success) end, ---@param success boolean - ack_alarms_ack = function (success) end, ---@param success boolean + save_cfg_ack = __generic_ack, + start_ack = __generic_ack, + stop_ack = __generic_ack, + scram_ack = __generic_ack, + ack_alarms_ack = __generic_ack, ps = psil.create(), @@ -74,7 +79,6 @@ function iocontrol.init(conf, comms) ---@class ioctl_unit local entry = { - ---@type integer unit_id = i, num_boilers = 0, @@ -85,7 +89,8 @@ function iocontrol.init(conf, comms) waste_control = 0, radiation = types.new_zero_radiation_reading(), - a_group = 0, -- auto control group + -- auto control group + a_group = 0, start = function () process.start(i) end, scram = function () process.scram(i) end, @@ -96,12 +101,12 @@ function iocontrol.init(conf, comms) set_group = function (grp) process.set_group(i, grp) end, ---@param grp integer|0 group ID or 0 - start_ack = function (success) end, ---@param success boolean - scram_ack = function (success) end, ---@param success boolean - reset_rps_ack = function (success) end, ---@param success boolean - ack_alarms_ack = function (success) end, ---@param success boolean - set_burn_ack = function (success) end, ---@param success boolean - set_waste_ack = function (success) end, ---@param success boolean + start_ack = __generic_ack, + scram_ack = __generic_ack, + reset_rps_ack = __generic_ack, + ack_alarms_ack = __generic_ack, + set_burn_ack = __generic_ack, + set_waste_ack = __generic_ack, alarm_callbacks = { c_breach = { ack = function () ack(1) end, reset = function () reset(1) end }, @@ -134,10 +139,10 @@ function iocontrol.init(conf, comms) ALARM_STATE.INACTIVE -- turbine trip }, - annunciator = {}, ---@type annunciator + annunciator = {}, ---@type annunciator unit_ps = psil.create(), - reactor_data = {}, ---@type reactor_db + reactor_data = {}, ---@type reactor_db boiler_ps_tbl = {}, boiler_data_tbl = {}, diff --git a/coordinator/session/api.lua b/coordinator/session/api.lua new file mode 100644 index 0000000..e062295 --- /dev/null +++ b/coordinator/session/api.lua @@ -0,0 +1,247 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local mqueue = require("scada-common.mqueue") +local util = require("scada-common.util") + +local api = {} + +local PROTOCOL = comms.PROTOCOL +-- local CAPI_TYPE = comms.CAPI_TYPE +local SCADA_MGMT_TYPE = comms.SCADA_MGMT_TYPE + +local println = util.println + +-- retry time constants in ms +-- local INITIAL_WAIT = 1500 +-- local RETRY_PERIOD = 1000 + +local API_S_CMDS = { +} + +local API_S_DATA = { +} + +api.API_S_CMDS = API_S_CMDS +api.API_S_DATA = API_S_DATA + +local PERIODICS = { + KEEP_ALIVE = 2000 +} + +-- pocket API session +---@nodiscard +---@param id integer session ID +---@param in_queue mqueue in message queue +---@param out_queue mqueue out message queue +---@param timeout number communications timeout +function api.new_session(id, in_queue, out_queue, timeout) + local log_header = "api_session(" .. id .. "): " + + local self = { + -- connection properties + seq_num = 0, + r_seq_num = nil, + connected = true, + conn_watchdog = util.new_watchdog(timeout), + last_rtt = 0, + -- periodic messages + periodics = { + keep_alive = 0 + }, + -- when to next retry one of these requests + retry_times = { + }, + -- command acknowledgements + acks = { + }, + -- session database + ---@class api_db + sDB = { + } + } + + ---@class api_session + local public = {} + + -- mark this API session as closed, stop watchdog + local function _close() + self.conn_watchdog.cancel() + self.connected = false + end + + -- send a CAPI packet + ---@param msg_type CAPI_TYPE + ---@param msg table + local function _send(msg_type, msg) + local s_pkt = comms.scada_packet() + local c_pkt = comms.capi_packet() + + c_pkt.make(msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOL.COORD_API, c_pkt.raw_sendable()) + + out_queue.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- send a SCADA management packet + ---@param msg_type SCADA_MGMT_TYPE + ---@param msg table + local function _send_mgmt(msg_type, msg) + local s_pkt = comms.scada_packet() + local m_pkt = comms.mgmt_packet() + + m_pkt.make(msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) + + out_queue.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- handle a packet + ---@param pkt mgmt_frame|capi_frame + local function _handle_packet(pkt) + -- check sequence number + if self.r_seq_num == nil then + self.r_seq_num = pkt.scada_frame.seq_num() + elseif self.r_seq_num >= pkt.scada_frame.seq_num() then + log.warning(log_header .. "sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. pkt.scada_frame.seq_num()) + return + else + self.r_seq_num = pkt.scada_frame.seq_num() + end + + -- process packet + if pkt.scada_frame.protocol() == PROTOCOL.COORD_API then + ---@cast pkt capi_frame + -- feed watchdog + self.conn_watchdog.feed() + + -- handle packet by type + if pkt.type == nil then + else + log.debug(log_header .. "handler received unsupported CAPI packet type " .. pkt.type) + end + elseif pkt.scada_frame.protocol() == PROTOCOL.SCADA_MGMT then + ---@cast pkt mgmt_frame + if pkt.type == SCADA_MGMT_TYPE.KEEP_ALIVE then + -- keep alive reply + if pkt.length == 2 then + local srv_start = pkt.data[1] + -- local api_send = pkt.data[2] + local srv_now = util.time() + self.last_rtt = srv_now - srv_start + + if self.last_rtt > 750 then + log.warning(log_header .. "API KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)") + end + + -- log.debug(log_header .. "API RTT = " .. self.last_rtt .. "ms") + -- log.debug(log_header .. "API TT = " .. (srv_now - api_send) .. "ms") + else + log.debug(log_header .. "SCADA keep alive packet length mismatch") + end + elseif pkt.type == SCADA_MGMT_TYPE.CLOSE then + -- close the session + _close() + else + log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type) + end + end + end + + -- PUBLIC FUNCTIONS -- + + -- get the session ID + ---@nodiscard + function public.get_id() return id end + + -- get the session database + ---@nodiscard + function public.get_db() return self.sDB end + + -- check if a timer matches this session's watchdog + ---@nodiscard + function public.check_wd(timer) + return self.conn_watchdog.is_timer(timer) and self.connected + end + + -- close the connection + function public.close() + _close() + _send_mgmt(SCADA_MGMT_TYPE.CLOSE, {}) + println("connection to API session " .. id .. " closed by server") + log.info(log_header .. "session closed by server") + end + + -- iterate the session + ---@nodiscard + ---@return boolean connected + function public.iterate() + if self.connected then + ------------------ + -- handle queue -- + ------------------ + + local handle_start = util.time() + + while in_queue.ready() and self.connected do + -- get a new message to process + local message = in_queue.pop() + + if message ~= nil then + if message.qtype == mqueue.TYPE.PACKET then + -- handle a packet + _handle_packet(message.message) + elseif message.qtype == mqueue.TYPE.COMMAND then + -- handle instruction + elseif message.qtype == mqueue.TYPE.DATA then + -- instruction with body + end + end + + -- max 100ms spent processing queue + if util.time() - handle_start > 100 then + log.warning(log_header .. "exceeded 100ms queue process limit") + break + end + end + + -- exit if connection was closed + if not self.connected then + println("connection to API session " .. id .. " closed by remote host") + log.info(log_header .. "session closed by remote host") + return self.connected + end + + ---------------------- + -- update periodics -- + ---------------------- + + local elapsed = util.time() - self.periodics.last_update + + local periodics = self.periodics + + -- keep alive + + periodics.keep_alive = periodics.keep_alive + elapsed + if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then + _send_mgmt(SCADA_MGMT_TYPE.KEEP_ALIVE, { util.time() }) + periodics.keep_alive = 0 + end + + self.periodics.last_update = util.time() + + --------------------- + -- attempt retries -- + --------------------- + + -- local rtimes = self.retry_times + end + + return self.connected + end + + return public +end + +return api diff --git a/coordinator/session/apisessions.lua b/coordinator/session/apisessions.lua new file mode 100644 index 0000000..c6d5a20 --- /dev/null +++ b/coordinator/session/apisessions.lua @@ -0,0 +1,174 @@ + +local log = require("scada-common.log") +local mqueue = require("scada-common.mqueue") +local util = require("scada-common.util") + +local config = require("coordinator.config") + +local api = require("coordinator.session.api") + +local apisessions = {} + +local self = { + modem = nil, + next_id = 0, + sessions = {} +} + +-- PRIVATE FUNCTIONS -- + +-- handle a session output queue +---@param session api_session_struct +local function _api_handle_outq(session) + -- record handler start time + local handle_start = util.time() + + -- process output queue + while session.out_queue.ready() do + -- get a new message to process + local msg = session.out_queue.pop() + + if msg ~= nil then + if msg.qtype == mqueue.TYPE.PACKET then + -- handle a packet to be sent + self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) + elseif msg.qtype == mqueue.TYPE.COMMAND then + -- handle instruction/notification + elseif msg.qtype == mqueue.TYPE.DATA then + -- instruction/notification with body + end + end + + -- max 100ms spent processing queue + if util.time() - handle_start > 100 then + log.warning("API out queue handler exceeded 100ms queue process limit") + log.warning(util.c("offending session: port ", session.r_port)) + break + end + end +end + +-- cleanly close a session +---@param session api_session_struct +local function _shutdown(session) + session.open = false + session.instance.close() + + -- send packets in out queue (namely the close packet) + while session.out_queue.ready() do + local msg = session.out_queue.pop() + if msg ~= nil and msg.qtype == mqueue.TYPE.PACKET then + self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) + end + end + + log.debug(util.c("closed API session ", session.instance.get_id(), " on remote port ", session.r_port)) +end + +-- PUBLIC FUNCTIONS -- + +-- initialize apisessions +---@param modem table +function apisessions.init(modem) + self.modem = modem +end + +-- re-link the modem +---@param modem table +function apisessions.relink_modem(modem) + self.modem = modem +end + +-- find a session by remote port +---@nodiscard +---@param port integer +---@return api_session_struct|nil +function apisessions.find_session(port) + for i = 1, #self.sessions do + if self.sessions[i].r_port == port then return self.sessions[i] end + end + return nil +end + +-- establish a new API session +---@nodiscard +---@param local_port integer +---@param remote_port integer +---@param version string +---@return integer session_id +function apisessions.establish_session(local_port, remote_port, version) + ---@class api_session_struct + local api_s = { + open = true, + version = version, + l_port = local_port, + r_port = remote_port, + in_queue = mqueue.new(), + out_queue = mqueue.new(), + instance = nil ---@type api_session + } + + api_s.instance = api.new_session(self.next_id, api_s.in_queue, api_s.out_queue, config.API_TIMEOUT) + table.insert(self.sessions, api_s) + + log.debug(util.c("established new API session to ", remote_port, " with ID ", self.next_id)) + + self.next_id = self.next_id + 1 + + -- success + return api_s.instance.get_id() +end + +-- attempt to identify which session's watchdog timer fired +---@param timer_event number +function apisessions.check_all_watchdogs(timer_event) + for i = 1, #self.sessions do + local session = self.sessions[i] ---@type api_session_struct + if session.open then + local triggered = session.instance.check_wd(timer_event) + if triggered then + log.debug(util.c("watchdog closing API session ", session.instance.get_id(), + " on remote port ", session.r_port, "...")) + _shutdown(session) + end + end + end +end + +-- iterate all the API sessions +function apisessions.iterate_all() + for i = 1, #self.sessions do + local session = self.sessions[i] ---@type api_session_struct + + if session.open and session.instance.iterate() then + _api_handle_outq(session) + else + session.open = false + end + end +end + +-- delete all closed sessions +function apisessions.free_all_closed() + local f = function (session) return session.open end + + ---@param session api_session_struct + local on_delete = function (session) + log.debug(util.c("free'ing closed API session ", session.instance.get_id(), + " on remote port ", session.r_port)) + end + + util.filter_table(self.sessions, f, on_delete) +end + +-- close all open connections +function apisessions.close_all() + for i = 1, #self.sessions do + local session = self.sessions[i] ---@type api_session_struct + if session.open then _shutdown(session) end + end + + apisessions.free_all_closed() +end + +return apisessions diff --git a/coordinator/startup.lua b/coordinator/startup.lua index d59e528..71f6f07 100644 --- a/coordinator/startup.lua +++ b/coordinator/startup.lua @@ -19,7 +19,7 @@ local iocontrol = require("coordinator.iocontrol") local renderer = require("coordinator.renderer") local sounder = require("coordinator.sounder") -local COORDINATOR_VERSION = "v0.12.6" +local COORDINATOR_VERSION = "v0.13.0" local println = util.println local println_ts = util.println_ts @@ -40,8 +40,10 @@ cfv.assert_port(config.SCADA_SV_PORT) cfv.assert_port(config.SCADA_SV_LISTEN) cfv.assert_port(config.SCADA_API_LISTEN) cfv.assert_type_int(config.TRUSTED_RANGE) -cfv.assert_type_num(config.COMMS_TIMEOUT) -cfv.assert_min(config.COMMS_TIMEOUT, 2) +cfv.assert_type_num(config.SV_TIMEOUT) +cfv.assert_min(config.SV_TIMEOUT, 2) +cfv.assert_type_num(config.API_TIMEOUT) +cfv.assert_min(config.API_TIMEOUT, 2) cfv.assert_type_int(config.NUM_UNITS) cfv.assert_type_num(config.SOUNDER_VOLUME) cfv.assert_type_bool(config.TIME_24_HOUR) @@ -140,7 +142,7 @@ local function main() end -- create connection watchdog - local conn_watchdog = util.new_watchdog(config.COMMS_TIMEOUT) + local conn_watchdog = util.new_watchdog(config.SV_TIMEOUT) conn_watchdog.cancel() log.debug("startup> conn watchdog created")