diff --git a/package.json b/package.json index 627b4ba0..4b9c8e11 100755 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "frankerfacez", "author": "Dan Salvato LLC", - "version": "4.55.3", + "version": "4.56.0", "description": "FrankerFaceZ is a Twitch enhancement suite.", "private": true, "license": "Apache-2.0", @@ -56,6 +56,7 @@ "@popperjs/core": "^2.10.2", "crypto-js": "^3.3.0", "dayjs": "^1.10.7", + "denoflare-mqtt": "^0.0.2", "displacejs": "^1.4.1", "emoji-regex": "^9.2.2", "file-saver": "^2.0.5", @@ -73,7 +74,6 @@ "sortablejs": "^1.14.0", "sourcemapped-stacktrace": "^1.1.11", "text-diff": "^1.0.1", - "u8-mqtt": "^0.5.3", "vue": "^2.6.14", "vue-clickaway": "^2.2.2", "vue-color": "^2.8.1", @@ -83,6 +83,7 @@ "pnpm": { "overrides": { "ansi-regex@>2.1.1 <5.0.1": ">=5.0.1", + "chalk@<4": ">=4 <5", "set-value@<4.0.1": ">=4.0.1", "glob-parent@<5.1.2": ">=5.1.2" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5d8234a7..0951ca20 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,7 +1,12 @@ lockfileVersion: '6.0' +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + overrides: ansi-regex@>2.1.1 <5.0.1: '>=5.0.1' + chalk@<4: '>=4 <5' set-value@<4.0.1: '>=4.0.1' glob-parent@<5.1.2: '>=5.1.2' @@ -18,6 +23,9 @@ dependencies: dayjs: specifier: ^1.10.7 version: 1.10.7 + denoflare-mqtt: + specifier: ^0.0.2 + version: 0.0.2 displacejs: specifier: ^1.4.1 version: 1.4.1 @@ -69,9 +77,6 @@ dependencies: text-diff: specifier: ^1.0.1 version: 1.0.1 - u8-mqtt: - specifier: ^0.5.3 - version: 0.5.3 vue: specifier: ^2.6.14 version: 2.6.14 @@ -974,11 +979,6 @@ packages: hasBin: true dev: true - /ansi-regex@2.1.1: - resolution: {integrity: sha512-TIGnTpdo+E3+pCyAluZvtED5p5wCqLdezCyhPZzKPcxvFplEt4i+W7OONCKgeZFT3+y5NZZfOOS/Bdcanm1MYA==} - engines: {node: '>=0.10.0'} - dev: true - /ansi-regex@5.0.1: resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==} engines: {node: '>=8'} @@ -989,11 +989,6 @@ packages: engines: {node: '>=12'} dev: true - /ansi-styles@2.2.1: - resolution: {integrity: sha512-kmCevFghRiWM7HB5zTPULl4r9bVFSWjz62MhqizDGUrq2NWuNMQyuv4tHHoKJHs69M/MF64lEcHdYIocrdWQYA==} - engines: {node: '>=0.10.0'} - dev: true - /ansi-styles@4.3.0: resolution: {integrity: sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==} engines: {node: '>=8'} @@ -1114,7 +1109,7 @@ packages: /babel-code-frame@6.26.0: resolution: {integrity: sha512-XqYMR2dfdGMW+hd0IUZ2PwK+fGeFkOxZJ0wY+JaQAHzt1Zx8LcvpiZD2NiGkEG8qx0CfkAOr5xt76d1e8vG90g==} dependencies: - chalk: 1.1.3 + chalk: 4.1.2 esutils: 2.0.3 js-tokens: 3.0.2 dev: true @@ -1785,17 +1780,6 @@ packages: traverse: 0.3.9 dev: true - /chalk@1.1.3: - resolution: {integrity: sha512-U3lRVLMSlsCfjqYPbLyVv11M9CPW4I728d6TCKMAOJueEeB9/8o+eSsMnxPJD+Q+K909sdESg7C+tIkoH6on1A==} - engines: {node: '>=0.10.0'} - dependencies: - ansi-styles: 2.2.1 - escape-string-regexp: 1.0.5 - has-ansi: 2.0.0 - strip-ansi: 3.0.1 - supports-color: 2.0.0 - dev: true - /chalk@4.1.2: resolution: {integrity: sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==} engines: {node: '>=10'} @@ -2255,6 +2239,10 @@ packages: engines: {node: '>=0.4.0'} dev: true + /denoflare-mqtt@0.0.2: + resolution: {integrity: sha512-D9DpC1Y3T5vL+wwZnhKoXwYEgcE5359eZjeCv06d649pOIW+6GKYX9BoB7KIriMoB2j936CV4MHXrZM5ZcUu5A==} + dev: false + /depd@1.1.2: resolution: {integrity: sha512-7emPTl6Dpo6JRXOXjLRxck+FlLRX5847cLKEn00PLAgc3g2hTZZgr+e4c2v6QpSmLeFP3n5yUo7ft6avBK/5jQ==} engines: {node: '>= 0.6'} @@ -2514,11 +2502,6 @@ packages: resolution: {integrity: sha512-NiSupZ4OeuGwr68lGIeym/ksIZMJodUGOSCZ/FSnTxcrekbvqrgdUxlJOMpijaKZVjAJrWrGs/6Jy8OMuyj9ow==} dev: true - /escape-string-regexp@1.0.5: - resolution: {integrity: sha512-vbRorB5FUQWvla16U8R/qgaFIya2qGzwDrNmCZuYKrbdSUMG6I1ZCGQRefkRVhuOkIGVne7BQ35DSfo1qvJqFg==} - engines: {node: '>=0.8.0'} - dev: true - /escape-string-regexp@4.0.0: resolution: {integrity: sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA==} engines: {node: '>=10'} @@ -3170,13 +3153,6 @@ packages: resolution: {integrity: sha512-9Qn4yBxelxoh2Ow62nP+Ka/kMnOXRi8BXnRaUwezLNhqelnN49xKz4F/dPP8OYLxLxq6JDtZb2i9XznUQbNPTg==} dev: true - /has-ansi@2.0.0: - resolution: {integrity: sha512-C8vBJ8DwUCx19vhm7urhTuUsr4/IyP6l4VzNQDv+ryHQObW3TTTp9yB68WpYgRe2bbaGuZ/se74IqFeVnMnLZg==} - engines: {node: '>=0.10.0'} - dependencies: - ansi-regex: 2.1.1 - dev: true - /has-bigints@1.0.2: resolution: {integrity: sha512-tSvCKtBr9lkF0Ex0aQiP9N+OpV4zi2r/Nee5VkRDbaqv35RLYMzbwQfFSZZH0kR+Rd6302UJZ2p/bJCEoR3VoQ==} dev: true @@ -5106,13 +5082,6 @@ packages: dependencies: safe-buffer: 5.1.2 - /strip-ansi@3.0.1: - resolution: {integrity: sha512-VhumSSbBqDTP8p2ZLKj40UjBCV4+v8bUSEpUb4KjRgWk9pbqGF4REFj6KEagidb2f/M6AzC0EmFyDNGaw9OCzg==} - engines: {node: '>=0.10.0'} - dependencies: - ansi-regex: 2.1.1 - dev: true - /strip-ansi@6.0.1: resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==} engines: {node: '>=8'} @@ -5142,11 +5111,6 @@ packages: engines: {node: '>=8'} dev: true - /supports-color@2.0.0: - resolution: {integrity: sha512-KKNVtd6pCYgPIKU4cp2733HWYCpplQhddZLBUryaAHou723x+FRzQ5Df824Fj+IyyuiQTRoub4SnIFfIcrp70g==} - engines: {node: '>=0.8.0'} - dev: true - /supports-color@7.2.0: resolution: {integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==} engines: {node: '>=8'} @@ -5323,10 +5287,6 @@ packages: is-typed-array: 1.1.12 dev: true - /u8-mqtt@0.5.3: - resolution: {integrity: sha512-C9eaN2/kxtmMhLVrKT8Yk6a3pRj12K+nNpylDqUn/rKYwAaMEUnvXNWqd4QMd/EaKKcMxpeA9cyCU8DlUOvKsw==} - dev: false - /uc.micro@1.0.6: resolution: {integrity: sha512-8Y75pvTYkLJW2hWQHXxoqRgV7qb9B+9vFEtidML+7koHUFapnVJAZ6cKs+Qjz5Aw3aZWHMC6u0wJE3At+nSGwA==} dev: false diff --git a/src/experiments.json b/src/experiments.json index 077f7910..601e384c 100644 --- a/src/experiments.json +++ b/src/experiments.json @@ -16,7 +16,7 @@ {"value": false, "weight": 60} ] }, - "pubsub": { + "cf_pubsub": { "name": "MQTT-Based PubSub", "description": "An experimental new pubsub system that should be more reliable than the existing socket cluster.", "groups": [ diff --git a/src/modules/chat/tokenizers.jsx b/src/modules/chat/tokenizers.jsx index dcbf2ef8..8c75c85a 100644 --- a/src/modules/chat/tokenizers.jsx +++ b/src/modules/chat/tokenizers.jsx @@ -1230,7 +1230,7 @@ const render_emote = (token, createElement, wrapped) => { emote = createElement('img', { class: `${EMOTE_CLASS} ffz-tooltip${hoverSrc ? ' ffz-hover-emote' : ''}${token.provider === 'twitch' ? ' twitch-emote' : token.provider === 'ffz' ? ' ffz-emote' : token.provider === 'emoji' ? ' ffz-emoji' : ''}`, attrs: { - src: IS_FIREFOX ? undefined : src, + src: (IS_FIREFOX && srcSet?.length) ? undefined : src, srcSet, alt: token.text, height: (token.big && ! token.can_big && token.height) ? `${token.height * 2}px` : undefined, @@ -1429,7 +1429,7 @@ export const AddonEmotes = { else emote = ( { + if ( this.experiments.getAssignment('cf_pubsub') ) + return 'Staging'; + return null; + }, ui: { path: 'Debugging @{"expanded": false, "sort": 9999} > PubSub >> General', @@ -42,52 +39,42 @@ export default class PubSubClient extends Module { }))) }, - changed: () => { - if ( this.experiments.getAssignment('pubsub') ) - this.reconnect(); - } + changed: () => this.reconnect() }); this._topics = new Map; this._client = null; - this._state = 0; } - loadMQTT() { + loadPubSubClient() { if ( this._mqtt ) return Promise.resolve(this._mqtt); - if ( this._mqtt_loader ) - return new Promise((s,f) => this._mqtt_loader.push([s,f])); - - return new Promise((s,f) => { - const loaders = this._mqtt_loader = [[s,f]]; - - import('u8-mqtt') + if ( ! this._mqtt_loader ) + this._mqtt_loader = import('utilities/pubsub') .then(thing => { - this._mqtt = thing; - this._mqtt_loader = null; - for(const pair of loaders) - pair[0](thing); + this._mqtt = thing.default; + return thing.default; }) - .catch(err => { - this._mqtt_loader = null; - for(const pair of loaders) - pair[1](err); - }); - }); + .finally(() => this._mqtt_loader = null); + + return this._mqtt_loader; } onEnable() { - // Check to see if we should be using PubSub. - if ( ! this.experiments.getAssignment('pubsub') ) - return; + this.on('experiments:changed:cf_pubsub', this._updateSetting, this); this.connect(); } onDisable() { this.disconnect(); + + this.off('experiments:changed:cf_pubsub', this._updateSetting, this); + } + + _updateSetting() { + this.settings.update('pubsub.use-cluster'); } @@ -96,17 +83,18 @@ export default class PubSubClient extends Module { // ======================================================================== get connected() { - return this._state === State.CONNECTED; + return this._client?.connected ?? false; } get connecting() { - return this._state === State.CONNECTING; + return this._client?.connecting ?? false; } get disconnected() { - return this._state === State.DISCONNECTED; + // If this is null, we have no client, so we aren't connected. + return this._client?.disconnected ?? true; } @@ -136,43 +124,39 @@ export default class PubSubClient extends Module { cluster = PUBSUB_CLUSTERS.Production; } - this.log.info(`Using Cluster: ${cluster_id}`); + this.log.info(`Using PubSub: ${cluster_id} (${cluster})`); - this._state = State.CONNECTING; - let client; + const user = this.resolve('site')?.getUser?.(); - try { - const mqtt = await this.loadMQTT(); - client = this._client = mqtt.mqtt_v5({ - keep_alive: 30 - }) - .with_websock(cluster) - .with_autoreconnect(); + // The client class handles everything for us. We only + // maintain a separate list of topics in case topics are + // subscribed when the client does not exist, or if the + // client needs to be recreated. - await client.connect({ - client_id: [`ffz_${FrankerFaceZ.version_info}--`, ''] - }); - this._state = State.CONNECTED; + const PubSubClient = await this.loadPubSubClient(); - } catch(err) { - this._state = State.DISCONNECTED; - if ( this._client ) - try { - this._client.end(true); - } catch(err) { /* no-op */ } - this._client = null; - throw err; - } + const client = this._client = new PubSubClient(cluster, { + user: user?.id ? { + provider: 'twitch', + id: user.id + } : null + }); - client.on_topic('*', pkt => { - const topic = pkt.topic; - let data; - try { - data = pkt.json(); - } catch(err) { - this.log.warn(`Error decoding PubSub message on topic "${topic}":`, err); - return; - } + client.on('connect', () => { + this.log.info('Connected to PubSub.'); + }); + + client.on('disconnect', () => { + this.log.info('Disconnected from PubSub.'); + }); + + client.on('error', err => { + this.log.error('Error in PubSub', err); + }); + + client.on('message', event => { + const topic = event.topic, + data = event.data; if ( ! data?.cmd ) { this.log.warn(`Received invalid PubSub message on topic "${topic}":`, data); @@ -188,6 +172,9 @@ export default class PubSubClient extends Module { // Subscribe to topics. const topics = [...this._topics.keys()]; client.subscribe(topics); + + // And start the client. + await client.connect(); } disconnect() { @@ -196,7 +183,6 @@ export default class PubSubClient extends Module { this._client.disconnect(); this._client = null; - this._state = State.DISCONNECTED; } @@ -253,6 +239,3 @@ export default class PubSubClient extends Module { } } - - -PubSubClient.State = State; diff --git a/src/sites/twitch-twilight/modules/channel.jsx b/src/sites/twitch-twilight/modules/channel.jsx index 8bf636c3..ec3a524a 100644 --- a/src/sites/twitch-twilight/modules/channel.jsx +++ b/src/sites/twitch-twilight/modules/channel.jsx @@ -307,7 +307,7 @@ export default class Channel extends Module { return; if ( this._subbed_id ) { - this.pubsub.unsubscribe(this, `twitch/${this._subbed_id}/channel/#`); + this.pubsub.unsubscribe(this, `twitch/${this._subbed_id}/channel`); this._subbed_id = null; } diff --git a/src/utilities/constants.js b/src/utilities/constants.js index 66feb4f6..2587934d 100644 --- a/src/utilities/constants.js +++ b/src/utilities/constants.js @@ -270,9 +270,9 @@ export const LINK_DATA_HOSTS = { export const PUBSUB_CLUSTERS = { - Production: 'wss://pubsub.frankerfacez.com/mqtt', - Staging: 'wss://pubsub-staging.frankerfacez.com/mqtt', - Development: 'wss://stendec.dev/mqtt/ws' + Production: `https://pubsub.frankerfacez.com`, + Staging: `https://pubsub-staging-alt.frankerfacez.com`, + Development: `https://stendec.dev/ps/` } diff --git a/src/utilities/object.js b/src/utilities/object.js index fc3a7885..c7f46e4b 100644 --- a/src/utilities/object.js +++ b/src/utilities/object.js @@ -864,4 +864,49 @@ export class SourcedSet { old_val.splice(idx, 1); this._rebuild(); } -} \ No newline at end of file +} + + +export function b64ToArrayBuffer(input) { + const bin = atob(input), + len = bin.length, + buffer = new ArrayBuffer(len), + view = new Uint8Array(buffer); + + for(let i = 0, len = bin.length; i < len; i++) + view[i] = bin.charCodeAt(i); + + return buffer; +} + + +const PEM_HEADER = /-----BEGIN (.+?) KEY-----/, + PEM_FOOTER = /-----END (.+?) KEY-----/; + +export function importRsaKey(pem, uses = ['verify']) { + const start_match = PEM_HEADER.exec(pem), + end_match = PEM_FOOTER.exec(pem); + + if ( ! start_match || ! end_match || start_match[1] !== end_match[1] ) + throw new Error('invalid key'); + + const is_private = /\bPRIVATE\b/i.test(start_match[1]), + start = start_match.index + start_match[0].length, + end = end_match.index; + + const content = pem.slice(start, end).replace(/\n/g, '').trim(); + //console.debug('content', JSON.stringify(content)); + + const buffer = b64ToArrayBuffer(content); + + return crypto.subtle.importKey( + is_private ? 'pkcs8' : 'spki', + buffer, + { + name: "RSA-PSS", + hash: "SHA-256" + }, + true, + uses + ); +} diff --git a/src/utilities/pubsub.js b/src/utilities/pubsub.js new file mode 100644 index 00000000..776dfce1 --- /dev/null +++ b/src/utilities/pubsub.js @@ -0,0 +1,529 @@ +import { EventEmitter } from "./events"; + +import { MqttClient, DISCONNECT, SUBSCRIBE } from "denoflare-mqtt"; +import { b64ToArrayBuffer, debounce, importRsaKey, make_enum, sleep } from "./object"; + +const SUBTOPIC_MATCHER = /\/(\d+)$/; + + +function makeSignal() { + const out = {}; + + out.promise = new Promise((s,f) => { + out.resolve = s; + out.reject = f; + }); + + return out; +} + + + +MqttClient.prototype.reschedulePing = function reschedulePing() { + this.clearPing(); + let delay = this.keepAliveSeconds; + if ( this.keepAliveOverride > 0 ) + delay = Math.min(delay, this.keepAliveOverride); + + this.pingTimeout = setTimeout(async () => { + try { + await this.ping(); + } catch(err) { /* no-op */ } + this.reschedulePing(); + }, delay * 1000); +} + + +MqttClient.prototype.ffzUnsubscribe = function ffzUnsubscribe(topics) { + // TODO: This +} + + +MqttClient.prototype.ffzSubscribe = function ffzSubscribe(topics) { + const packetId = this.obtainPacketId(); + const signal = this.pendingSubscribes[packetId] = makeSignal(); + + if ( ! Array.isArray(topics) ) + topics = [topics]; + + return this.sendMessage({ + type: SUBSCRIBE, + packetId, + subscriptions: topics.map(topic => ({topicFilter: topic})) + }).then(() => signal.promise); +} + + +export const State = make_enum( + 'Disconnected', + 'Connecting', + 'Connected' +); + + +export default class PubSubClient extends EventEmitter { + + constructor(server, options = {}) { + super(); + + this.server = server; + this.user = options?.user; + this.logger = options?.log ?? options?.logger; + + this._should_connect = false; + this._state = State.Disconnected; + + // Topics is a map of topics to sub-topic IDs. + this._topics = new Map; + + // Live Topics is a set of every topic we have sent subscribe + // packets to the server for. + this._live_topics = new Set; + + // Active Topics is a set of every topic we SHOULD be subscribed to. + this._active_topics = new Set; + + // Pending Topics is a set of topics that we should be subscribed to + // but that we don't yet have a sub-topic assignment. + this._pending_topics = new Set; + + // Debounce a few things. + this._fetchNewTopics = this._fetchNewTopics.bind(this); + this._sendSubscribes = debounce(this._sendSubscribes, 250); + this._sendUnsubscribes = debounce(this._sendUnsubscribes, 250); + } + + // ======================================================================== + // Properties + // ======================================================================== + + get id() { return this._data?.client_id ?? null } + + get topics() { return [...this._active_topics] } + + get disconnected() { + return this._state === State.Disconnected; + }; + + get connecting() { + return this._state === State.Connecting; + } + + get connected() { + return this._state === State.Connected; + } + + + // ======================================================================== + // Data Loading + // ======================================================================== + + loadData() { + // If we have all the data we need, don't do anything. + if ( this._data && ! this._pending_topics.size ) + return Promise.resolve(this._data); + + if ( ! this._data_loader ) + this._data_loader = this._loadData() + .finally(() => this._data_loader = null); + + return this._data_loader; + } + + async _loadData() { + let response, data; + try { + // TODO: Send removed topics. + response = await fetch(this.server, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + id: this.id, + user: this.user ?? null, + topics: this.topics + }) + }); + + if ( response.ok ) + data = await response.json(); + + } catch(err) { + throw new Error( + 'Unable to load PubSub data from server.', + { + cause: err + } + ); + } + + if ( ! data?.endpoint ) + throw new Error('Received invalid PubSub data from server.'); + + // If there's a signing key, parse it. + if ( data.public_key ) + try { + data.public_key = await importRsaKey(data.public_key); + } catch(err) { + throw new Error('Received invalid public key from server.', { + cause: err + }); + } + + else + data.public_key = null; + + if ( data.require_signing && ! data.public_key ) + throw new Error('Server requires signing but did not provide public key.'); + + // If we already had a password, preserve it. + if ( this._data?.password && ! data.password ) + data.password = this._data.password; + + // Record all the topic mappings we just got. + // TODO: Check for subtopic mismatches. + if ( data.topics ) + for(const [key, val] of Object.entries(data.topics)) + this._topics.set(key, val); + + this._data = data; + return data; + } + + async _fetchNewTopics(attempts = 0) { + this._fetch_timer = null; + let needs_fetch = false; + for(const topic of [...this._pending_topics]) { + if ( ! this._topics.has(topic) ) + needs_fetch = true; + } + + if ( needs_fetch ) + try { + await this.loadData(); + } catch(err) { + if ( attempts > 10 ) { + this._fetch_timer = null; + throw err; + } + + let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000; + if ( delay > 60000 ) + delay = (Math.floor(Math.random() * 60) + 30) * 1000; + + return sleep(delay).then(() => this._fetchNewTopics(attempts + 1)); + } + + if ( this._client ) + this._sendSubscribes(); + } + + + // ======================================================================== + // Connecting + // ======================================================================== + + connect() { + return this._connect(); + } + + async _connect(attempts = 0) { + if ( this._state === State.Connected ) + return; + + this._state = State.Connecting; + + let data; + try { + data = await this.loadData(); + } catch(err) { + if ( attempts > 10 ) { + this._state = State.Disconnected; + throw err; + } + + let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000; + if ( delay > 60000 ) + delay = (Math.floor(Math.random() * 60) + 30) * 1000; + + return sleep(delay).then(() => this._connect(attempts + 1)); + } + + if ( this.logger ) + this.logger.debug('Received Configuration', data); + + // We have our configuration. Now, create our client. + this._should_connect = true; + this._createClient(data); + + // Set up a heartbeat to keep us alive. + // TODO: Make this random / staggered maybe. + if ( this._heartbeat ) + clearInterval(this._heartbeat); + + this._heartbeat = setInterval( + () => this._sendHeartbeat(), + 5 * 60 * 1000 + ); // every 5 minutes. + } + + disconnect() { + this._should_connect = false; + this._destroyClient(); + this._state = State.Disconnected; + + if ( this._heartbeat ) { + clearInterval(this._heartbeat); + this._heartbeat = null; + } + } + + subscribe(topic) { + if ( Array.isArray(topic) ) { + for(const item of topic) + this.subscribe(item); + return; + } + + // If this is already an active topic, there's nothing + // else to do. + if ( this._active_topics.has(topic) ) + return; + + this._active_topics.add(topic); + + // If we don't have a sub-topic mapping, then we need to + // request a new one. Mark this topic as pending. + if ( ! this._topics.has(topic) ) + this._pending_topics.add(topic); + + // If we have a client, and we have pending topics, and there + // isn't a pending fetch, then schedule a fetch. + if ( this._client && this._pending_topics.size && ! this._fetch_timer ) + this._fetch_timer = setTimeout(this._fetchNewTopics, 5000); + + // Finally, if we have a client, send out a subscribe packet. + // This method is debounced by 250ms. + if ( this._client ) + this._sendSubscribes(); + } + + unsubscribe(topic) { + if ( Array.isArray(topic) ) { + for(const item of topic) + this.unsubscribe(item); + return; + } + + // If this topic isn't an active topic, we have nothing to do. + if ( ! this._active_topics.has(topic) ) + return; + + // Remove the topic from the active and pending topics. Don't + // remove it from the topic map though, since our client is + // still live. + this._active_topics.delete(topic); + this._pending_topics.delete(topic); + + if ( this._client ) + this._sendUnsubscribes(); + } + + // ======================================================================== + // Client Management + // ======================================================================== + + _sendHeartbeat() { + if ( this._client && this._data?.client_id ) { + return this._client.publish({ + topic: 'heartbeats', + payload: JSON.stringify({ + id: this._data.client_id + }) + }); + } + } + + _destroyClient() { + if ( ! this._client ) + return; + + try { + this._client.disconnect().catch(() => {}); + } catch(err) { /* no-op */ } + this._client = null; + + this._live_topics.clear(); + } + + _createClient(data) { + // If there is an existing client, destroy it first. + if ( this._client ) + this._destroyClient(); + + // Now, create a new instance of our client. + // This requires a parsed URL because the dumb client doesn't + // take URLs like every other client ever. + const url = new URL(data.endpoint); + + this._live_topics.clear(); + this._state = State.Connecting; + + const client = this._client = new MqttClient({ + hostname: url.hostname, + port: url.port ?? undefined, + protocol: 'wss', + maxMessagesPerSecond: 10 + }); + + this._client.onMqttMessage = message => { + if ( message.type === DISCONNECT ) { + this.emit('disconnect', message); + this._destroyClient(); + + if ( this._should_connect ) + this._createClient(data); + } + } + + this._client.onReceive = async message => { + // Get the topic, and remove the subtopic from it. + let topic = message.topic; + const match = SUBTOPIC_MATCHER.exec(topic); + if ( match ) + topic = topic.slice(0, match.index); + + if ( ! this._active_topics.has(topic) ) { + if ( this.logger ) + this.logger.debug('Received message for unsubscribed topic:', topic); + return; + } + + let msg; + try { + msg = JSON.parse(message.payload); + } catch(err) { + if ( this.logger ) + this.logger.warn(`Error decoding PubSub message on topic "${topic}":`, err); + return; + } + + if ( data.require_signing ) { + let valid = false; + const sig = msg.sig; + delete msg.sig; + + if ( sig ) + try { + const encoded = new TextEncoder().encode(JSON.stringify(msg)); + + valid = await crypto.subtle.verify( + { + name: "RSA-PSS", + saltLength: 32 + }, + data.public_key, + b64ToArrayBuffer(sig), + encoded + ); + + } catch(err) { + if ( this.logger ) + this.logger.warn('Error attempting to verify signature for message.', err); + return; + } + + if ( ! valid ) { + msg.sig = sig; + if ( this.logger ) + this.logger.debug(`Received message on topic "${topic}" that failed signature verification:`, msg); + return; + } + } + + this.emit('message', { topic, data: msg }); + } + + // We want to send a keep-alive every 60 seconds, despite + // requesting a keepAlive of 120 to the server. We do this + // because of how background tabs are throttled by browsers. + client.keepAliveOverride = 60; + + return this._client.connect({ + clientId: data.client_id, + password: data.password, + keepAlive: 120 + }).then(() => { + this._state = State.Connected; + this.emit('connect'); + + this._sendHeartbeat(); + return this._sendSubscribes() + }); + } + + _sendSubscribes() { + if ( ! this._client ) + return Promise.resolve(); + + const topics = []; + + for(const topic of this._active_topics) { + if ( this._live_topics.has(topic) ) + continue; + + const subtopic = this._topics.get(topic); + if ( subtopic != null ) { + // Make sure this topic isn't considered pending. + this._pending_topics.delete(topic); + + if ( subtopic === 0 ) + topics.push(topic); + else + topics.push(`${topic}/${subtopic}`); + + // Make a note, we're subscribing to this topic. + this._live_topics.add(topic); + } + } + + if ( topics.length ) + return this._client.ffzSubscribe(topics); + else + return Promise.resolve(); + } + + _sendUnsubscribes() { + if ( ! this._client ) + return Promise.resolve(); + + const topics = []; + + // iterate over a copy to support removal + for(const topic of [...this._live_topics]) { + if ( this._active_topics.has(topic) ) + continue; + + // Should never be null, but be safe. + const subtopic = this._topics.get(topic); + if ( subtopic == null ) + continue; + + let real_topic; + if ( subtopic === 0 ) + real_topic = topic; + else + real_topic = `${topic}/${subtopic}`; + + topics.push(real_topic); + this._live_topics.delete(topic); + } + + if ( topics.length ) + return this._client.ffzUnsubscribe(topics); + else + return Promise.resolve(); + } + +} \ No newline at end of file diff --git a/src/utilities/rich_tokens.js b/src/utilities/rich_tokens.js index 29c1bd3f..b57dbbc0 100644 --- a/src/utilities/rich_tokens.js +++ b/src/utilities/rich_tokens.js @@ -498,60 +498,73 @@ TOKEN_TYPES.format = function(token, createElement, ctx) { // ============================================================================ TOKEN_TYPES.gallery = function(token, createElement, ctx) { - if ( ! token.items ) + + if ( ! Array.isArray(token.items) || ! token.items.length ) return null; - let items = token.items - .map(item => renderTokens(item, createElement, ctx)) - .filter(x => x); - if ( ! items.length ) - return null; + let first_column = [], + second_column = [], + first = true, + i = 0; - if ( items.length > 4 ) - items = items.slice(0, 4); + for(const item of token.items) { + const content = renderTokens(item, createElement, ctx); + if ( content ) { + (first ? first_column : second_column).push(content); + first = ! first; + i++; + if ( i >= 4 ) + break; + } + } - const divisions = [], - count = items.length < 4 ? 1 : 2; + if ( second_column.length && first_column.length > second_column.length ) + second_column.push(first_column.pop()); - divisions.push(ctx.vue ? + if ( ! i ) + return null + + const columns = []; + + columns.push(ctx.vue ? createElement('div', { class: 'ffz--gallery-column', attrs: { - 'data-items': count + 'data-items': first_column.length } - }, items.slice(0, count)) : + }, first_column) : createElement('div', { className: 'ffz--gallery-column', - 'data-items': count - }, items.slice(0, count)) + 'data-items': first_column.length + }, first_column) ); - if ( items.length > 1 ) - divisions.push(ctx.vue ? + if ( second_column.length ) + columns.push(ctx.vue ? createElement('div', { class: 'ffz--gallery-column', attrs: { - 'data-items': items.length - count + 'data-items': second_column.length } - }, items.slice(count)) : + }, second_column) : createElement('div', { className: 'ffz--gallery-column', - 'data-items': items.length - count - }, items.slice(count)) + 'data-items': second_column.length + }, second_column) ); if ( ctx.vue ) return createElement('div', { class: 'ffz--rich-gallery', attrs: { - 'data-items': items.length + 'data-items': first_column.length + second_column.length } - }, divisions); + }, columns); return createElement('div', { className: 'ffz--rich-gallery', - 'data-items': items.length - }, divisions); + 'data-items': first_column.length + second_column.length + }, columns); }