1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-27 21:05:53 +00:00
* Experiment Update: This contains further work on the MQTT-based PubSub experiment.
This commit is contained in:
SirStendec 2023-11-01 14:17:11 -04:00
parent 0460c02994
commit 4d2b65e7c2
4 changed files with 1188 additions and 66 deletions

View file

@ -1,7 +1,7 @@
{
"name": "frankerfacez",
"author": "Dan Salvato LLC",
"version": "4.56.1",
"version": "4.56.2",
"description": "FrankerFaceZ is a Twitch enhancement suite.",
"private": true,
"license": "Apache-2.0",

View file

@ -640,7 +640,25 @@ export default class Emotes extends Module {
this.on('pubsub:command:follow_sets', this.updateFollowSets, this);
// TODO: Implement emote updates.
this.on('pubsub:command:add_emote', msg => {
const set_id = msg.set_id,
emote = msg.emote;
if ( ! this.emote_sets[set_id] )
return;
this.addEmoteToSet(set_id, emote);
});
this.on('pubsub:command:remove_emote', msg => {
const set_id = msg.set_id,
emote_id = msg.emote_id;
if ( ! this.emote_sets[set_id] )
return;
this.removeEmoteFromSet(set_id, emote_id);
});
this.on('chat:reload-data', flags => {
if ( ! flags || flags.emotes )

File diff suppressed because it is too large Load diff

View file

@ -1,22 +1,12 @@
import { EventEmitter } from "./events";
import { MqttClient, DISCONNECT, SUBSCRIBE } from "denoflare-mqtt";
import { MqttClient, DISCONNECT } from './custom_denoflare_mqtt'; // "denoflare-mqtt";
import { b64ToArrayBuffer, debounce, importRsaKey, make_enum, sleep } from "./object";
const SUBTOPIC_MATCHER = /\/(\d+)$/;
function makeSignal() {
const out = {};
out.promise = new Promise((s,f) => {
out.resolve = s;
out.reject = f;
});
return out;
}
// Only match 1-4 digit numbers, to avoid matching twitch IDs.
// 9999 gives us millions of clients on a topic, so we're never
// going to have more subtopics than 4 digits.
const SUBTOPIC_MATCHER = /\/(?:s(\d+)|(\d{1,4}))$/;
MqttClient.prototype.reschedulePing = function reschedulePing() {
@ -34,26 +24,6 @@ MqttClient.prototype.reschedulePing = function reschedulePing() {
}
MqttClient.prototype.ffzUnsubscribe = function ffzUnsubscribe(topics) {
// TODO: This
}
MqttClient.prototype.ffzSubscribe = function ffzSubscribe(topics) {
const packetId = this.obtainPacketId();
const signal = this.pendingSubscribes[packetId] = makeSignal();
if ( ! Array.isArray(topics) )
topics = [topics];
return this.sendMessage({
type: SUBSCRIBE,
packetId,
subscriptions: topics.map(topic => ({topicFilter: topic}))
}).then(() => signal.promise);
}
export const State = make_enum(
'Disconnected',
'Connecting',
@ -88,6 +58,9 @@ export default class PubSubClient extends EventEmitter {
this._pending_topics = new Set;
// Debounce a few things.
this.scheduleHeartbeat = this.scheduleHeartbeat.bind(this);
this._sendHeartbeat = this._sendHeartbeat.bind(this);
this._fetchNewTopics = this._fetchNewTopics.bind(this);
this._sendSubscribes = debounce(this._sendSubscribes, 250);
this._sendUnsubscribes = debounce(this._sendUnsubscribes, 250);
@ -118,9 +91,9 @@ export default class PubSubClient extends EventEmitter {
// Data Loading
// ========================================================================
loadData() {
loadData(force = false) {
// If we have all the data we need, don't do anything.
if ( this._data && ! this._pending_topics.size )
if ( ! force && this._data && ! this._pending_topics.size )
return Promise.resolve(this._data);
if ( ! this._data_loader )
@ -183,10 +156,14 @@ export default class PubSubClient extends EventEmitter {
// Record all the topic mappings we just got.
// TODO: Check for subtopic mismatches.
// TODO: Check for removed subtopic assignments.
if ( data.topics )
for(const [key, val] of Object.entries(data.topics))
this._topics.set(key, val);
// Update the heartbeat timer.
this.scheduleHeartbeat();
this._data = data;
return data;
}
@ -258,14 +235,7 @@ export default class PubSubClient extends EventEmitter {
this._createClient(data);
// Set up a heartbeat to keep us alive.
// TODO: Make this random / staggered maybe.
if ( this._heartbeat )
clearInterval(this._heartbeat);
this._heartbeat = setInterval(
() => this._sendHeartbeat(),
5 * 60 * 1000
); // every 5 minutes.
this.scheduleHeartbeat();
}
disconnect() {
@ -273,10 +243,16 @@ export default class PubSubClient extends EventEmitter {
this._destroyClient();
this._state = State.Disconnected;
if ( this._heartbeat ) {
clearInterval(this._heartbeat);
this._heartbeat = null;
}
this.clearHeartbeat();
// Reset all our state except active topics.
this._data = null;
this._live_topics.clear();
this._topics.clear();
this._pending_topics.clear();
for(const topic of this._active_topics)
this._pending_topics.add(topic);
}
subscribe(topic) {
@ -334,15 +310,23 @@ export default class PubSubClient extends EventEmitter {
// Client Management
// ========================================================================
clearHeartbeat() {
if ( this._heartbeat )
clearTimeout(this._heartbeat);
}
scheduleHeartbeat() {
if ( this._heartbeat )
clearTimeout(this._heartbeat);
this._heartbeat = setTimeout(this._sendHeartbeat, 5 * 60 * 1000);
}
_sendHeartbeat() {
if ( this._client && this._data?.client_id ) {
return this._client.publish({
topic: 'heartbeats',
payload: JSON.stringify({
id: this._data.client_id
})
});
}
if ( ! this._data?.client_id )
return this.scheduleHeartbeat();
this.loadData(true)
.finally(this.scheduleHeartbeat);
}
_destroyClient() {
@ -453,12 +437,12 @@ export default class PubSubClient extends EventEmitter {
return this._client.connect({
clientId: data.client_id,
password: data.password,
keepAlive: 120
}).then(() => {
keepAlive: 120,
clean: true
}).then(msg => {
this._state = State.Connected;
this.emit('connect');
this._sendHeartbeat();
return this._sendSubscribes()
});
}
@ -481,7 +465,7 @@ export default class PubSubClient extends EventEmitter {
if ( subtopic === 0 )
topics.push(topic);
else
topics.push(`${topic}/${subtopic}`);
topics.push(`${topic}/s${subtopic}`);
// Make a note, we're subscribing to this topic.
this._live_topics.add(topic);
@ -489,7 +473,15 @@ export default class PubSubClient extends EventEmitter {
}
if ( topics.length )
return this._client.ffzSubscribe(topics);
return this._client.subscribe({topicFilter: topics })
.catch(() => {
// If there was an error, we did NOT subscribe.
for(const topic of topics)
this._live_topics.delete(topic);
// Call sendSubscribes again after a bit.
return delay(2000).then(() => this._sendSubscribes());
});
else
return Promise.resolve();
}
@ -514,14 +506,18 @@ export default class PubSubClient extends EventEmitter {
if ( subtopic === 0 )
real_topic = topic;
else
real_topic = `${topic}/${subtopic}`;
real_topic = `${topic}/s${subtopic}`;
topics.push(real_topic);
this._live_topics.delete(topic);
}
if ( topics.length )
return this._client.ffzUnsubscribe(topics);
return this._client.unsubscribe({topicFilter: topics})
.catch(error => {
if ( this.logger )
this.logger.warn('Received error when unsubscribing from topics:', error);
});
else
return Promise.resolve();
}