You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mod_jibri_queue_component.lua 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. local st = require "util.stanza";
  2. local jid = require "util.jid";
  3. local http = require "net.http";
  4. local json = require "cjson";
  5. local inspect = require('inspect');
  6. local socket = require "socket";
  7. local uuid_gen = require "util.uuid".generate;
  8. local jwt = require "luajwtjitsi";
  9. local it = require "util.iterators";
  10. local neturl = require "net.url";
  11. local parse = neturl.parseQuery;
  12. local get_room_from_jid = module:require "util".get_room_from_jid;
  13. local room_jid_match_rewrite = module:require "util".room_jid_match_rewrite;
  14. local is_healthcheck_room = module:require "util".is_healthcheck_room;
  15. local room_jid_split_subdomain = module:require "util".room_jid_split_subdomain;
  16. local internal_room_jid_match_rewrite = module:require "util".internal_room_jid_match_rewrite;
  17. local async_handler_wrapper = module:require "util".async_handler_wrapper;
  18. -- this basically strips the domain from the conference.domain address
  19. local parentHostName = string.gmatch(tostring(module.host), "%w+.(%w.+)")();
  20. if parentHostName == nil then
  21. log("error", "Failed to start - unable to get parent hostname");
  22. return;
  23. end
  24. local parentCtx = module:context(parentHostName);
  25. if parentCtx == nil then
  26. log("error",
  27. "Failed to start - unable to get parent context for host: %s",
  28. tostring(parentHostName));
  29. return;
  30. end
  31. local token_util = module:require "token/util".new(parentCtx);
  32. local ASAPKeyServer;
  33. local ASAPKeyPath;
  34. local ASAPKeyId;
  35. local ASAPIssuer;
  36. local ASAPAudience;
  37. local ASAPAcceptedIssuers;
  38. local ASAPAcceptedAudiences;
  39. local ASAPTTL;
  40. local ASAPTTL_THRESHOLD;
  41. local ASAPKey;
  42. local JibriRegion;
  43. local disableTokenVerification;
  44. local muc_component_host;
  45. local external_api_url;
  46. local jwtKeyCacheSize;
  47. local jwtKeyCache;
  48. local function load_config()
  49. ASAPKeyServer = module:get_option_string("asap_key_server");
  50. if ASAPKeyServer then
  51. module:log("debug", "ASAP Public Key URL %s", ASAPKeyServer);
  52. token_util:set_asap_key_server(ASAPKeyServer);
  53. end
  54. ASAPKeyPath
  55. = module:get_option_string("asap_key_path", '/etc/prosody/certs/asap.key');
  56. ASAPKeyId
  57. = module:get_option_string("asap_key_id", 'jitsi');
  58. ASAPIssuer
  59. = module:get_option_string("asap_issuer", 'jitsi');
  60. ASAPAudience
  61. = module:get_option_string("asap_audience", 'jibri-queue');
  62. ASAPAcceptedIssuers
  63. = module:get_option_array('asap_accepted_issuers',{'jibri-queue'});
  64. module:log("debug", "ASAP Accepted Issuers %s", ASAPAcceptedIssuers);
  65. token_util:set_asap_accepted_issuers(ASAPAcceptedIssuers);
  66. ASAPAcceptedAudiences
  67. = module:get_option_array('asap_accepted_audiences',{'*'});
  68. module:log("debug", "ASAP Accepted Audiences %s", ASAPAcceptedAudiences);
  69. token_util:set_asap_accepted_audiences(ASAPAcceptedAudiences);
  70. -- do not require room to be set on tokens for jibri queue
  71. token_util:set_asap_require_room_claim(false);
  72. ASAPTTL
  73. = module:get_option_number("asap_ttl", 3600);
  74. ASAPTTL_THRESHOLD
  75. = module:get_option_number("asap_ttl_threshold", 600);
  76. queueServiceURL
  77. = module:get_option_string("jibri_queue_url");
  78. JibriRegion
  79. = module:get_option_string("jibri_region", 'default');
  80. -- option to enable/disable token verifications
  81. disableTokenVerification
  82. = module:get_option_boolean("disable_jibri_queue_token_verification", false);
  83. muc_component_host
  84. = module:get_option_string("muc_component");
  85. external_api_url = module:get_option_string("external_api_url",tostring(parentHostName));
  86. module:log("debug", "External advertised API URL", external_api_url);
  87. -- TODO: Figure out a less arbitrary default cache size.
  88. jwtKeyCacheSize
  89. = module:get_option_number("jwt_pubkey_cache_size", 128);
  90. jwtKeyCache = require"util.cache".new(jwtKeyCacheSize);
  91. if queueServiceURL == nil then
  92. log("error", "No jibri_queue_url specified. No service to contact!");
  93. return;
  94. end
  95. if muc_component_host == nil then
  96. log("error", "No muc_component specified. No muc to operate on for jibri queue!");
  97. return;
  98. end
  99. -- Read ASAP key once on module startup
  100. local f = io.open(ASAPKeyPath, "r");
  101. if f then
  102. ASAPKey = f:read("*all");
  103. f:close();
  104. if not ASAPKey then
  105. module:log("warn", "No ASAP Key read from %s, disabling jibri queue component plugin", ASAPKeyPath);
  106. return
  107. end
  108. else
  109. module:log("warn", "Error reading ASAP Key %s, disabling jibri queue component plugin", ASAPKeyPath);
  110. return
  111. end
  112. return true;
  113. end
  114. local function reload_config()
  115. module:log("info", "Reloading configuration for jibri queue component");
  116. local config_success = load_config();
  117. -- clear ASAP public key cache on config reload
  118. token_util:clear_asap_cache();
  119. if not config_success then
  120. log("error", "Unsuccessful reconfiguration, jibri queue component may misbehave");
  121. end
  122. end
  123. local config_success = load_config();
  124. if not config_success then
  125. log("error", "Unsuccessful configuration step, jibri queue component disabled")
  126. return;
  127. end
  128. local http_headers = {
  129. ["User-Agent"] = "Prosody ("..prosody.version.."; "..prosody.platform..")",
  130. ["Content-Type"] = "application/json"
  131. };
  132. -- we use async to detect Prosody 0.10 and earlier
  133. local have_async = pcall(require, "util.async");
  134. if not have_async then
  135. module:log("warn", "conference duration will not work with Prosody version 0.10 or less.");
  136. return;
  137. end
  138. log("info", "Starting jibri queue handling for %s", muc_component_host);
  139. local function round(num, numDecimalPlaces)
  140. local mult = 10^(numDecimalPlaces or 0)
  141. return math.floor(num * mult + 0.5) / mult
  142. end
  143. local function generateToken(audience)
  144. audience = audience or ASAPAudience
  145. local t = os.time()
  146. local err
  147. local exp_key = 'asap_exp.'..audience
  148. local token_key = 'asap_token.'..audience
  149. local exp = jwtKeyCache:get(exp_key)
  150. local token = jwtKeyCache:get(token_key)
  151. --if we find a token and it isn't too far from expiry, then use it
  152. if token ~= nil and exp ~= nil then
  153. exp = tonumber(exp)
  154. if (exp - t) > ASAPTTL_THRESHOLD then
  155. return token
  156. end
  157. end
  158. --expiry is the current time plus TTL
  159. exp = t + ASAPTTL
  160. local payload = {
  161. iss = ASAPIssuer,
  162. aud = audience,
  163. nbf = t,
  164. exp = exp,
  165. }
  166. -- encode
  167. local alg = "RS256"
  168. token, err = jwt.encode(payload, ASAPKey, alg, {kid = ASAPKeyId})
  169. if not err then
  170. token = 'Bearer '..token
  171. jwtKeyCache:set(exp_key,exp)
  172. jwtKeyCache:set(token_key,token)
  173. return token
  174. else
  175. return ''
  176. end
  177. end
  178. local function sendIq(participant,action,requestId,time,position,token)
  179. local iqId = uuid_gen();
  180. local from = module:get_host();
  181. local outStanza = st.iq({type = 'set', from = from, to = participant, id = iqId}):tag("jibri-queue",
  182. { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId, action = action });
  183. if token then
  184. outStanza:tag("token"):text(token):up()
  185. end
  186. if time then
  187. outStanza:tag("time"):text(tostring(time)):up()
  188. end
  189. if position then
  190. outStanza:tag("position"):text(tostring(position)):up()
  191. end
  192. module:send(outStanza);
  193. end
  194. local function cb(content_, code_, response_, request_)
  195. if code_ == 200 or code_ == 204 then
  196. module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
  197. code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  198. else
  199. module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
  200. code_, content_, inspect(request_), inspect(response_));
  201. end
  202. end
  203. local function sendEvent(type,room_address,participant,requestId,replyIq,replyError)
  204. local event_ts = round(socket.gettime()*1000);
  205. local node, host, resource, target_subdomain = room_jid_split_subdomain(room_address);
  206. local room_param = '';
  207. if target_subdomain then
  208. room_param = target_subdomain..'/'..node;
  209. else
  210. room_param = node;
  211. end
  212. local out_event = {
  213. ["conference"] = room_address,
  214. ["roomParam"] = room_param,
  215. ["eventType"] = type,
  216. ["participant"] = participant,
  217. ["externalApiUrl"] = external_api_url.."/jibriqueue/update",
  218. ["requestId"] = requestId,
  219. ["region"] = JibriRegion,
  220. }
  221. module:log("debug","Sending event %s",inspect(out_event));
  222. local headers = http_headers or {}
  223. headers['Authorization'] = generateToken()
  224. module:log("debug","Sending headers %s",inspect(headers));
  225. local requestURL = queueServiceURL.."/job/recording"
  226. if type=="LeaveQueue" then
  227. requestURL = requestURL .."/cancel"
  228. end
  229. local request = http.request(requestURL, {
  230. headers = headers,
  231. method = "POST",
  232. body = json.encode(out_event)
  233. }, function (content_, code_, response_, request_)
  234. if code_ == 200 or code_ == 204 then
  235. module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
  236. code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  237. if (replyIq) then
  238. module:log("debug", "sending reply IQ %s",inspect(replyIq));
  239. module:send(replyIq);
  240. end
  241. else
  242. module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
  243. code_, content_, inspect(request_), inspect(response_));
  244. if (replyError) then
  245. module:log("warn", "sending reply error IQ %s",inspect(replyError));
  246. module:send(replyError);
  247. end
  248. end
  249. end);
  250. end
  251. function clearRoomQueueByOccupant(room, occupant)
  252. room.jibriQueue[occupant.jid] = nil;
  253. end
  254. function addRoomQueueByOccupant(room, occupant, requestId)
  255. room.jibriQueue[occupant.jid] = requestId;
  256. end
  257. -- receives iq from client currently connected to the room
  258. function on_iq(event)
  259. local requestId;
  260. -- Check the type of the incoming stanza to avoid loops:
  261. if event.stanza.attr.type == "error" then
  262. return; -- We do not want to reply to these, so leave.
  263. end
  264. if event.stanza.attr.to == module:get_host() then
  265. if event.stanza.attr.type == "set" then
  266. local reply = st.reply(event.stanza);
  267. local replyError = st.error_reply(event.stanza,'cancel','internal-server-error',"Queue Server Error");
  268. local jibriQueue
  269. = event.stanza:get_child('jibri-queue', 'http://jitsi.org/protocol/jibri-queue');
  270. if jibriQueue then
  271. module:log("debug", "Received Jibri Queue Request: %s ",inspect(jibriQueue));
  272. local roomAddress = jibriQueue.attr.room;
  273. local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
  274. if not room then
  275. module:log("warn", "No room found %s", roomAddress);
  276. return false;
  277. end
  278. local from = event.stanza.attr.from;
  279. local occupant = room:get_occupant_by_real_jid(from);
  280. if not occupant then
  281. module:log("warn", "No occupant %s found for %s", from, roomAddress);
  282. return false;
  283. end
  284. local action = jibriQueue.attr.action;
  285. if action == 'join' then
  286. -- join action, so send event out
  287. requestId = uuid_gen();
  288. module:log("debug","Received join queue request for jid %s occupant %s requestId %s",roomAddress,occupant.jid,requestId);
  289. -- now handle new jibri queue message
  290. addRoomQueueByOccupant(room, occupant, requestId);
  291. reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  292. replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  293. module:log("debug","Sending JoinQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
  294. sendEvent('JoinQueue',roomAddress,occupant.jid,requestId,reply,replyError);
  295. end
  296. if action == 'leave' then
  297. requestId = jibriQueue.attr.requestId;
  298. module:log("debug","Received leave queue request for jid %s occupant %s requestId %s",roomAddress,occupant.jid,requestId);
  299. -- TODO: check that requestId is the same as cached value
  300. clearRoomQueueByOccupant(room, occupant);
  301. reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  302. replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
  303. module:log("debug","Sending LeaveQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
  304. sendEvent('LeaveQueue',roomAddress,occupant.jid,requestId,reply,replyError);
  305. end
  306. else
  307. module:log("warn","Jibri Queue Stanza missing child %s",inspect(event.stanza))
  308. end
  309. end
  310. end
  311. return true
  312. end
  313. -- create recorder queue cache for the room
  314. function room_created(event)
  315. local room = event.room;
  316. if is_healthcheck_room(room.jid) then
  317. return;
  318. end
  319. room.jibriQueue = {};
  320. end
  321. -- Conference ended, clear all queue cache jids
  322. function room_destroyed(event)
  323. local room = event.room;
  324. if is_healthcheck_room(room.jid) then
  325. return;
  326. end
  327. for jid, x in pairs(room.jibriQueue) do
  328. if x then
  329. sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),jid,x);
  330. end
  331. end
  332. end
  333. -- Occupant left remove it from the queue if it joined the queue
  334. function occupant_leaving(event)
  335. local room = event.room;
  336. if is_healthcheck_room(room.jid) then
  337. return;
  338. end
  339. local occupant = event.occupant;
  340. local requestId = room.jibriQueue[occupant.jid];
  341. -- check if user has cached queue request
  342. if requestId then
  343. -- remove occupant from queue cache, signal backend
  344. room.jibriQueue[occupant.jid] = nil;
  345. sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),occupant.jid,requestId);
  346. end
  347. end
  348. module:hook("iq/host", on_iq);
  349. -- executed on every host added internally in prosody, including components
  350. function process_host(host)
  351. if host == muc_component_host then -- the conference muc component
  352. module:log("debug","Hook to muc events on %s", host);
  353. local muc_module = module:context(host);
  354. muc_module:hook("muc-room-created", room_created, -1);
  355. -- muc_module:hook("muc-occupant-joined", occupant_joined, -1);
  356. muc_module:hook("muc-occupant-pre-leave", occupant_leaving, -1);
  357. muc_module:hook("muc-room-destroyed", room_destroyed, -1);
  358. end
  359. end
  360. if prosody.hosts[muc_component_host] == nil then
  361. module:log("debug","No muc component found, will listen for it: %s", muc_component_host)
  362. -- when a host or component is added
  363. prosody.events.add_handler("host-activated", process_host);
  364. else
  365. process_host(muc_component_host);
  366. end
  367. module:log("info", "Loading jibri_queue_component");
  368. --- Verifies room name, domain name with the values in the token
  369. -- @param token the token we received
  370. -- @param room_name the room name
  371. -- @param group name of the group (optional)
  372. -- @param session the session to use for storing token specific fields
  373. -- @return true if values are ok or false otherwise
  374. function verify_token(token, room_jid, session)
  375. if disableTokenVerification then
  376. return true;
  377. end
  378. -- if not disableTokenVerification and we do not have token
  379. -- stop here, cause the main virtual host can have guest access enabled
  380. -- (allowEmptyToken = true) and we will allow access to rooms info without
  381. -- a token
  382. if token == nil then
  383. log("warn", "no token provided");
  384. return false;
  385. end
  386. session.auth_token = token;
  387. local verified, reason, message = token_util:process_and_verify_token(session);
  388. if not verified then
  389. log("warn", "not a valid token %s: %s", tostring(reason), tostring(message));
  390. log("debug", "invalid token %s", token);
  391. return false;
  392. end
  393. return true;
  394. end
  395. --- Handles request for updating jibri queue status
  396. -- @param event the http event, holds the request query
  397. -- @return GET response, containing a json with response details
  398. function handle_update_jibri_queue(event)
  399. local body = json.decode(event.request.body);
  400. module:log("debug","Update Jibri Queue Event Received: %s",inspect(body));
  401. local token = event.request.headers["authorization"];
  402. if not token then
  403. token = ''
  404. else
  405. local prefixStart, prefixEnd = token:find("Bearer ");
  406. if prefixStart ~= 1 then
  407. module:log("error", "REST event: Invalid authorization header format. The header must start with the string 'Bearer '");
  408. return { status_code = 403; };
  409. end
  410. token = token:sub(prefixEnd + 1);
  411. end
  412. local user_jid = body["participant"];
  413. local roomAddress = body["conference"];
  414. local userJWT = body["token"];
  415. local action = body["action"];
  416. local time = body["time"];
  417. local position = body["position"];
  418. local requestId = body["requestId"];
  419. if not action then
  420. if userJWT then
  421. action = 'token';
  422. else
  423. action = 'info';
  424. end
  425. end
  426. local room_jid = room_jid_match_rewrite(roomAddress);
  427. if not verify_token(token, room_jid, {}) then
  428. log("error", "REST event: Invalid token for room %s to route action %s for requestId %s", roomAddress, action, requestId);
  429. return { status_code = 403; };
  430. end
  431. local room = get_room_from_jid(room_jid);
  432. if (not room) then
  433. log("error", "REST event: no room found %s to route action %s for requestId %s", roomAddress, action, requestId);
  434. return { status_code = 404; };
  435. end
  436. local occupant = room:get_occupant_by_real_jid(user_jid);
  437. if not occupant then
  438. log("warn", "REST event: No occupant %s found for %s to route action %s for requestId %s", user_jid, roomAddress, action, requestId);
  439. return { status_code = 404; };
  440. end
  441. if not room.jibriQueue[occupant.jid] then
  442. log("warn", "REST event: No queue request found for occupant %s in conference %s to route action %s for requestId %s",occupant.jid,room.jid, action, requestId)
  443. return { status_code = 404; };
  444. end
  445. if not requestId then
  446. requestId = room.jibriQueue[occupant.jid];
  447. end
  448. if action == 'token' and userJWT then
  449. log("debug", "REST event: Token received for occupant %s in conference %s requestId %s, clearing room queue");
  450. clearRoomQueueByOccupant(room, occupant);
  451. end
  452. log("debug", "REST event: Sending update for occupant %s in conference %s to route action %s for requestId %s",occupant.jid,room.jid, action, requestId);
  453. sendIq(occupant.jid,action,requestId,time,position,userJWT);
  454. return { status_code = 200; };
  455. end
  456. module:depends("http");
  457. module:provides("http", {
  458. default_path = "/";
  459. name = "jibriqueue";
  460. route = {
  461. ["POST /jibriqueue/update"] = function (event) return async_handler_wrapper(event,handle_update_jibri_queue) end;
  462. };
  463. });
  464. module:hook_global('config-reloaded', reload_config);