Browse Source

chore: Updates mod_smacks.lua version to latest.

https://hg.prosody.im/prosody-modules/rev/f6f28ceff53a
j8
damencho 4 years ago
parent
commit
60db81f31c
1 changed files with 35 additions and 16 deletions
  1. 35
    16
      resources/prosody-plugins/mod_smacks.lua

+ 35
- 16
resources/prosody-plugins/mod_smacks.lua View File

2
 --
2
 --
3
 -- Copyright (C) 2010-2015 Matthew Wild
3
 -- Copyright (C) 2010-2015 Matthew Wild
4
 -- Copyright (C) 2010 Waqas Hussain
4
 -- Copyright (C) 2010 Waqas Hussain
5
+-- Copyright (C) 2012-2021 Kim Alvefur
5
 -- Copyright (C) 2012 Thijs Alkemade
6
 -- Copyright (C) 2012 Thijs Alkemade
6
 -- Copyright (C) 2014 Florian Zeitz
7
 -- Copyright (C) 2014 Florian Zeitz
7
 -- Copyright (C) 2016-2020 Thilo Molitor
8
 -- Copyright (C) 2016-2020 Thilo Molitor
36
 local sm3_attr = { xmlns = xmlns_sm3 };
36
 local sm3_attr = { xmlns = xmlns_sm3 };
37
 
37
 
38
 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600);
38
 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600);
39
-local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
39
+local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true);
40
 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
40
 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
41
 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
41
 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
42
+local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256);
42
 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30);
43
 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30);
43
 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
44
 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
44
 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
45
 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
116
 	};
117
 	};
117
 end
118
 end
118
 
119
 
119
-local function delayed_ack_function(session)
120
+local function delayed_ack_function(session, stanza)
120
 	-- fire event only if configured to do so and our session is not already hibernated or destroyed
121
 	-- fire event only if configured to do so and our session is not already hibernated or destroyed
121
 	if delayed_ack_timeout > 0 and session.awaiting_ack
122
 	if delayed_ack_timeout > 0 and session.awaiting_ack
122
 	and not session.hibernating and not session.destroyed then
123
 	and not session.hibernating and not session.destroyed then
123
 		session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
124
 		session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
124
 			session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
125
 			session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
125
-		module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue});
126
+		module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza});
126
 	end
127
 	end
127
 	session.delayed_ack_timer = nil;
128
 	session.delayed_ack_timer = nil;
128
 end
129
 end
158
 			end
159
 			end
159
 		end);
160
 		end);
160
 
161
 
161
-local function request_ack_if_needed(session, force, reason)
162
+local function request_ack_if_needed(session, force, reason, stanza)
162
 	local queue = session.outgoing_stanza_queue;
163
 	local queue = session.outgoing_stanza_queue;
163
 	local expected_h = session.last_acknowledged_stanza + #queue;
164
 	local expected_h = session.last_acknowledged_stanza + #queue;
164
 	-- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
165
 	-- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
165
 	if session.awaiting_ack == nil and not session.hibernating then
166
 	if session.awaiting_ack == nil and not session.hibernating then
167
+		local max_unacked = max_unacked_stanzas;
168
+		if session.state == "inactive"  then
169
+			max_unacked = max_inactive_unacked_stanzas;
170
+		end
166
 		-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
171
 		-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
167
 		-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
172
 		-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
168
 		-- further requests until a higher h-value would be expected.
173
 		-- further requests until a higher h-value would be expected.
169
 		-- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
174
 		-- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
170
-		if (#queue > max_unacked_stanzas and expected_h ~= session.last_requested_h) or force then
175
+		if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
171
 			session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
176
 			session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
172
 			session.awaiting_ack = false;
177
 			session.awaiting_ack = false;
173
 			session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
178
 			session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
176
 				if not session.awaiting_ack and not session.hibernating and not session.destroyed then
181
 				if not session.awaiting_ack and not session.hibernating and not session.destroyed then
177
 					session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
182
 					session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
178
 					(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
183
 					(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
184
+					if session.destroyed then return end -- sending something can trigger destruction
179
 					session.awaiting_ack = true;
185
 					session.awaiting_ack = true;
180
 					-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
186
 					-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
181
 					session.last_requested_h = session.last_acknowledged_stanza + #queue;
187
 					session.last_requested_h = session.last_acknowledged_stanza + #queue;
182
 					session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
188
 					session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
183
 					if not session.delayed_ack_timer then
189
 					if not session.delayed_ack_timer then
184
 						session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
190
 						session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
185
-							delayed_ack_function(session);
191
+							delayed_ack_function(session, nil);		-- we don't know if this is the only new stanza in the queue
186
 						end);
192
 						end);
187
 					end
193
 					end
188
 				end
194
 				end
196
 	-- would not trigger this event (again).
202
 	-- would not trigger this event (again).
197
 	if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
203
 	if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
198
 		session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
204
 		session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
199
-		delayed_ack_function(session);
205
+		delayed_ack_function(session, stanza);		-- this is the only new stanza in the queue --> provide it to other modules
200
 	end
206
 	end
201
 end
207
 end
202
 
208
 
224
 
230
 
225
 		queue[#queue+1] = cached_stanza;
231
 		queue[#queue+1] = cached_stanza;
226
 		if session.hibernating then
232
 		if session.hibernating then
227
-			session.log("debug", "hibernating, stanza queued");
233
+			session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
228
 			module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza});
234
 			module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza});
