Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

mod_jibri_queue_component.lua 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. local st = require "util.stanza";
  2. local jid = require "util.jid";
  3. local http = require "net.http";
  4. local json = require "cjson";
  5. local inspect = require('inspect');
  6. local socket = require "socket";
  7. local uuid_gen = require "util.uuid".generate;
  8. local jwt = require "luajwtjitsi";
  9. local it = require "util.iterators";
  10. local neturl = require "net.url";
  11. local parse = neturl.parseQuery;
  12. local get_room_from_jid = module:require "util".get_room_from_jid;
  13. local room_jid_match_rewrite = module:require "util".room_jid_match_rewrite;
  14. local is_healthcheck_room = module:require "util".is_healthcheck_room;
  15. local room_jid_split_subdomain = module:require "util".room_jid_split_subdomain;
  16. local internal_room_jid_match_rewrite = module:require "util".internal_room_jid_match_rewrite;
  17. local async_handler_wrapper = module:require "util".async_handler_wrapper;
  -- Derive the parent virtual host by dropping the first label of this
  -- component's host (e.g. "jibriqueue.example.com" -> "example.com").
  -- NOTE(review): the "." in the pattern is not escaped, so it matches any
  -- character; a first label containing "-" would be split at the hyphen.
  local parentHostName = tostring(module.host):match("%w+.(%w.+)");
  if not parentHostName then
    log("error", "Failed to start - unable to get parent hostname");
    return;
  end

  -- Module context of the parent host; handed to the token utility below.
  local parentCtx = module:context(parentHostName);
  if not parentCtx then
    log("error", "Failed to start - unable to get parent context for host: %s", tostring(parentHostName));
    return;
  end
  -- Token helper bound to the parent host context.
  local token_util = module:require("token/util").new(parentCtx);

  -- Optional server from which public keys for incoming tokens are fetched.
  local ASAPKeyServer = module:get_option_string("asap_key_server");
  if ASAPKeyServer then
    module:log("info", "ASAP Public Key URL %s", ASAPKeyServer);
    token_util:set_asap_key_server(ASAPKeyServer);
  end

  -- Settings used when signing the outgoing ASAP token.
  local ASAPKeyPath = module:get_option_string("asap_key_path", '/etc/prosody/certs/asap.key');
  local ASAPKeyId = module:get_option_string("asap_key_id", 'jitsi');
  local ASAPIssuer = module:get_option_string("asap_issuer", 'jitsi');
  local ASAPAudience = module:get_option_string("asap_audience", 'jitsi');
  local ASAPTTL = module:get_option_number("asap_ttl", 3600);
  local ASAPTTL_THRESHOLD = module:get_option_number("asap_ttl_threshold", 600);

  -- Private key contents; filled in once the key file has been read.
  local ASAPKey;

  -- Base URL of the queue service and the region reported with each event.
  local queueServiceURL = module:get_option_string("jibri_queue_url");
  local JibriRegion = module:get_option_string("jibri_region", 'default');

  -- Without a queue service there is nothing to talk to, so stop loading.
  if not queueServiceURL then
    log("error", "No jibri_queue_url specified. No service to contact!");
    return;
  end
  -- When true, verify_token accepts every request without checking the token.
  local disableTokenVerification = module:get_option_boolean("disable_jibri_queue_token_verification", false);

  -- Base headers sent with every request to the queue service.
  local http_headers = {};
  http_headers["Content-Type"] = "application/json";
  http_headers["User-Agent"] = "Prosody ("..prosody.version.."; "..prosody.platform..")";
  -- util.async is used to detect Prosody 0.10 and earlier, where it is not
  -- available. The HTTP route of this module goes through
  -- async_handler_wrapper, so stop loading when async support is missing.
  local have_async = pcall(require, "util.async");
  if not have_async then
    -- The original text spoke of "conference duration", copied from another module.
    module:log("warn", "jibri queue component will not work with Prosody version 0.10 or less.");
    return;
  end
  -- Host name of the MUC component whose rooms carry the jibri queue state.
  local muc_component_host = module:get_option_string("muc_component");
  if muc_component_host == nil then
    log("error", "No muc_component specified. No muc to operate on for jibri queue!");
    return;
  end
  log("info", "Starting jibri queue handling for %s", muc_component_host);

  -- Base URL advertised to the queue service for update callbacks; falls back
  -- to the parent host name when not configured.
  local external_api_url = module:get_option_string("external_api_url",tostring(parentHostName));
  -- The format string previously had no %s, so the URL was never printed.
  module:log("info", "External advertised API URL %s", external_api_url);
  -- Read the ASAP private key once on module startup. Without a usable key no
  -- outgoing token can be signed, so the module stops loading.
  local key_file, open_err = io.open(ASAPKeyPath, "r");
  if not key_file then
    module:log("warn", "Error reading ASAP Key from %s (%s), disabling jibri queue component",
      tostring(ASAPKeyPath), tostring(open_err));
    return
  end
  ASAPKey = key_file:read("*a");
  key_file:close();
  -- Reading a whole empty file yields "" rather than nil, so check for both.
  if not ASAPKey or ASAPKey == "" then
    module:log("warn", "No ASAP Key read from %s, disabling jibri queue component", tostring(ASAPKeyPath));
    return
  end
  -- Cache for the signed outgoing token and its expiry (see generateToken).
  -- TODO: Figure out a less arbitrary default cache size.
  local jwtKeyCacheSize = module:get_option_number("jwt_pubkey_cache_size", 128);
  local jwtKeyCache = require("util.cache").new(jwtKeyCacheSize);
  --- Rounds a number half-up to a given number of decimal places.
  -- @param num the number to round
  -- @param numDecimalPlaces decimal places to keep (optional, defaults to 0)
  -- @return the rounded number
  local function round(num, numDecimalPlaces)
    local places = numDecimalPlaces or 0
    local scale = 10 ^ places
    local shifted = num * scale + 0.5
    return math.floor(shifted) / scale
  end
  --- Returns the Authorization header value ("Bearer <jwt>") for the queue service.
  -- A cached token is reused while it is more than ASAPTTL_THRESHOLD seconds
  -- from expiry; otherwise a new RS256 token is signed with the ASAP key and cached.
  -- @param audience the token audience (optional, defaults to ASAPAudience)
  -- @return the header value, or '' when signing fails
  local function generateToken(audience)
    audience = audience or ASAPAudience
    local now = os.time()
    local exp_key = 'asap_exp.'..audience
    local token_key = 'asap_token.'..audience

    -- Reuse the cached token if it is not too close to expiry.
    local cached_exp = jwtKeyCache:get(exp_key)
    local cached_token = jwtKeyCache:get(token_key)
    if cached_token ~= nil and cached_exp ~= nil then
      cached_exp = tonumber(cached_exp)
      if cached_exp and (cached_exp - now) > ASAPTTL_THRESHOLD then
        return cached_token
      end
    end

    -- Expiry is the current time plus TTL.
    local exp = now + ASAPTTL
    local payload = {
      iss = ASAPIssuer,
      aud = audience,
      nbf = now,
      exp = exp,
    }

    local token, err = jwt.encode(payload, ASAPKey, "RS256", { kid = ASAPKeyId })
    if err or not token then
      -- Previously this failure was swallowed; log it so a bad key is visible.
      module:log("error", "Failed to sign ASAP token for audience %s: %s", tostring(audience), tostring(err))
      return ''
    end

    token = 'Bearer '..token
    jwtKeyCache:set(exp_key, exp)
    jwtKeyCache:set(token_key, token)
    return token
  end
  --- Sends a jibri-queue "set" IQ from this host to a participant.
  -- @param participant JID the IQ is addressed to
  -- @param action value of the action attribute on the jibri-queue element
  -- @param requestId value of the requestId attribute on the jibri-queue element
  -- @param time optional value for a <time/> child
  -- @param position optional value for a <position/> child
  -- @param token optional value for a <token/> child
  local function sendIq(participant,action,requestId,time,position,token)
    local iqId = uuid_gen();
    local from = module:get_host();
    module:log("info","Oubound iq id %s",iqId);
    local outStanza = st.iq({type = 'set', from = from, to = participant, id = iqId}):tag("jibri-queue",
      { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId, action = action });
    module:log("info","Oubound base stanza %s",inspect(outStanza));
    -- The optional values come from a decoded JSON body and may be numbers;
    -- stringify them before adding them as text nodes.
    if token then
      outStanza:tag("token"):text(tostring(token)):up()
    end
    if time then
      outStanza:tag("time"):text(tostring(time)):up()
    end
    if position then
      outStanza:tag("position"):text(tostring(position)):up()
    end
    module:log("info","Oubound stanza %s",inspect(outStanza));
    module:send(outStanza);
  end
  --- Generic HTTP callback that only logs the outcome of a request.
  -- Codes 200 and 204 are logged at debug level, anything else as a warning.
  -- @param content_ the response content
  -- @param code_ the response status code
  -- @param response_ the response object
  -- @param request_ the request object
  local function cb(content_, code_, response_, request_)
    local succeeded = (code_ == 200) or (code_ == 204);
    if not succeeded then
      module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
        code_, content_, inspect(request_), inspect(response_));
      return;
    end
    module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
      code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  end
  --- Posts a queue event to the queue service and relays the outcome as an IQ.
  -- The event is POSTed to <jibri_queue_url>/job/recording, with "/cancel"
  -- appended for 'LeaveQueue'. On HTTP 200/204 replyIq is sent, otherwise
  -- replyError is sent.
  -- @param type the event type, e.g. 'JoinQueue' or 'LeaveQueue'
  -- @param room_address the room address reported as "conference"
  -- @param participant the participant JID
  -- @param requestId the queue request id
  -- @param replyIq stanza sent on success (optional)
  -- @param replyError stanza sent on failure (optional)
  local function sendEvent(type,room_address,participant,requestId,replyIq,replyError)
    local node, _, _, target_subdomain = room_jid_split_subdomain(room_address);
    local room_param = node;
    if target_subdomain then
      room_param = target_subdomain..'/'..node;
    end

    local out_event = {
      ["conference"] = room_address,
      ["roomParam"] = room_param,
      ["eventType"] = type,
      ["participant"] = participant,
      ["externalApiUrl"] = external_api_url.."/jibriqueue/update",
      ["requestId"] = requestId,
      ["region"] = JibriRegion,
    }
    module:log("debug","Sending event %s",inspect(out_event));

    -- Work on a copy so the Authorization header never ends up in the shared
    -- module-level header table.
    local headers = {};
    for name, value in pairs(http_headers or {}) do
      headers[name] = value;
    end
    headers['Authorization'] = generateToken();
    module:log("debug","Sending headers %s",inspect(headers));

    local requestURL = queueServiceURL.."/job/recording"
    if type=="LeaveQueue" then
      requestURL = requestURL .."/cancel"
    end

    http.request(requestURL, {
      headers = headers,
      method = "POST",
      body = json.encode(out_event)
    }, function (content_, code_, response_, request_)
      if code_ == 200 or code_ == 204 then
        module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
          code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
        -- Callers that clean up after a room or occupant pass no reply
        -- stanzas; there is nothing to send in that case.
        if replyIq then
          module:log("info", "sending reply IQ %s",inspect(replyIq));
          module:send(replyIq);
        end
      else
        module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
          code_, content_, inspect(request_), inspect(response_));
        if replyError then
          module:log("warn", "sending reply error IQ %s",inspect(replyError));
          module:send(replyError);
        end
      end
    end);
  end
  --- Handles an IQ received from a client currently connected to a room.
  -- Only "set" IQs addressed to this host that carry a jibri-queue child are
  -- acted on: action 'join' registers a new request id for the occupant and
  -- emits a JoinQueue event, action 'leave' drops it and emits a LeaveQueue event.
  -- @param event the iq event, holding the stanza
  -- @return nothing for error stanzas, false when the room or occupant cannot
  -- be resolved, true otherwise
  function on_iq(event)
    local stanza = event.stanza;

    -- Check the type of the incoming stanza to avoid loops:
    if stanza.attr.type == "error" then
      return; -- We do not want to reply to these, so leave.
    end
    if stanza.attr.to ~= module:get_host() or stanza.attr.type ~= "set" then
      return true;
    end

    log("info", "Jibri Queue Messsage Event found: %s ",inspect(stanza));
    local reply = st.reply(stanza);
    local replyError = st.error_reply(stanza,'cancel','internal-server-error',"Queue Server Error");
    module:log("info","Reply stanza %s",inspect(reply));

    local jibriQueue = stanza:get_child('jibri-queue', 'http://jitsi.org/protocol/jibri-queue');
    if not jibriQueue then
      module:log("warn","Jibri Queue Stanza missing child %s",inspect(stanza))
      return true;
    end
    module:log("info", "Jibri Queue Request: %s ",inspect(jibriQueue));

    local roomAddress = jibriQueue.attr.room;
    if not roomAddress then
      -- Without a room attribute there is nothing to look up.
      module:log("warn", "No room found %s", tostring(roomAddress));
      return false;
    end
    local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
    if not room then
      module:log("warn", "No room found %s", roomAddress);
      return false;
    end

    local from = stanza.attr.from;
    local occupant = room:get_occupant_by_real_jid(from);
    if not occupant then
      module:log("warn", "No occupant %s found for %s", from, roomAddress);
      return false;
    end

    -- The queue table is created by the room-created hook; create it here too
    -- in case that hook did not run for this room.
    local queue = room.jibriQueue;
    if not queue then
      queue = {};
      room.jibriQueue = queue;
    end

    -- Adds the jibri-queue child carrying the request id to both replies.
    local function attach_request_id(id)
      reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = id})):up()
      replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = id})):up()
    end

    local action = jibriQueue.attr.action;
    if action == 'join' then
      -- join action: remember the new request id and send the event out
      local requestId = uuid_gen();
      queue[occupant.jid] = requestId;
      attach_request_id(requestId);
      module:log("info","Sending JoinQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
      sendEvent('JoinQueue',roomAddress,occupant.jid,requestId,reply,replyError);
    elseif action == 'leave' then
      -- Prefer the id supplied by the client, fall back to the cached one.
      -- TODO: check that requestId is the same as cached value
      local requestId = jibriQueue.attr.requestId or queue[occupant.jid];
      queue[occupant.jid] = nil;
      attach_request_id(requestId);
      sendEvent('LeaveQueue',roomAddress,occupant.jid,requestId,reply,replyError);
    end
    return true
  end
  --- Creates the recorder queue cache for a newly created room.
  -- Healthcheck rooms are left untouched.
  -- @param event the muc event, holding the room
  function room_created(event)
    local room = event.room;
    if not is_healthcheck_room(room.jid) then
      room.jibriQueue = {};
    end
  end
  --- Conference ended: signal LeaveQueue for every jid still in the queue cache.
  -- Healthcheck rooms and rooms without a queue cache are skipped.
  -- @param event the muc event, holding the room
  function room_destroyed(event)
    local room = event.room;
    if is_healthcheck_room(room.jid) then
      return;
    end
    -- The cache only exists when the room-created hook ran for this room.
    local queue = room.jibriQueue;
    if not queue then
      return;
    end
    for jid, requestId in pairs(queue) do
      if requestId then
        sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),jid,requestId);
      end
    end
  end
  --- Occupant left: remove it from the queue cache if it had joined the queue.
  -- Healthcheck rooms and rooms without a queue cache are skipped.
  -- @param event the muc event, holding the room and the occupant
  function occupant_leaving(event)
    local room = event.room;
    if is_healthcheck_room(room.jid) then
      return;
    end
    -- The cache only exists when the room-created hook ran for this room.
    local queue = room.jibriQueue;
    if not queue then
      return;
    end
    local occupant = event.occupant;
    local requestId = queue[occupant.jid];
    -- check if user has cached queue request
    if requestId then
      -- remove occupant from queue cache, signal backend
      queue[occupant.jid] = nil;
      sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),occupant.jid,requestId);
    end
  end
  -- Route IQs addressed to this host to the queue handler.
  module:hook("iq/host", on_iq);

  --- Attaches the room lifecycle hooks once the conference MUC component is seen.
  -- Executed on every host added internally in prosody, including components;
  -- every host other than the configured muc component is ignored.
  -- @param host the name of the host that was added
  function process_host(host)
    if host ~= muc_component_host then
      return;
    end
    module:log("info","Hook to muc events on %s", host);
    local muc_module = module:context(host);
    -- ("muc-occupant-joined" is intentionally not hooked.)
    local muc_hooks = {
      { "muc-room-created", room_created },
      { "muc-occupant-pre-leave", occupant_leaving },
      { "muc-room-destroyed", room_destroyed },
    };
    for _, entry in ipairs(muc_hooks) do
      muc_module:hook(entry[1], entry[2], -1);
    end
  end
  -- Hook the muc component right away when it is already active; otherwise
  -- wait for it to be activated.
  if prosody.hosts[muc_component_host] == nil then
    module:log("info","No muc component found, will listen for it: %s", muc_component_host)
    -- when a host or component is added
    -- Registered through the module API rather than prosody.events directly,
    -- so that the handler is tracked as belonging to this module.
    module:hook_global("host-activated", process_host);
  else
    process_host(muc_component_host);
  end
  module:log("info", "Loading jibri_queue_component");
  --- Verifies a token and checks that it grants access to the given room.
  -- When disable_jibri_queue_token_verification is set, every request is accepted.
  -- @param token the token we received
  -- @param room_jid the jid of the room being accessed
  -- @param session the session to use for storing token specific fields
  -- @return true if values are ok or false otherwise
  function verify_token(token, room_jid, session)
    if not disableTokenVerification then
      -- if not disableTokenVerification and we do not have token
      -- stop here, cause the main virtual host can have guest access enabled
      -- (allowEmptyToken = true) and we will allow access to rooms info without
      -- a token
      if token == nil then
        log("warn", "no token provided");
        return false;
      end

      session.auth_token = token;
      local verified, reason, message = token_util:process_and_verify_token(session);
      if not verified then
        log("warn", "not a valid token %s: %s", tostring(reason), tostring(message));
        return false;
      end

      local room_allowed = token_util:verify_room(session, room_jid);
      if not room_allowed then
        log("warn", "Token %s not allowed to access: %s",
          tostring(token), tostring(room_jid));
        return false;
      end
    end
    return true;
  end
  --- Handles request for updating jibri queue status.
  -- Expects a JSON object body with "participant" and "conference", and
  -- optionally "token", "action", "time", "position" and "requestId". The
  -- bearer token from the Authorization header is verified against the room,
  -- then the update is forwarded to the participant as a jibri-queue IQ.
  -- @param event the http event, holds the request
  -- @return a table with the resulting status_code: 400 for a malformed
  -- request, 403 for a bad token, 404 when the room, occupant or queue
  -- request is unknown, 200 otherwise
  function handle_update_jibri_queue(event)
    module:log("info","Update Jibri Queue Event Received");
    local request = event.request;

    -- Decoding can raise on an invalid or missing body; answer 400 instead.
    local decoded, body = pcall(json.decode, request.body);
    if not decoded or type(body) ~= "table" then
      module:log("warn", "Update Jibri Queue request body is not a JSON object");
      return { status_code = 400; };
    end
    module:log("info","Update Jibri Event Body %s",inspect(body));

    local token = request.headers["authorization"];
    if not token then
      token = ''
    else
      if token:sub(1, 7) ~= "Bearer " then
        module:log("error", "Invalid authorization header format. The header must start with the string 'Bearer '");
        -- Return a table like every other path of this handler (this branch
        -- used to return the bare number 403).
        return { status_code = 403; };
      end
      token = token:sub(8);
    end

    local user_jid = body["participant"];
    local roomAddress = body["conference"];
    local userJWT = body["token"];
    local action = body["action"];
    local time = body["time"];
    local position = body["position"];
    local requestId = body["requestId"];

    -- Both identifiers are needed to locate the queue request.
    if type(user_jid) ~= "string" or type(roomAddress) ~= "string" then
      module:log("warn", "Update Jibri Queue request is missing participant or conference");
      return { status_code = 400; };
    end

    local room_jid = room_jid_match_rewrite(roomAddress);
    if not verify_token(token, room_jid, {}) then
      return { status_code = 403; };
    end

    local room = get_room_from_jid(room_jid);
    if (not room) then
      log("error", "no room found %s", roomAddress);
      return { status_code = 404; };
    end

    local occupant = room:get_occupant_by_real_jid(user_jid);
    if not occupant then
      log("warn", "No occupant %s found for %s", user_jid, roomAddress);
      return { status_code = 404; };
    end

    -- The room may have no queue table at all (see room_created).
    local queue = room.jibriQueue;
    if not queue or not queue[occupant.jid] then
      log("warn", "No queue request found for occupant %s in conference %s",occupant.jid,room.jid)
      return { status_code = 404; };
    end

    -- Without an explicit action, a body carrying a token is a 'token'
    -- update and anything else is an 'info' update.
    if not action then
      if userJWT then
        action = 'token';
      else
        action = 'info';
      end
    end
    if not requestId then
      requestId = queue[occupant.jid];
    end

    -- TODO: actually implement update code here
    sendIq(occupant.jid,action,requestId,time,position,userJWT);
    return { status_code = 200; };
  end
  -- Expose the queue update endpoint over HTTP.
  module:depends("http");

  -- Runs the update handler through the async wrapper.
  local function update_route(event)
    return async_handler_wrapper(event, handle_update_jibri_queue);
  end

  module:provides("http", {
    name = "jibriqueue";
    default_path = "/";
    route = {
      ["POST /jibriqueue/update"] = update_route;
    };
  });