diff --git a/coordinator/config.lua b/coordinator/config.lua index 3196e80..ecb7599 100644 --- a/coordinator/config.lua +++ b/coordinator/config.lua @@ -1,11 +1,11 @@ local config = {} --- port of the SCADA supervisor -config.SCADA_SV_PORT = 16100 --- port to listen to incoming packets from supervisor -config.SCADA_SV_CTL_LISTEN = 16101 --- listen port for SCADA coordinator API access -config.SCADA_API_LISTEN = 16200 +-- supervisor comms channel +config.SVR_CHANNEL = 16240 +-- coordinator comms channel +config.CRD_CHANNEL = 16243 +-- pocket comms channel +config.PKT_CHANNEL = 16244 -- max trusted modem message distance (0 to disable check) config.TRUSTED_RANGE = 0 -- time in seconds (>= 2) before assuming a remote device is no longer active diff --git a/coordinator/coordinator.lua b/coordinator/coordinator.lua index 6f586ef..f6e8038 100644 --- a/coordinator/coordinator.lua +++ b/coordinator/coordinator.lua @@ -213,14 +213,15 @@ end ---@nodiscard ---@param version string coordinator version ---@param modem table modem device ----@param sv_port integer port of configured supervisor ----@param sv_listen integer listening port for supervisor replys ----@param api_listen integer listening port for pocket API +---@param crd_channel integer port of configured supervisor +---@param svr_channel integer listening port for supervisor replys +---@param pkt_channel integer listening port for pocket API ---@param range integer trusted device connection range ---@param sv_watchdog watchdog -function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range, sv_watchdog) +function coordinator.comms(version, modem, crd_channel, svr_channel, pkt_channel, range, sv_watchdog) local self = { sv_linked = false, + sv_addr = comms.BROADCAST, sv_seq_num = 0, sv_r_seq_num = nil, sv_config_err = false, @@ -236,8 +237,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range -- configure modem channels local function _conf_channels() modem.closeAll() - modem.open(sv_listen) - modem.open(api_listen) + modem.open(crd_channel) end _conf_channels() @@ -261,23 +261,24 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range end pkt.make(msg_type, msg) - s_pkt.make(self.sv_seq_num, protocol, pkt.raw_sendable()) + s_pkt.make(self.sv_addr, self.sv_seq_num, protocol, pkt.raw_sendable()) - modem.transmit(sv_port, sv_listen, s_pkt.raw_sendable()) + modem.transmit(svr_channel, crd_channel, s_pkt.raw_sendable()) self.sv_seq_num = self.sv_seq_num + 1 end -- send an API establish request response - ---@param dest integer - ---@param msg table - local function _send_api_establish_ack(seq_id, dest, msg) + ---@param packet scada_packet + ---@param ack ESTABLISH_ACK + local function _send_api_establish_ack(packet, ack) local s_pkt = comms.scada_packet() local m_pkt = comms.mgmt_packet() - m_pkt.make(SCADA_MGMT_TYPE.ESTABLISH, msg) - s_pkt.make(seq_id, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) + m_pkt.make(SCADA_MGMT_TYPE.ESTABLISH, { ack }) + s_pkt.make(packet.src_addr(), packet.seq_num() + 1, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) - modem.transmit(dest, api_listen, s_pkt.raw_sendable()) + modem.transmit(pkt_channel, crd_channel, s_pkt.raw_sendable()) + self.last_api_est_acks[packet.src_addr()] = ack end -- attempt connection establishment @@ -307,7 +308,9 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range -- close the connection to the server function public.close() sv_watchdog.cancel() + self.sv_addr = comms.BROADCAST self.sv_linked = false + self.sv_r_seq_num = nil _send_sv(PROTOCOL.SCADA_MGMT, SCADA_MGMT_TYPE.CLOSE, {}) end @@ -436,15 +439,18 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range ---@param packet mgmt_frame|crdn_frame|capi_frame|nil function public.handle_packet(packet) if packet ~= nil then - local l_port = packet.scada_frame.local_channel() - local r_port = packet.scada_frame.remote_channel() + local l_chan = packet.scada_frame.local_channel() + local r_chan = packet.scada_frame.remote_channel() + local src_addr = packet.scada_frame.src_addr() local protocol = packet.scada_frame.protocol() - if l_port == api_listen then + if l_chan ~= crd_channel then + log.debug("received packet on unconfigured channel " .. l_chan, true) + elseif r_chan == pkt_channel then if protocol == PROTOCOL.COORD_API then ---@cast packet capi_frame -- look for an associated session - local session = apisessions.find_session(r_port) + local session = apisessions.find_session(src_addr) -- API packet if session ~= nil then @@ -457,7 +463,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range elseif protocol == PROTOCOL.SCADA_MGMT then ---@cast packet mgmt_frame -- look for an associated session - local session = apisessions.find_session(r_port) + local session = apisessions.find_session(src_addr) -- SCADA management packet if session ~= nil then @@ -465,8 +471,6 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range session.in_queue.push_packet(packet) elseif packet.type == SCADA_MGMT_TYPE.ESTABLISH then -- establish a new session - local next_seq_id = packet.scada_frame.seq_num() + 1 - -- validate packet and continue if packet.length == 3 and type(packet.data[1]) == "string" and type(packet.data[2]) == "string" then local comms_v = packet.data[1] @@ -474,42 +478,43 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range local dev_type = packet.data[3] if comms_v ~= comms.version then - if self.last_api_est_acks[r_port] ~= ESTABLISH_ACK.BAD_VERSION then + if self.last_api_est_acks[src_addr] ~= ESTABLISH_ACK.BAD_VERSION then log.info(util.c("dropping API establish packet with incorrect comms version v", comms_v, " (expected v", comms.version, ")")) - self.last_api_est_acks[r_port] = ESTABLISH_ACK.BAD_VERSION end - _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.BAD_VERSION }) + _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.BAD_VERSION) elseif dev_type == DEVICE_TYPE.PKT then -- pocket linking request - local id = apisessions.establish_session(l_port, r_port, firmware_v) - println(util.c("API: pocket (", firmware_v, ") [:", r_port, "] connected with session ID ", id)) - coordinator.log_comms(util.c("API: pocket (", firmware_v, ") [:", r_port, "] connected with session ID ", id)) + local id = apisessions.establish_session(src_addr, firmware_v) + println(util.c("[API] pocket (", firmware_v, ") [@", src_addr, "] \xbb connected")) + coordinator.log_comms(util.c("API_ESTABLISH: pocket (", firmware_v, ") [@", src_addr, "] connected with session ID ", id)) - _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.ALLOW }) - self.last_api_est_acks[r_port] = ESTABLISH_ACK.ALLOW + _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.ALLOW) else - log.debug(util.c("illegal establish packet for device ", dev_type, " on API listening channel")) - _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + log.debug(util.c("API_ESTABLISH: illegal establish packet for device ", dev_type, " on pocket channel")) + _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.DENY) end else log.debug("invalid establish packet (on API listening channel)") - _send_api_establish_ack(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.DENY) end else -- any other packet should be session related, discard it - log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session")) + log.debug(util.c("discarding pocket SCADA_MGMT packet without a known session from computer ", src_addr)) end else - log.debug("illegal packet type " .. protocol .. " on api listening channel", true) + log.debug("illegal packet type " .. protocol .. " on pocket channel", true) end - elseif l_port == sv_listen then + elseif r_chan == svr_channel then -- check sequence number if self.sv_r_seq_num == nil then self.sv_r_seq_num = packet.scada_frame.seq_num() elseif self.connected and ((self.sv_r_seq_num + 1) ~= packet.scada_frame.seq_num()) then log.warning("sequence out-of-order: last = " .. self.sv_r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) return + elseif self.sv_linked and src_addr ~= self.sv_addr then + log.debug("received packet from unknown computer " .. src_addr .. " while linked; channel in use by another system?") + return else self.sv_r_seq_num = packet.scada_frame.seq_num() end @@ -660,6 +665,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range -- init io controller iocontrol.init(conf, public) + self.sv_addr = src_addr self.sv_linked = true self.sv_config_err = false else @@ -705,10 +711,10 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range local trip_time = util.time() - timestamp if trip_time > 750 then - log.warning("coord KEEP_ALIVE trip time > 750ms (" .. trip_time .. "ms)") + log.warning("coordinator KEEP_ALIVE trip time > 750ms (" .. trip_time .. "ms)") end - -- log.debug("coord RTT = " .. trip_time .. "ms") + -- log.debug("coordinator RTT = " .. trip_time .. "ms") iocontrol.get_db().facility.ps.publish("sv_ping", trip_time) @@ -719,7 +725,9 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range elseif packet.type == SCADA_MGMT_TYPE.CLOSE then -- handle session close sv_watchdog.cancel() + self.sv_addr = comms.BROADCAST self.sv_linked = false + self.sv_r_seq_num = nil println_ts("server connection closed by remote host") log.info("server connection closed by remote host") else @@ -732,7 +740,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, range log.debug("illegal packet type " .. protocol .. " on supervisor listening channel", true) end else - log.debug("received packet on unconfigured channel " .. l_port, true) + log.debug("received packet for unknown channel " .. r_chan, true) end end end diff --git a/coordinator/session/apisessions.lua b/coordinator/session/apisessions.lua index c6d5a20..17988f5 100644 --- a/coordinator/session/apisessions.lua +++ b/coordinator/session/apisessions.lua @@ -5,7 +5,7 @@ local util = require("scada-common.util") local config = require("coordinator.config") -local api = require("coordinator.session.api") +local pocket = require("coordinator.session.pocket") local apisessions = {} @@ -18,7 +18,7 @@ local self = { -- PRIVATE FUNCTIONS -- -- handle a session output queue ----@param session api_session_struct +---@param session pkt_session_struct local function _api_handle_outq(session) -- record handler start time local handle_start = util.time() @@ -31,7 +31,7 @@ local function _api_handle_outq(session) if msg ~= nil then if msg.qtype == mqueue.TYPE.PACKET then -- handle a packet to be sent - self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) + self.modem.transmit(config.PKT_CHANNEL, config.CRD_CHANNEL, msg.message.raw_sendable()) elseif msg.qtype == mqueue.TYPE.COMMAND then -- handle instruction/notification elseif msg.qtype == mqueue.TYPE.DATA then @@ -41,15 +41,15 @@ local function _api_handle_outq(session) -- max 100ms spent processing queue if util.time() - handle_start > 100 then - log.warning("API out queue handler exceeded 100ms queue process limit") - log.warning(util.c("offending session: port ", session.r_port)) + log.warning("[API] out queue handler exceeded 100ms queue process limit") + log.warning(util.c("[API] offending session: ", session)) break end end end -- cleanly close a session ----@param session api_session_struct +---@param session pkt_session_struct local function _shutdown(session) session.open = false session.instance.close() @@ -58,11 +58,11 @@ local function _shutdown(session) while session.out_queue.ready() do local msg = session.out_queue.pop() if msg ~= nil and msg.qtype == mqueue.TYPE.PACKET then - self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) + self.modem.transmit(config.PKT_CHANNEL, config.CRD_CHANNEL, msg.message.raw_sendable()) end end - log.debug(util.c("closed API session ", session.instance.get_id(), " on remote port ", session.r_port)) + log.debug(util.c("[API] closed session ", session)) end -- PUBLIC FUNCTIONS -- @@ -81,54 +81,60 @@ end -- find a session by remote port ---@nodiscard ----@param port integer ----@return api_session_struct|nil -function apisessions.find_session(port) +---@param source_addr integer +---@return pkt_session_struct|nil +function apisessions.find_session(source_addr) for i = 1, #self.sessions do - if self.sessions[i].r_port == port then return self.sessions[i] end + if self.sessions[i].s_addr == source_addr then return self.sessions[i] end end return nil end -- establish a new API session ---@nodiscard ----@param local_port integer ----@param remote_port integer +---@param source_addr integer ---@param version string ---@return integer session_id -function apisessions.establish_session(local_port, remote_port, version) - ---@class api_session_struct - local api_s = { +function apisessions.establish_session(source_addr, version) + ---@class pkt_session_struct + local pkt_s = { open = true, version = version, - l_port = local_port, - r_port = remote_port, + s_addr = source_addr, in_queue = mqueue.new(), out_queue = mqueue.new(), - instance = nil ---@type api_session + instance = nil ---@type pkt_session } - api_s.instance = api.new_session(self.next_id, api_s.in_queue, api_s.out_queue, config.API_TIMEOUT) - table.insert(self.sessions, api_s) + local id = self.next_id - log.debug(util.c("established new API session to ", remote_port, " with ID ", self.next_id)) + pkt_s.instance = pocket.new_session(id, source_addr, pkt_s.in_queue, pkt_s.out_queue, config.API_TIMEOUT) + table.insert(self.sessions, pkt_s) - self.next_id = self.next_id + 1 + local mt = { + ---@param s pkt_session_struct + __tostring = function (s) return util.c("PKT [", id, "] (@", s.s_addr, ")") end + } + + setmetatable(pkt_s, mt) + + log.debug(util.c("[API] established new session: ", pkt_s)) + + self.next_id = id + 1 -- success - return api_s.instance.get_id() + return pkt_s.instance.get_id() end -- attempt to identify which session's watchdog timer fired ---@param timer_event number function apisessions.check_all_watchdogs(timer_event) for i = 1, #self.sessions do - local session = self.sessions[i] ---@type api_session_struct + local session = self.sessions[i] ---@type pkt_session_struct if session.open then local triggered = session.instance.check_wd(timer_event) if triggered then - log.debug(util.c("watchdog closing API session ", session.instance.get_id(), - " on remote port ", session.r_port, "...")) + log.debug(util.c("[API] watchdog closing session ", session, "...")) _shutdown(session) end end @@ -138,7 +144,7 @@ end -- iterate all the API sessions function apisessions.iterate_all() for i = 1, #self.sessions do - local session = self.sessions[i] ---@type api_session_struct + local session = self.sessions[i] ---@type pkt_session_struct if session.open and session.instance.iterate() then _api_handle_outq(session) @@ -152,10 +158,9 @@ end function apisessions.free_all_closed() local f = function (session) return session.open end - ---@param session api_session_struct + ---@param session pkt_session_struct local on_delete = function (session) - log.debug(util.c("free'ing closed API session ", session.instance.get_id(), - " on remote port ", session.r_port)) + log.debug(util.c("[API] free'ing closed session ", session)) end util.filter_table(self.sessions, f, on_delete) @@ -164,7 +169,7 @@ end -- close all open connections function apisessions.close_all() for i = 1, #self.sessions do - local session = self.sessions[i] ---@type api_session_struct + local session = self.sessions[i] ---@type pkt_session_struct if session.open then _shutdown(session) end end diff --git a/coordinator/session/api.lua b/coordinator/session/pocket.lua similarity index 89% rename from coordinator/session/api.lua rename to coordinator/session/pocket.lua index 4ba7383..ddabdda 100644 --- a/coordinator/session/api.lua +++ b/coordinator/session/pocket.lua @@ -3,7 +3,7 @@ local log = require("scada-common.log") local mqueue = require("scada-common.mqueue") local util = require("scada-common.util") -local api = {} +local pocket = {} local PROTOCOL = comms.PROTOCOL -- local CAPI_TYPE = comms.CAPI_TYPE @@ -21,8 +21,8 @@ local API_S_CMDS = { local API_S_DATA = { } -api.API_S_CMDS = API_S_CMDS -api.API_S_DATA = API_S_DATA +pocket.API_S_CMDS = API_S_CMDS +pocket.API_S_DATA = API_S_DATA local PERIODICS = { KEEP_ALIVE = 2000 @@ -31,11 +31,12 @@ local PERIODICS = { -- pocket API session ---@nodiscard ---@param id integer session ID +---@param s_addr integer device source address ---@param in_queue mqueue in message queue ---@param out_queue mqueue out message queue ---@param timeout number communications timeout -function api.new_session(id, in_queue, out_queue, timeout) - local log_header = "api_session(" .. id .. "): " +function pocket.new_session(id, s_addr, in_queue, out_queue, timeout) + local log_header = "pkt_session(" .. id .. "): " local self = { -- connection properties @@ -61,10 +62,10 @@ function api.new_session(id, in_queue, out_queue, timeout) } } - ---@class api_session + ---@class pkt_session local public = {} - -- mark this API session as closed, stop watchdog + -- mark this pocket session as closed, stop watchdog local function _close() self.conn_watchdog.cancel() self.connected = false @@ -92,7 +93,7 @@ function api.new_session(id, in_queue, out_queue, timeout) local m_pkt = comms.mgmt_packet() m_pkt.make(msg_type, msg) - s_pkt.make(self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) + s_pkt.make(s_addr, self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) out_queue.push_packet(s_pkt) self.seq_num = self.seq_num + 1 @@ -134,11 +135,11 @@ function api.new_session(id, in_queue, out_queue, timeout) self.last_rtt = srv_now - srv_start if self.last_rtt > 750 then - log.warning(log_header .. "API KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)") + log.warning(log_header .. "PKT KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)") end - -- log.debug(log_header .. "API RTT = " .. self.last_rtt .. "ms") - -- log.debug(log_header .. "API TT = " .. (srv_now - api_send) .. "ms") + -- log.debug(log_header .. "PKT RTT = " .. self.last_rtt .. "ms") + -- log.debug(log_header .. "PKT TT = " .. (srv_now - api_send) .. "ms") else log.debug(log_header .. "SCADA keep alive packet length mismatch") end @@ -171,7 +172,7 @@ function api.new_session(id, in_queue, out_queue, timeout) function public.close() _close() _send_mgmt(SCADA_MGMT_TYPE.CLOSE, {}) - println("connection to API session " .. id .. " closed by server") + println("connection to pocket session " .. id .. " closed by server") log.info(log_header .. "session closed by server") end @@ -210,7 +211,7 @@ function api.new_session(id, in_queue, out_queue, timeout) -- exit if connection was closed if not self.connected then - println("connection to API session " .. id .. " closed by remote host") + println("connection to pocket session " .. id .. " closed by remote host") log.info(log_header .. "session closed by remote host") return self.connected end @@ -246,4 +247,4 @@ function api.new_session(id, in_queue, out_queue, timeout) return public end -return api +return pocket diff --git a/coordinator/startup.lua b/coordinator/startup.lua index aacfe1b..5d4fec8 100644 --- a/coordinator/startup.lua +++ b/coordinator/startup.lua @@ -20,7 +20,7 @@ local sounder = require("coordinator.sounder") local apisessions = require("coordinator.session.apisessions") -local COORDINATOR_VERSION = "v0.15.8" +local COORDINATOR_VERSION = "v0.16.0" local println = util.println local println_ts = util.println_ts @@ -37,9 +37,9 @@ local log_comms_connecting = coordinator.log_comms_connecting local cfv = util.new_validator() -cfv.assert_channel(config.SCADA_SV_PORT) -cfv.assert_channel(config.SCADA_SV_CTL_LISTEN) -cfv.assert_channel(config.SCADA_API_LISTEN) +cfv.assert_channel(config.SVR_CHANNEL) +cfv.assert_channel(config.CRD_CHANNEL) +cfv.assert_channel(config.PKT_CHANNEL) cfv.assert_type_int(config.TRUSTED_RANGE) cfv.assert_type_num(config.SV_TIMEOUT) cfv.assert_min(config.SV_TIMEOUT, 2) @@ -148,8 +148,8 @@ local function main() log.debug("startup> conn watchdog created") -- start comms, open all channels - local coord_comms = coordinator.comms(COORDINATOR_VERSION, modem, config.SCADA_SV_PORT, config.SCADA_SV_CTL_LISTEN, - config.SCADA_API_LISTEN, config.TRUSTED_RANGE, conn_watchdog) + local coord_comms = coordinator.comms(COORDINATOR_VERSION, modem, config.CRD_CHANNEL, config.SVR_CHANNEL, + config.PKT_CHANNEL, config.TRUSTED_RANGE, conn_watchdog) log.debug("startup> comms init") log_comms("comms initialized") @@ -163,7 +163,7 @@ local function main() -- attempt to connect to the supervisor or exit local function init_connect_sv() - local tick_waiting, task_done = log_comms_connecting("attempting to connect to configured supervisor on channel " .. config.SCADA_SV_PORT) + local tick_waiting, task_done = log_comms_connecting("attempting to connect to configured supervisor on channel " .. config.SVR_CHANNEL) -- attempt to establish a connection with the supervisory computer if not coord_comms.sv_connect(60, tick_waiting, task_done) then