229
 			return nil;
235
 			return nil;
230
 		end
236
 		end
231
-		request_ack_if_needed(session, false, "outgoing_stanza_filter");
237
+		request_ack_if_needed(session, false, "outgoing_stanza_filter", stanza);
232
 	end
238
 	end
233
 	return stanza;
239
 	return stanza;
234
 end
240
 end
348
 	-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
354
 	-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
349
 	local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
355
 	local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
350
 	if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
356
 	if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
351
-		request_ack_if_needed(origin, true, "piggybacked by handle_r");
357
+		request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
352
 	end
358
 	end
353
 	return true;
359
 	return true;
354
 end
360
 end
390
 
396
 
391
 	origin.log("debug", "#queue = %d", #queue);
397
 	origin.log("debug", "#queue = %d", #queue);
392
 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
398
 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
393
-	request_ack_if_needed(origin, false, "handle_a")
399
+	request_ack_if_needed(origin, false, "handle_a", nil)
394
 	return true;
400
 	return true;
395
 end
401
 end
396
 module:hook_stanza(xmlns_sm2, "a", handle_a);
402
 module:hook_stanza(xmlns_sm2, "a", handle_a);
500
 						session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout);
506
 						session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout);
501
 						return resume_timeout;
507
 						return resume_timeout;
502
 					end
508
 					end
503
-					if current_time-timeout_start < resume_timeout and session.push_identifier ~= nil then
504
-						session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", current_time-timeout_start);
505
-						return current_time-timeout_start;		-- time left to wait
509
+					if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
510
+						session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start));
511
+						return resume_timeout-(current_time-timeout_start);		-- time left to wait
506
 					end
512
 					end
507
 					session.log("debug", "Destroying session for hibernating too long");
513
 					session.log("debug", "Destroying session for hibernating too long");
508
 					session_registry.set(session.username, session.resumption_token, nil);
514
 					session_registry.set(session.username, session.resumption_token, nil);
623
 			return false;
629
 			return false;
624
 		end
630
 		end
625
 		module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
631
 		module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
626
-		request_ack_if_needed(original_session, true, "handle_resume");
632
+		request_ack_if_needed(original_session, true, "handle_resume", nil);
627
 	else
633
 	else
628
 		module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
634
 		module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
629
 			session.username or "?", session.host or "?", session.type,
635
 			session.username or "?", session.host or "?", session.type,
636
 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
642
 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
637
 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
643
 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
638
 
644
 
645
+module:hook("csi-client-active", function (event)
646
+	if event.origin.smacks then
647
+		request_ack_if_needed(event.origin, true, "csi-active", nil);
648
+	end
649
+end);
650
+
651
+module:hook("csi-flushing", function (event)
652
+	if event.session.smacks then
653
+		request_ack_if_needed(event.session, true, "csi-active", nil);
654
+	end
655
+end);
656
+
639
 local function handle_read_timeout(event)
657
 local function handle_read_timeout(event)
640
 	local session = event.session;
658
 	local session = event.session;
641
 	if session.smacks then
659
 	if session.smacks then
654
 		session.awaiting_ack = true;
672
 		session.awaiting_ack = true;
655
 		if not session.delayed_ack_timer then
673
 		if not session.delayed_ack_timer then
656
 			session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
674
 			session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
657
-				delayed_ack_function(session);
675
+				delayed_ack_function(session, nil);
658
 			end);
676
 			end);
659
 		end
677
 		end
660
 		return true;
678
 		return true;

Loading…
Cancel
Save