123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510 |
- local st = require "util.stanza";
- local jid = require "util.jid";
- local http = require "net.http";
- local json = require "cjson";
- local inspect = require('inspect');
- local socket = require "socket";
- local uuid_gen = require "util.uuid".generate;
- local jwt = require "luajwtjitsi";
- local it = require "util.iterators";
- local neturl = require "net.url";
- local parse = neturl.parseQuery;
-
- local get_room_from_jid = module:require "util".get_room_from_jid;
- local room_jid_match_rewrite = module:require "util".room_jid_match_rewrite;
- local is_healthcheck_room = module:require "util".is_healthcheck_room;
- local room_jid_split_subdomain = module:require "util".room_jid_split_subdomain;
- local internal_room_jid_match_rewrite = module:require "util".internal_room_jid_match_rewrite;
- local async_handler_wrapper = module:require "util".async_handler_wrapper;
-
- -- this basically strips the domain from the conference.domain address
- local parentHostName = string.gmatch(tostring(module.host), "%w+.(%w.+)")();
- if parentHostName == nil then
- log("error", "Failed to start - unable to get parent hostname");
- return;
- end
-
- local parentCtx = module:context(parentHostName);
- if parentCtx == nil then
- log("error",
- "Failed to start - unable to get parent context for host: %s",
- tostring(parentHostName));
- return;
- end
- local token_util = module:require "token/util".new(parentCtx);
-
- local ASAPKeyServer
- = module:get_option_string("asap_key_server");
-
- if ASAPKeyServer then
- module:log("info", "ASAP Public Key URL %s", ASAPKeyServer);
- token_util:set_asap_key_server(ASAPKeyServer);
- end
-
- local ASAPKeyPath
- = module:get_option_string("asap_key_path", '/etc/prosody/certs/asap.key');
-
- local ASAPKeyId
- = module:get_option_string("asap_key_id", 'jitsi');
-
- local ASAPIssuer
- = module:get_option_string("asap_issuer", 'jitsi');
-
- local ASAPAudience
- = module:get_option_string("asap_audience", 'jibri-queue');
-
- local ASAPAcceptedIssuers
- = module:get_option_array('asap_accepted_issuers',{'jibri-queue'});
-
- module:log("info", "ASAP Accepted Issuers %s", ASAPAcceptedIssuers);
- token_util:set_asap_accepted_issuers(ASAPAcceptedIssuers);
-
- local ASAPAcceptedAudiences
- = module:get_option_array('asap_accepted_audiences',{'*'});
-
- module:log("info", "ASAP Accepted Audiences %s", ASAPAcceptedAudiences);
- token_util:set_asap_accepted_audiences(ASAPAcceptedAudiences);
-
- local ASAPTTL
- = module:get_option_number("asap_ttl", 3600);
-
- local ASAPTTL_THRESHOLD
- = module:get_option_number("asap_ttl_threshold", 600);
-
- local ASAPKey;
-
- local queueServiceURL
- = module:get_option_string("jibri_queue_url");
-
- local JibriRegion
- = module:get_option_string("jibri_region", 'default');
-
- if queueServiceURL == nil then
- log("error", "No jibri_queue_url specified. No service to contact!");
- return;
- end
-
- -- option to enable/disable token verifications
- local disableTokenVerification
- = module:get_option_boolean("disable_jibri_queue_token_verification", false);
-
- local http_headers = {
- ["User-Agent"] = "Prosody ("..prosody.version.."; "..prosody.platform..")",
- ["Content-Type"] = "application/json"
- };
-
- -- we use async to detect Prosody 0.10 and earlier
- local have_async = pcall(require, "util.async");
- if not have_async then
- module:log("warn", "conference duration will not work with Prosody version 0.10 or less.");
- return;
- end
-
- local muc_component_host = module:get_option_string("muc_component");
- if muc_component_host == nil then
- log("error", "No muc_component specified. No muc to operate on for jibri queue!");
- return;
- end
-
- log("info", "Starting jibri queue handling for %s", muc_component_host);
-
- local external_api_url = module:get_option_string("external_api_url",tostring(parentHostName));
- module:log("info", "External advertised API URL", external_api_url);
-
- -- Read ASAP key once on module startup
- local f = io.open(ASAPKeyPath, "r");
- if f then
- ASAPKey = f:read("*all");
- f:close();
- if not ASAPKey then
- module:log("warn", "No ASAP Key read, disabling muc_events plugin");
- return
- end
- else
- module:log("warn", "Error reading ASAP Key, disabling muc_events plugin");
- return
- end
-
- -- TODO: Figure out a less arbitrary default cache size.
- local jwtKeyCacheSize = module:get_option_number("jwt_pubkey_cache_size", 128);
- local jwtKeyCache = require"util.cache".new(jwtKeyCacheSize);
-
- local function round(num, numDecimalPlaces)
- local mult = 10^(numDecimalPlaces or 0)
- return math.floor(num * mult + 0.5) / mult
- end
-
- local function generateToken(audience)
- audience = audience or ASAPAudience
- local t = os.time()
- local err
- local exp_key = 'asap_exp.'..audience
- local token_key = 'asap_token.'..audience
- local exp = jwtKeyCache:get(exp_key)
- local token = jwtKeyCache:get(token_key)
-
- --if we find a token and it isn't too far from expiry, then use it
- if token ~= nil and exp ~= nil then
- exp = tonumber(exp)
- if (exp - t) > ASAPTTL_THRESHOLD then
- return token
- end
- end
-
- --expiry is the current time plus TTL
- exp = t + ASAPTTL
- local payload = {
- iss = ASAPIssuer,
- aud = audience,
- nbf = t,
- exp = exp,
- }
-
- -- encode
- local alg = "RS256"
- token, err = jwt.encode(payload, ASAPKey, alg, {kid = ASAPKeyId})
- if not err then
- token = 'Bearer '..token
- jwtKeyCache:set(exp_key,exp)
- jwtKeyCache:set(token_key,token)
- return token
- else
- return ''
- end
- end
-
- local function sendIq(participant,action,requestId,time,position,token)
- local iqId = uuid_gen();
- local from = module:get_host();
- module:log("info","Oubound iq id %s",iqId);
- local outStanza = st.iq({type = 'set', from = from, to = participant, id = iqId}):tag("jibri-queue",
- { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId, action = action });
-
- module:log("info","Oubound base stanza %s",inspect(outStanza));
-
- if token then
- outStanza:tag("token"):text(token):up()
- end
- if time then
- outStanza:tag("time"):text(tostring(time)):up()
- end
- if position then
- outStanza:tag("position"):text(tostring(position)):up()
- end
- module:log("info","Oubound stanza %s",inspect(outStanza));
- module:send(outStanza);
- end
-
- local function cb(content_, code_, response_, request_)
- if code_ == 200 or code_ == 204 then
- module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
- code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
- else
- module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
- code_, content_, inspect(request_), inspect(response_));
- end
- end
-
- local function sendEvent(type,room_address,participant,requestId,replyIq,replyError)
- local event_ts = round(socket.gettime()*1000);
- local node, host, resource, target_subdomain = room_jid_split_subdomain(room_address);
- local room_param = '';
- if target_subdomain then
- room_param = target_subdomain..'/'..node;
- else
- room_param = node;
- end
-
- local out_event = {
- ["conference"] = room_address,
- ["roomParam"] = room_param,
- ["eventType"] = type,
- ["participant"] = participant,
- ["externalApiUrl"] = external_api_url.."/jibriqueue/update",
- ["requestId"] = requestId,
- ["region"] = JibriRegion,
- }
- module:log("debug","Sending event %s",inspect(out_event));
-
- local headers = http_headers or {}
- headers['Authorization'] = generateToken()
-
- module:log("debug","Sending headers %s",inspect(headers));
- local requestURL = queueServiceURL.."/job/recording"
- if type=="LeaveQueue" then
- requestURL = requestURL .."/cancel"
- end
- local request = http.request(requestURL, {
- headers = headers,
- method = "POST",
- body = json.encode(out_event)
- }, function (content_, code_, response_, request_)
- if code_ == 200 or code_ == 204 then
- module:log("debug", "URL Callback: Code %s, Content %s, Request (host %s, path %s, body %s), Response: %s",
- code_, content_, request_.host, request_.path, inspect(request_.body), inspect(response_));
- module:log("info", "sending reply IQ %s",inspect(replyIq));
- module:send(replyIq);
- else
- module:log("warn", "URL Callback non successful: Code %s, Content %s, Request (%s), Response: %s",
- code_, content_, inspect(request_), inspect(response_));
- module:log("warn", "sending reply error IQ %s",inspect(replyError));
- module:send(replyError);
- end
- end);
- end
-
- -- receives iq from client currently connected to the room
- function on_iq(event)
- local requestId;
- -- Check the type of the incoming stanza to avoid loops:
- if event.stanza.attr.type == "error" then
- return; -- We do not want to reply to these, so leave.
- end
- if event.stanza.attr.to == module:get_host() then
- if event.stanza.attr.type == "set" then
- log("info", "Jibri Queue Messsage Event found: %s ",inspect(event.stanza));
- local reply = st.reply(event.stanza);
- local replyError = st.error_reply(event.stanza,'cancel','internal-server-error',"Queue Server Error");
- module:log("info","Reply stanza %s",inspect(reply));
-
- local jibriQueue
- = event.stanza:get_child('jibri-queue', 'http://jitsi.org/protocol/jibri-queue');
- if jibriQueue then
- module:log("info", "Jibri Queue Request: %s ",inspect(jibriQueue));
-
- local roomAddress = jibriQueue.attr.room;
- local room = get_room_from_jid(room_jid_match_rewrite(roomAddress));
-
- if not room then
- module:log("warn", "No room found %s", roomAddress);
- return false;
- end
-
- local from = event.stanza.attr.from;
-
- local occupant = room:get_occupant_by_real_jid(from);
- if not occupant then
- module:log("warn", "No occupant %s found for %s", from, roomAddress);
- return false;
- end
-
- local action = jibriQueue.attr.action;
- if action == 'join' then
- -- join action, so send event out
- requestId = uuid_gen();
-
- -- now handle new jibri queue message
- room.jibriQueue[occupant.jid] = requestId;
- reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
- replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
-
- module:log("info","Sending JoinQueue event for jid %s occupant %s reply %s",roomAddress,occupant.jid,inspect(reply));
- sendEvent('JoinQueue',roomAddress,occupant.jid,requestId,reply,replyError);
- end
- if action == 'leave' then
- requestId = jibriQueue.attr.requestId;
- -- TODO: check that requestId is the same as cached value
- room.jibriQueue[occupant.jid] = nil;
- reply:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
- replyError:add_child(st.stanza("jibri-queue", { xmlns = 'http://jitsi.org/protocol/jibri-queue', requestId = requestId})):up()
- sendEvent('LeaveQueue',roomAddress,occupant.jid,requestId,reply,replyError);
- end
- else
- module:log("warn","Jibri Queue Stanza missing child %s",inspect(event.stanza))
- end
- end
- end
- return true
- end
-
- -- create recorder queue cache for the room
- function room_created(event)
- local room = event.room;
-
- if is_healthcheck_room(room.jid) then
- return;
- end
-
- room.jibriQueue = {};
- end
-
- -- Conference ended, clear all queue cache jids
- function room_destroyed(event)
- local room = event.room;
-
- if is_healthcheck_room(room.jid) then
- return;
- end
- for jid, x in pairs(room.jibriQueue) do
- if x then
- sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),jid,x);
- end
- end
- end
-
- -- Occupant left remove it from the queue if it joined the queue
- function occupant_leaving(event)
- local room = event.room;
-
- if is_healthcheck_room(room.jid) then
- return;
- end
-
- local occupant = event.occupant;
- local requestId = room.jibriQueue[occupant.jid];
- -- check if user has cached queue request
- if requestId then
- -- remove occupant from queue cache, signal backend
- room.jibriQueue[occupant.jid] = nil;
- sendEvent('LeaveQueue',internal_room_jid_match_rewrite(room.jid),occupant.jid,requestId);
- end
- end
-
- module:hook("iq/host", on_iq);
-
- -- executed on every host added internally in prosody, including components
- function process_host(host)
- if host == muc_component_host then -- the conference muc component
- module:log("info","Hook to muc events on %s", host);
-
- local muc_module = module:context(host);
- muc_module:hook("muc-room-created", room_created, -1);
- -- muc_module:hook("muc-occupant-joined", occupant_joined, -1);
- muc_module:hook("muc-occupant-pre-leave", occupant_leaving, -1);
- muc_module:hook("muc-room-destroyed", room_destroyed, -1);
- end
- end
-
- if prosody.hosts[muc_component_host] == nil then
- module:log("info","No muc component found, will listen for it: %s", muc_component_host)
-
- -- when a host or component is added
- prosody.events.add_handler("host-activated", process_host);
- else
- process_host(muc_component_host);
- end
-
- module:log("info", "Loading jibri_queue_component");
-
- --- Verifies room name, domain name with the values in the token
- -- @param token the token we received
- -- @param room_name the room name
- -- @param group name of the group (optional)
- -- @param session the session to use for storing token specific fields
- -- @return true if values are ok or false otherwise
- function verify_token(token, room_jid, session)
- if disableTokenVerification then
- return true;
- end
-
- -- if not disableTokenVerification and we do not have token
- -- stop here, cause the main virtual host can have guest access enabled
- -- (allowEmptyToken = true) and we will allow access to rooms info without
- -- a token
- if token == nil then
- log("warn", "no token provided");
- return false;
- end
-
- session.auth_token = token;
- local verified, reason, message = token_util:process_and_verify_token(session);
- if not verified then
- log("warn", "not a valid token %s: %s", tostring(reason), tostring(message));
- return false;
- end
-
- if not token_util:verify_room(session, room_jid) then
- log("warn", "Token %s not allowed to access: %s",
- tostring(token), tostring(room_jid));
- return false;
- end
-
- return true;
- end
-
- --- Handles request for updating jibri queue status
- -- @param event the http event, holds the request query
- -- @return GET response, containing a json with response details
- function handle_update_jibri_queue(event)
- module:log("info","Update Jibri Queue Event Received");
- -- if (not event.request.url.query) then
- -- return { status_code = 400; };
- -- end
-
- local body = json.decode(event.request.body);
- -- local params = parse(event.request.url.query);
-
- module:log("info","Update Jibri Event Body %s",inspect(body));
-
- -- local token = params["token"];
- local token
- if not token then
- token = event.request.headers["authorization"];
- if not token then
- token = ''
- else
- local prefixStart, prefixEnd = token:find("Bearer ");
- if prefixStart ~= 1 then
- module:log("error", "Invalid authorization header format. The header must start with the string 'Bearer '");
- return 403
- end
- token = token:sub(prefixEnd + 1);
- end
- end
-
- local user_jid = body["participant"];
- local roomAddress = body["conference"];
- local userJWT = body["token"];
- local action = body["action"];
- local time = body["time"];
- local position = body["position"];
- local requestId = body["requestId"];
-
- local room_jid = room_jid_match_rewrite(roomAddress);
-
- if not verify_token(token, room_jid, {}) then
- return { status_code = 403; };
- end
-
- local room = get_room_from_jid(room_jid);
- if (not room) then
- log("error", "no room found %s", roomAddress);
- return { status_code = 404; };
- end
-
- local occupant = room:get_occupant_by_real_jid(user_jid);
- if not occupant then
- log("warn", "No occupant %s found for %s", user_jid, roomAddress);
- return { status_code = 404; };
- end
-
- if not room.jibriQueue[occupant.jid] then
- log("warn", "No queue request found for occupant %s in conference %s",occupant.jid,room.jid)
- return { status_code = 404; };
- end
-
- if not action then
- if userJWT then
- action = 'token';
- else
- action = 'info';
- end
- end
-
- if not requestId then
- requestId = room.jibriQueue[occupant.jid];
- end
-
- -- TODO: actually implement udpate code here
- sendIq(occupant.jid,action,requestId,time,position,userJWT);
- return { status_code = 200; };
- end
-
- module:depends("http");
- module:provides("http", {
- default_path = "/";
- name = "jibriqueue";
- route = {
- ["POST /jibriqueue/update"] = function (event) return async_handler_wrapper(event,handle_update_jibri_queue) end;
- };
- });
|