You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mod_jibri_queue_component.lua 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. local st = require "util.stanza";
  2. local jid = require "util.jid";
  3. local http = require "net.http";
  4. local json = require "cjson";
  5. local inspect = require('inspect');
  6. local socket = require "socket";
  7. local uuid_gen = require "util.uuid".generate;
  8. local jwt = require "luajwtjitsi";
  9. local it = require "util.iterators";
  10. local neturl = require "net.url";
  11. local parse = neturl.parseQuery;
  12. local get_room_from_jid = module:require "util".get_room_from_jid;
  13. local room_jid_match_rewrite = module:require "util".room_jid_match_rewrite;
  14. local is_healthcheck_room = module:require "util".is_healthcheck_room;
  15. local async_handler_wrapper = module:require "util".async_handler_wrapper;
  16. -- this basically strips the domain from the conference.domain address
  17. local parentHostName = string.gmatch(tostring(module.host), "%w+.(%w.+)")();
  18. if parentHostName == nil then
  19. log("error", "Failed to start - unable to get parent hostname");
  20. return;
  21. end
  22. local parentCtx = module:context(parentHostName);
  23. if parentCtx == nil then
  24. log("error",
  25. "Failed to start - unable to get parent context for host: %s",
  26. tostring(parentHostName));
  27. return;
  28. end
  29. local token_util = module:require "token/util".new(parentCtx);
  30. local ASAPKeyPath
  31. = module:get_option_string("asap_key_path", '/etc/prosody/certs/asap.key');
  32. local ASAPKeyId
  33. = module:get_option_string("asap_key_id", 'jitsi');
  34. local ASAPIssuer
  35. = module:get_option_string("asap_issuer", 'jitsi');
  36. local ASAPAudience
  37. = module:get_option_string("asap_audience", 'jibriqueue');
  38. local ASAPTTL
  39. = module:get_option_number("asap_ttl", 3600);
  40. local ASAPTTL_THRESHOLD
  41. = module:get_option_number("asap_ttl_threshold", 600);
  42. local ASAPKey;
  43. local queueServiceURL
  44. = module:get_option_string("jibri_queue_url");
  45. if queueServiceURL == nil then
  46. log("error", "No jibri_queue_url specified. No service to contact!");
  47. return;
  48. end
  49. -- option to enable/disable token verifications
  50. local disableTokenVerification
  51. = module:get_option_boolean("disable_jibri_queue_token_verification", false);
  52. local http_headers = {
  53. ["User-Agent"] = "Prosody ("..prosody.version.."; "..prosody.platform..")",
  54. ["Content-Type"] = "application/json"
  55. };
  56. -- we use async to detect Prosody 0.10 and earlier
  57. local have_async = pcall(require, "util.async");
  58. if not have_async then
  59. module:log("warn", "conference duration will not work with Prosody version 0.10 or less.");
  60. return;
  61. end
  62. local muc_component_host = module:get_option_string("muc_component");
  63. if muc_component_host == nil then
  64. log("error", "No muc_component specified. No muc to operate on for jibri queue!");
  65. return;
  66. end
  67. log("info", "Starting jibri queue handling for %s", muc_component_host);
  68. -- Read ASAP key once on module startup
  69. local f = io.open(ASAPKeyPath, "r");
  70. if f then
  71. ASAPKey = f:read("*all");
  72. f:close();
  73. if not ASAPKey then
  74. module:log("warn", "No ASAP Key read, disabling muc_events plugin");
  75. return
  76. end
  77. else
  78. module:log("warn", "Error reading ASAP Key, disabling muc_events plugin");
  79. return
  80. end
  81. -- TODO: Figure out a less arbitrary default cache size.
  82. local jwtKeyCacheSize = module:get_option_number("jwt_pubkey_cache_size", 128);
  83. local jwtKeyCache = require"util.cache".new(jwtKeyCacheSize);
  84. local function round(num, numDecimalPlaces)
  85. local mult = 10^(numDecimalPlaces or 0)
  86. return math.floor(num * mult + 0.5) / mult
  87. end
  88. local function generateToken(audience)
  89. audience = audience or ASAPAudience
  90. local t = os.time()
  91. local err
  92. local exp_key = 'asap_exp.'..audience
  93. local token_key = 'asap_token.'..audience
  94. local exp = jwtKeyCache:get(exp_key)
  95. local token = jwtKeyCache:get(token_key)
  96. --if we find a token and it isn't too far from expiry, then use it
  97. if token ~= nil and exp ~= nil then
  98. exp = tonumber(exp)
  99. if (exp - t) > ASAPTTL_THRESHOLD then
  100. return token
  101. end
  102. end
  103. --expiry is the current time plus TTL
  104. exp = t + ASAPTTL
  105. local payload = {
  106. iss = ASAPIssuer,
  107. aud = audience,
  108. nbf = t,
  109. exp = exp,
  110. }
  111. -- encode
  112. local alg = "RS256"
  113. token, err = jwt.encode(payload, ASAPKey, alg, {kid = ASAPKeyId})
  114. if not err then
  115. token = 'Bearer '..token
  116. jwtKeyCache:set(exp_key,exp)
  117. jwtKeyCache:set(token_key,token)
  118. return token
  119. else
  120. return ''
  121. end
  122. end
  123. local function cb(content_, code_, response_, request_)
  124. if code_ == 200 or code_ == 204 then
  125. module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
  126. code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
  127. else
  128. module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
  129. code_, content_, inspect(request_), inspect(response_));
  130. end
  131. end
  132. local function sendEvent(type,room_address,participant,edetails)
  133. local event_ts = round(socket.gettime()*1000);
  134. local out_event = {
  135. ["conference"] = room_address,
  136. ["event_type"] = "Event"..type,
  137. ["participant"] = participant,
  138. ["event_details"] = edetails,
  139. ["event_ts"] = event_ts
  140. }
  141. module:log("debug","Sending event %s",inspect(out_event));
  142. local headers = http_headers or {}
  143. headers['Authorization'] = generateToken()
  144. module:log("debug","Sending headers %s",inspect(headers));
  145. local request = http.request(queueServiceURL, {
  146. headers = headers,
  147. method = "POST",
  148. body = json.encode(out_event)
  149. }, cb);
  150. end
  151. -- receives iq from client currently connected to the room
  152. function on_iq(event)
  153. -- Check the type of the incoming stanza to avoid loops:
  154. if event.stanza.attr.type == "error" then
  155. return; -- We do not want to reply to these, so leave.
  156. end
  157. if event.stanza.attr.to == module:get_host() then
  158. if event.stanza.attr.type == "set" then
  159. log("info", "Jibri Queue Messsage Event found: %s ",inspect(event.stanza));
  160. local jibriQueue
  161. = event.stanza:get_child('jibriqueue', 'http://jitsi.org/protocol/jibri-queue');
  162. if jibriQueue then
  163. log("info", "Jibri Queue: %s ",inspect(jibriQueue));
  164. local roomAddress = jibriQueue.attr.room;
  165. local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
  166. if not room then
  167. log("warn", "No room found %s", roomAddress);
  168. return false;
  169. end
  170. local from = event.stanza.attr.from;
  171. local occupant = room:get_occupant_by_real_jid(from);
  172. if not occupant then
  173. log("warn", "No occupant %s found for %s", from, roomAddress);
  174. return false;
  175. end
  176. -- now handle new jibri queue message
  177. local edetails = {
  178. ["foo"] = "bar"
  179. }
  180. sendEvent('JoinQueue',room.jid,occupant.jid,edetails)
  181. end
  182. end
  183. end
  184. return true
  185. end
  186. function occupant_joined(event)
  187. local room = event.room;
  188. local occupant = event.occupant;
  189. if is_healthcheck_room(room.jid) then
  190. return;
  191. end
  192. local participant_count = it.count(room:each_occupant());
  193. -- now handle new jibri queue message
  194. local edetails = {
  195. ["participant_count"] = participant_count
  196. }
  197. sendEvent('Join',room.jid,occupant.jid,edetails)
  198. end
  199. module:hook("iq/host", on_iq);
  200. -- executed on every host added internally in prosody, including components
  201. function process_host(host)
  202. if host == muc_component_host then -- the conference muc component
  203. module:log("info","Hook to muc events on %s", host);
  204. local muc_module = module:context(host);
  205. -- muc_module:hook("muc-room-created", room_created, -1);
  206. muc_module:hook("muc-occupant-joined", occupant_joined, -1);
  207. -- muc_module:hook("muc-occupant-pre-leave", occupant_leaving, -1);
  208. -- muc_module:hook("muc-room-destroyed", room_destroyed, -1);
  209. end
  210. end
  211. if prosody.hosts[muc_component_host] == nil then
  212. module:log("info","No muc component found, will listen for it: %s", muc_component_host)
  213. -- when a host or component is added
  214. prosody.events.add_handler("host-activated", process_host);
  215. else
  216. process_host(muc_component_host);
  217. end
  218. module:log("info", "Loading jibri_queue_component");
  219. --- Verifies room name, domain name with the values in the token
  220. -- @param token the token we received
  221. -- @param room_name the room name
  222. -- @param group name of the group (optional)
  223. -- @param session the session to use for storing token specific fields
  224. -- @return true if values are ok or false otherwise
  225. function verify_token(token, room_name, session)
  226. if disableTokenVerification then
  227. return true;
  228. end
  229. -- if not disableTokenVerification and we do not have token
  230. -- stop here, cause the main virtual host can have guest access enabled
  231. -- (allowEmptyToken = true) and we will allow access to rooms info without
  232. -- a token
  233. if token == nil then
  234. log("warn", "no token provided");
  235. return false;
  236. end
  237. session.auth_token = token;
  238. local verified, reason = token_util:process_and_verify_token(session);
  239. if not verified then
  240. log("warn", "not a valid token %s", tostring(reason));
  241. return false;
  242. end
  243. local room_address = jid.join(room_name, module:get_host());
  244. -- if there is a group we are in multidomain mode and that group is not
  245. -- our parent host
  246. if group and group ~= "" and group ~= parentHostName then
  247. room_address = "["..group.."]"..room_address;
  248. end
  249. if not token_util:verify_room(session, room_address) then
  250. log("warn", "Token %s not allowed to join: %s",
  251. tostring(token), tostring(room_address));
  252. return false;
  253. end
  254. return true;
  255. end
  256. --- Handles request for updating jibri queue status
  257. -- @param event the http event, holds the request query
  258. -- @return GET response, containing a json with response details
  259. function handle_update_jibri_queue(event)
  260. if (not event.request.url.query) then
  261. return { status_code = 400; };
  262. end
  263. local params = parse(event.request.url.query);
  264. local user_jid = params["user"];
  265. local roomAddress = params["room"];
  266. if not verify_token(params["token"], roomAddress, {}) then
  267. return { status_code = 403; };
  268. end
  269. local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
  270. if (not room) then
  271. log("error", "no room found %s", roomAddress);
  272. return { status_code = 404; };
  273. end
  274. local occupant = room:get_occupant_by_real_jid(user_jid);
  275. if not occupant then
  276. log("warn", "No occupant %s found for %s", user_jid, roomAddress);
  277. return { status_code = 404; };
  278. end
  279. -- TODO: actually implement udpate code here
  280. return { status_code = 200; };
  281. end
  282. module:depends("http");
  283. module:provides("http", {
  284. default_path = "/";
  285. name = "jibriqueue";
  286. route = {
  287. ["GET /jibriqueue/update"] = function (event) return async_handler_wrapper(event,handle_update_jibri_queue) end;
  288. };
  289. });