diff --git a/package.json b/package.json index 721c69c4..13767bb2 100755 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "frankerfacez", "author": "Dan Salvato LLC", - "version": "4.75.7", + "version": "4.76.0", "description": "FrankerFaceZ is a Twitch enhancement suite.", "private": true, "license": "Apache-2.0", diff --git a/src/addons.ts b/src/addons.ts index 128b4129..6ed065db 100644 --- a/src/addons.ts +++ b/src/addons.ts @@ -346,7 +346,7 @@ export default class AddonManager extends Module<'addons'> { } getAddon(id: string) { - const addon = this.addons[id]; + const addon = this.addons[id] ?? null; return Array.isArray(addon) ? null : addon; } diff --git a/src/entry_ext.js b/src/entry_ext.js index bd59b44c..586f3f75 100644 --- a/src/entry_ext.js +++ b/src/entry_ext.js @@ -27,18 +27,67 @@ type: 'ffz_injecting' }); - // Set up the extension message bridge. - window.addEventListener('message', evt => { - if (evt.source !== window) + // Set up a bridge for connections, since Firefox + // doesn't support externally_connectable. + const connections = new Map; + + function handleConnect(id) { + if ( connections.has(id) ) return; - if (evt.data && evt.data.type === 'ffz_to_ext') - browser.runtime.sendMessage(evt.data.data, resp => { - if (resp?.type === 'ffz_to_page') - window.postMessage(resp.data, '*'); + const port = browser.runtime.connect(); + connections.set(id, port); + + port.onMessage.addListener(msg => { + window.postMessage({ + type: 'ffz-con-message', + id, + payload: msg + }) + }); + + port.onDisconnect.addListener(() => { + connections.delete(id); + window.postMessage({ + type: 'ffz-con-disconnect', + id }); + }); + } + + function handleDisconnect(id) { + const port = connections.get(id); + if ( port ) { + connections.delete(id); + port.disconnect(); + } + } + + function handleMessage(id, payload) { + const port = connection.get(id); + if ( port ) { + port.postMessage(payload); + } + } + + window.addEventListener('message', evt => { + if (evt.source !== window || ! evt.data ) + return; + + const { type, id, payload } = evt.data; + + if ( type === 'ffz-con-connect' ) + handleConnect(id); + + else if ( type === 'ffz-con-message' ) + handleMessage(id, payload); + + else if ( type === 'ffz-con-disconnect' ) + handleDisconnect(id); }); + + // Let the extension send messages to the page directly. browser.runtime.onMessage.addListener((msg, sender) => { if (msg?.type === 'ffz_to_page') window.postMessage(msg.data, '*'); diff --git a/src/experiments.json b/src/experiments.json index 248eb9e8..45ef6607 100644 --- a/src/experiments.json +++ b/src/experiments.json @@ -1,5 +1,6 @@ { "api_load": { + "in_use": false, "name": "New API Stress Testing", "description": "Send duplicate requests to the new API server for load testing.", "groups": [ @@ -8,25 +9,27 @@ ] }, "api_links": { - "name": "API-Based Link Lookups", - "description": "Use the new API to look up links instead of the socket cluster.", + "in_use": false, "groups": [ - {"value": true, "weight": 0}, - {"value": "cf", "weight": 100}, - {"value": false, "weight": 0} + {"value": false, "weight": 100} ] }, "emqx_pubsub": { - "name": "EMQX MQTT-Based PubSub", - "description": "An experimental pubsub system running on an EMQX cluster, to see how that performs.", + "in_use": false, + "groups": [ + {"value": false, "weight": 100} + ] + }, + "cf_pubsub": { + "in_use": false, "groups": [ {"value": true, "weight": 0}, {"value": false, "weight": 100} ] }, - "cf_pubsub": { - "name": "CF MQTT-Based PubSub", - "description": "An experimental new pubsub system that should be more reliable than the existing socket cluster.", + "worker_pubsub": { + "name": "Worker PubSub", + "description": "Whether or not to connect to the new Cloudflare Worker based PubSub system.", "groups": [ {"value": true, "weight": 0}, {"value": false, "weight": 100} diff --git a/src/load_tracker.ts b/src/load_tracker.ts index 1edd73ed..430435be 100644 --- a/src/load_tracker.ts +++ b/src/load_tracker.ts @@ -93,8 +93,6 @@ export default class LoadTracker extends Module<'load_tracker', LoadEvents> { /** @internal */ onEnable() { - this.emit('load_tracker:schedule', 'test', 'fish'); - this.on(':schedule', this.schedule); } diff --git a/src/modules/chat/emotes.js b/src/modules/chat/emotes.js index 8056bf86..2e482459 100644 --- a/src/modules/chat/emotes.js +++ b/src/modules/chat/emotes.js @@ -7,7 +7,7 @@ import Module, { buildAddonProxy } from 'utilities/module'; import {ManagedStyle} from 'utilities/dom'; -import {get, has, timeout, SourcedSet, make_enum_flags, makeAddonIdChecker} from 'utilities/object'; +import {get, has, timeout, SourcedSet, make_enum_flags, makeAddonIdChecker, deep_copy} from 'utilities/object'; import {NEW_API, IS_OSX, EmoteTypes, TWITCH_GLOBAL_SETS, TWITCH_POINTS_SETS, TWITCH_PRIME_SETS, DEBUG} from 'utilities/constants'; import GET_EMOTE from './emote_info.gql'; @@ -463,16 +463,31 @@ export default class Emotes extends Module { this.providers = new Map; - this.providers.set('featured', { - name: 'Featured', - i18n_key: 'emote-menu.featured', + this.setProvider('ffz', { + name: 'FrankerFaceZ', + font_icon: 'ffz-i-zreknarf', + //icon: 'https://cdn.frankerfacez.com/badge/4/4/solid' + }); + + /*this.providers.set('ffz-featured', { + menu_name: 'Featured', + menu_i18n_key: 'emote-menu.featured', sort_key: 75 - }) + });*/ this.emote_sets = {}; this._set_refs = {}; this._set_timers = {}; + this.settings.add('chat.emotes.source-priorities', { + default: null, + ui: { + path: 'Chat > Emote Priorities', + component: 'emote-priorities', + data: () => deep_copy(this.providers) + } + }); + this.settings.add('chat.emotes.enabled', { default: 2, ui: { @@ -631,9 +646,25 @@ export default class Emotes extends Module { return this.addFilter(filter); } + overrides.setProvider = (provider, data) => { + if ( is_dev && ! id_checker.test(provider) ) + module.log.warn('[DEV-CHECK] Call to emotes.setProvider did not include addon ID in provider:', provider); + + if ( data ) + data.__source = addon_id; + + return this.setProvider(provider, data); + } + overrides.addDefaultSet = (provider, set_id, data) => { if ( is_dev && ! id_checker.test(provider) ) - module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet did not include addon ID in provider:', provider); + module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet did not include addon ID in provider:', provider); + + if ( ! this.providers.has(provider) ) { + this.inferProvider(provider, addon_id); + if ( is_dev ) + module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet for provider that has not been registered with emotes.setProvider:', provider); + } if ( data ) { if ( is_dev && ! id_checker.test(set_id) ) @@ -647,7 +678,13 @@ export default class Emotes extends Module { overrides.addSubSet = (provider, set_id, data) => { if ( is_dev && ! id_checker.test(provider) ) - module.log.warn('[DEV-CHECK] Call to emotes.addSubSet did not include addon ID in provider:', provider); + module.log.warn('[DEV-CHECK] Call to emotes.addSubSet did not include addon ID in provider:', provider); + + if ( ! this.providers.has(provider) ) { + this.inferProvider(provider, addon_id); + if ( is_dev ) + module.log.warn('[DEV-CHECK] Call to emotes.addSubSet for provider that has not been registered with emotes.setProvider:', provider); + } if ( data ) { if ( is_dev && ! id_checker.test(set_id) ) @@ -713,6 +750,8 @@ export default class Emotes extends Module { // Generate the base filter CSS. this.base_effect_css = generateBaseFilterCss(); + this.parent.context.getChanges('chat.emotes.source-priorities', this.updatePriorities, this); + this.parent.context.on('changed:chat.effects.enable', this.updateEffects, this); for(const input of EFFECT_STYLES) if ( input.setting && ! Array.isArray(input.setting) ) @@ -807,6 +846,13 @@ export default class Emotes extends Module { } } + for(const [key, data] of this.providers.entries()) { + if ( data?.__source === addon_id ) { + removed++; + this.setProvider(key, null); + } + } + if ( removed ) { this.log.debug(`Cleaned up ${removed} entries when unloading addon:`, addon_id); // TODO: Debounced retokenize all chat messages. @@ -817,6 +863,62 @@ export default class Emotes extends Module { } + // ======================================================================== + // Providers + // ======================================================================== + + updatePriorities(priorities) { + const l = priorities?.length; + + if (! l || l <= 0) + this.sourceSortFn = null; + else + this.sourceSortFn = (first, second) => { + if (first.startsWith('ffz-')) + first = 'ffz'; + if (second.startsWith('ffz-')) + second = 'ffz'; + + let first_priority = priorities.indexOf(first), + second_priority = priorities.indexOf(second); + + if (first_priority === -1) first_priority = l; + if (second_priority === -1) second_priority = l; + + return first_priority - second_priority; + }; + + // Update all existing sourced sets now. + this.default_sets.setSortFunction(this.sourceSortFn); + + this.emit(':update-priorities', this.sourceSortFn); + } + + inferProvider(provider, addon_id) { + if ( this.providers.has(provider) ) + return; + + const data = this.resolve('addons')?.getAddon(addon_id); + if ( data ) + this.setProvider(provider, { + name: data.name, + i18n_key: data.name_i18n, + icon: data.icon, + description: provider, + __source: addon_id + }); + } + + setProvider(provider, data) { + if ( ! data ) + this.providers.delete(provider); + else { + data.id = provider; + this.providers.set(provider, data); + } + } + + // ======================================================================== // Emote Filtering // ======================================================================== @@ -1183,17 +1285,17 @@ export default class Emotes extends Module { emote_sets = room.emote_sets, providers = emote_sets && emote_sets._sources; - if ( providers && providers.has('featured') ) - for(const item of providers.get('featured')) { + if ( providers && providers.has('ffz-featured') ) + for(const item of providers.get('ffz-featured')) { const idx = new_sets.indexOf(item); if ( idx === -1 ) - room.removeSet('featured', item); + room.removeSet('ffz-featured', item); else new_sets.splice(idx, 1); } for(const set_id of new_sets) { - room.addSet('featured', set_id); + room.addSet('ffz-featured', set_id); if ( ! this.emote_sets[set_id] ) this.loadSet(set_id); diff --git a/src/modules/chat/index.js b/src/modules/chat/index.js index 1598645d..d12bccb8 100644 --- a/src/modules/chat/index.js +++ b/src/modules/chat/index.js @@ -135,17 +135,7 @@ export default class Chat extends Module { this.settings.add('debug.link-resolver.source', { process: (ctx, val) => { - if ( val == null ) { - const exp = this.experiments.getAssignment('api_links'); - if ( exp === 'cf' ) - val = 'test-cf'; - else if ( exp ) - val = 'test'; - else - val = 'socket'; - } - - return LINK_DATA_HOSTS[val] ?? LINK_DATA_HOSTS.test; + return LINK_DATA_HOSTS[val] ?? LINK_DATA_HOSTS.Production; }, default: null, @@ -1326,6 +1316,12 @@ export default class Chat extends Module { if ( is_dev && ! id_checker.test(provider) ) module.log.warn('[DEV-CHECK] Call to getUser().addSet() did not include addon ID in provider:', provider); + if ( ! this.manager.emotes.providers.has(provider) ) { + this.manager.emotes.inferProvider(provider, addon_id); + if ( is_dev ) + module.log.warn('[DEV-CHECK] Call to getUser().addSet() for provider that has not been registered with emotes.setProvider:', provider); + } + if ( data ) { if ( is_dev && ! id_checker.test(set_id) ) module.log.warn('[DEV-CHECK] Call to getUser().addSet() loaded set data but did not include addon ID in set ID:', set_id); @@ -1366,6 +1362,12 @@ export default class Chat extends Module { if ( is_dev && ! id_checker.test(provider) ) module.log.warn('[DEV-CHECK] Call to getRoom().addSet() did not include addon ID in provider:', provider); + if ( ! this.manager.emotes.providers.has(provider) ) { + this.manager.emotes.inferProvider(provider, addon_id); + if ( is_dev ) + module.log.warn('[DEV-CHECK] Call to getRoom().addSet() for provider that has not been registered with emotes.setProvider:', provider); + } + if ( data ) { if ( is_dev && ! id_checker.test(set_id) ) module.log.warn('[DEV-CHECK] Call to getRoom().addSet() loaded set data but did not include addon ID in set ID:', set_id); @@ -1524,6 +1526,12 @@ export default class Chat extends Module { this.settings.provider.on('changed', this.onProviderChange, this); this.on('site.subpump:pubsub-message', this.onPubSub, this); + this.on('chat.emotes:update-priorities', fn => { + for(const thing of this.iterateAllRoomsAndUsers()) { + if (thing.emote_sets) + thing.emote_sets.setSortFunction(fn); + } + }); if ( this.context.get('chat.filtering.need-colors') ) this.createColorCache().then(() => this.emit(':update-line-tokens')); diff --git a/src/modules/chat/room.js b/src/modules/chat/room.js index f15497e1..236f27fd 100644 --- a/src/modules/chat/room.js +++ b/src/modules/chat/room.js @@ -369,10 +369,10 @@ export default class Room { this.data = d; - this.removeAllSets('main'); + this.removeAllSets('ffz-main'); if ( d.set ) - this.addSet('main', d.set); + this.addSet('ffz-main', d.set); if ( data.sets ) for(const set_id in data.sets) @@ -408,7 +408,7 @@ export default class Room { return; if ( ! this.emote_sets ) - this.emote_sets = new SourcedSet; + this.emote_sets = new SourcedSet(false, this.manager.emotes.sourceSortFn); if ( typeof set_id === 'number' ) set_id = `${set_id}`; diff --git a/src/modules/chat/user.ts b/src/modules/chat/user.ts index 50cb962e..ea1cbc65 100644 --- a/src/modules/chat/user.ts +++ b/src/modules/chat/user.ts @@ -138,7 +138,7 @@ export default class User { data = {id: badge_id}; if ( ! this.badges ) - this.badges = new SourcedSet; + this.badges = new SourcedSet(false, this.manager.emotes.sourceSortFn); const existing = this.badges.get(provider); if ( existing ) diff --git a/src/modules/emote_card/index.jsx b/src/modules/emote_card/index.jsx index bc236f9f..86fb24fe 100644 --- a/src/modules/emote_card/index.jsx +++ b/src/modules/emote_card/index.jsx @@ -24,6 +24,8 @@ function getEmoteTypeFromTwitchType(type) { return EmoteTypes.LimitedTime; if ( type === 'BITS_BADGE_TIERS' ) return EmoteTypes.BitsTier; + if ( type === 'CHANNEL_POINTS' ) + return EmoteTypes.ChannelPoints; if ( type === 'TWO_FACTOR' ) return EmoteTypes.TwoFactor; if ( type === 'PRIME' ) diff --git a/src/modules/main_menu/components/emote-priorities.vue b/src/modules/main_menu/components/emote-priorities.vue new file mode 100644 index 00000000..58f78d3d --- /dev/null +++ b/src/modules/main_menu/components/emote-priorities.vue @@ -0,0 +1,159 @@ + + + diff --git a/src/modules/main_menu/components/experiments.vue b/src/modules/main_menu/components/experiments.vue index 24c34a03..9cda4183 100644 --- a/src/modules/main_menu/components/experiments.vue +++ b/src/modules/main_menu/components/experiments.vue @@ -60,7 +60,7 @@ >
-

{{ exp.name }}

+

{{ exp.name ? exp.name : key }}

{{ exp.description }}
@@ -359,8 +359,8 @@ export default { if ( a_r > b_r ) return 1; } - const a_n = a.exp.name.toLowerCase(), - b_n = b.exp.name.toLowerCase(); + const a_n = a.exp.name?.toLowerCase() ?? a.key?.toLowerCase(), + b_n = b.exp.name?.toLowerCase() ?? b.key?.toLowerCase(); if ( a_n < b_n ) return -1; if ( a_n > b_n ) return 1; diff --git a/src/pubsub/index.ts b/src/pubsub/index.ts index 8ca22c6d..4b6789f8 100644 --- a/src/pubsub/index.ts +++ b/src/pubsub/index.ts @@ -5,12 +5,10 @@ // ============================================================================ import Module, { GenericModule } from 'utilities/module'; -import { PUBSUB_CLUSTERS } from 'utilities/constants'; +import { PubSubClient } from './client'; import type ExperimentManager from '../experiments'; import type SettingsManager from '../settings'; -import type PubSubClient from 'utilities/pubsub'; import type { PubSubCommands } from 'utilities/types'; -import type { SettingUi_Select_Entry } from '../settings/types'; declare module 'utilities/types' { interface ModuleMap { @@ -20,10 +18,10 @@ declare module 'utilities/types' { pubsub: PubSubEvents; } interface SettingsTypeMap { - 'pubsub.use-cluster': keyof typeof PUBSUB_CLUSTERS | null; + 'pubsub.enabled': boolean; } interface ExperimentTypeMap { - cf_pubsub: boolean; + worker_pubsub: boolean; } } @@ -54,81 +52,47 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> { _topics: Map>; _client: PubSubClient | null; - _mqtt?: typeof PubSubClient | null; - _mqtt_loader?: Promise | null; - constructor(name?: string, parent?: GenericModule) { super(name, parent); this.inject('settings'); this.inject('experiments'); - this.settings.add('pubsub.use-cluster', { - default: () => { - if ( this.experiments.getAssignment('emqx_pubsub') ) - return 'EMQXTest'; - if ( this.experiments.getAssignment('cf_pubsub') ) - return 'Staging'; - return null; - }, + this.settings.add('pubsub.enabled', { + default: () => this.experiments.getAssignment('worker_pubsub') ?? false, ui: { path: 'Debugging @{"expanded": false, "sort": 9999} > PubSub >> General', - title: 'Server Cluster', - description: 'Which server cluster to connect to. You can use this setting to disable PubSub if you want, but should otherwise leave this on the default value unless you know what you\'re doing.', + title: 'Enable PubSub.', + description: 'Whether or not you want your client to connect to FrankerFaceZ\'s PubSub system. This is still in testing and should be left alone unless you know what you\'re doing.', force_seen: true, - component: 'setting-select-box', - - data: [{ - value: null, - title: 'Disabled' - } as SettingUi_Select_Entry].concat(Object.keys(PUBSUB_CLUSTERS).map(x => ({ - value: x, - title: x - }))) + component: 'setting-check-box', }, - changed: () => this.reconnect() + changed: val => val ? this.connect() : this.disconnect() }); this._topics = new Map; this._client = null; } - loadPubSubClient() { - if ( this._mqtt ) - return Promise.resolve(this._mqtt); - - if ( ! this._mqtt_loader ) - this._mqtt_loader = import(/* webpackChunkName: 'pubsub' */ 'utilities/pubsub') - .then(thing => { - this._mqtt = thing.default; - return thing.default; - }) - .finally(() => this._mqtt_loader = null); - - return this._mqtt_loader; - } - onEnable() { - this.on('experiments:changed:cf_pubsub', this._updateSetting, this); - this.subscribe(null, 'global'); - this.connect(); + + this.on('experiments:changed:worker_pubsub', this._updateSetting, this); } onDisable() { + this.off('experiments:changed:worker_pubsub', this._updateSetting, this); + this.disconnect(); - this.unsubscribe(null, 'global'); - - this.off('experiments:changed:cf_pubsub', this._updateSetting, this); } _updateSetting() { - this.settings.update('pubsub.use-cluster'); + this.settings.update('pubsub.enabled'); } @@ -137,18 +101,7 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> { // ======================================================================== get connected() { - return this._client?.connected ?? false; - } - - - get connecting() { - return this._client?.connecting ?? false; - } - - - get disconnected() { - // If this is null, we have no client, so we aren't connected. - return this._client?.disconnected ?? true; + return this._client != null; } @@ -162,47 +115,21 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> { } async connect() { - - if ( this._client ) + // If there's already a client, or PubSub is disabled + // then we have nothing to do. + if ( this._client || ! this.settings.get('pubsub.enabled') ) return; - let cluster_id = this.settings.get('pubsub.use-cluster'); - if ( cluster_id === null ) - return; + // Create a new instance of the PubSubClient class. + const client = this._client = new PubSubClient(); - let cluster = PUBSUB_CLUSTERS[cluster_id]; - - // If we didn't get a valid cluster, use production. - if ( ! cluster?.length ) { - cluster_id = 'Production'; - cluster = PUBSUB_CLUSTERS.Production; - } - - this.log.info(`Using PubSub: ${cluster_id} (${cluster})`); - - const user = this.resolve('site')?.getUser?.(); - - // The client class handles everything for us. We only - // maintain a separate list of topics in case topics are - // subscribed when the client does not exist, or if the - // client needs to be recreated. - - const PubSubClient = await this.loadPubSubClient(); - - const client = this._client = new PubSubClient(cluster, { - logger: this.log.get('client'), - user: user?.id ? { - provider: 'twitch', - id: user.id - } : null + // Connect to the various events. + client.on('connect', () => { + this.log.info('Connected to PubSub.'); }); - client.on('connect', msg => { - this.log.info('Connected to PubSub.', msg); - }); - - client.on('disconnect', msg => { - this.log.info('Disconnected from PubSub.', msg); + client.on('disconnect', () => { + this.log.info('Disconnected from PubSub.'); }); client.on('error', err => { @@ -225,7 +152,7 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> { this.emit(`:command:${data.cmd}` as PubSubCommandKey, data.data, data); }); - // Subscribe to topics. + // Subscribe to our topics. const topics = [...this._topics.keys()]; client.subscribe(topics); diff --git a/src/sites/shared/player.jsx b/src/sites/shared/player.jsx index bef61e65..06b3288f 100644 --- a/src/sites/shared/player.jsx +++ b/src/sites/shared/player.jsx @@ -93,6 +93,34 @@ export default class PlayerBase extends Module { } }); + this.settings.add('player.clip-button.hide-native', { + default: null, + requires: ['player.clip-button.custom'], + process: (ctx, val) => val ?? ctx.get('player.clip-button.custom'), + ui: { + path: 'Player > General >> Appearance', + component: 'setting-check-box', + title: 'Hide the native Clip button.', + description: 'By default, this is enabled when using the setting to add a custom Clip button.' + }, + + changed: val => this.css_tweaks.toggle('player-hide-native-clip', val) + }); + + this.settings.add('player.clip-button.custom', { + default: false, + ui: { + path: 'Player > General >> Appearance', + component: 'setting-check-box', + title: 'Display a custom Clip button.', + description: 'Add a custom Clip button to the player that better fits the style of the other buttons.' + }, + changed: () => { + for(const inst of this.Player.instances) + this.addClipButton(inst); + } + }); + this.settings.add('player.fade-pause-buffer', { default: false, ui: { @@ -600,6 +628,7 @@ export default class PlayerBase extends Module { await this.settings.provider.awaitReady(); this.css_tweaks.toggleHide('player-gain-volume', this.settings.get('player.gain.no-volume')); + this.css_tweaks.toggle('player-hide-native-clip', this.settings.get('player.clip-button.hide-native')); this.css_tweaks.toggle('player-volume', this.settings.get('player.volume-always-shown')); this.css_tweaks.toggle('player-ext-mouse', !this.settings.get('player.ext-interaction')); this.css_tweaks.toggle('player-hide-mouse', this.settings.get('player.hide-mouse')); @@ -1237,6 +1266,7 @@ export default class PlayerBase extends Module { this.skipContentWarnings(inst); this.addPiPButton(inst); this.addResetButton(inst); + this.addClipButton(inst); this.addCompressorButton(inst, false); this.addGainSlider(inst, false); this.addMetadata(inst); @@ -1982,6 +2012,81 @@ export default class PlayerBase extends Module { } + addClipButton(inst, tries = 0) { + const outer = inst.props.containerRef || this.fine.getChildNode(inst), + container = outer && outer.querySelector(RIGHT_CONTROLS), + has_clip_button = this.settings.get('player.clip-button.custom'); + + if ( ! container ) { + if ( ! has_clip_button ) + return; + + if ( tries < 5 ) + return setTimeout(this.addClipButton.bind(this, inst, (tries || 0) + 1), 250); + + return; // this.log.warn('Unable to find container element for Clip button.'); + } + + let tip, btn, cont = container.querySelector('.ffz--player-clip'); + if ( ! has_clip_button ) { + if ( cont ) + cont.remove(); + return; + } + + if ( ! cont ) { + // We need the native clip button, so we can dispatch a click. + const native_clip = container.querySelector('button[aria-label*="alt+x"]'); + if ( ! native_clip ) + return; + + const on_click = e => native_clip.click(); + + cont = (
+ {btn = ()} + {tip = (); + + const thing = container.querySelector('.ffz--player-reset button') || + container.querySelector('.ffz--player-pip button') || + container.querySelector('button[data-a-target="player-theatre-mode-button"]') || + //container.querySelector('div:not(:has(.tw-tooltip)) button:not([data-a-target])') || + container.querySelector('button[aria-label*="Theat"]') || + container.querySelector('button[data-a-target="player-fullscreen-button"]'); + + if ( thing ) { + container.insertBefore(cont, thing.parentElement); + } else + container.appendChild(cont); + + } else { + btn = cont.querySelector('button'); + tip = cont.querySelector('.ffz-il-tooltip'); + } + + btn.setAttribute('aria-label', + tip.textContent = this.i18n.t( + 'player.clip-button', + 'Clip (Alt+X)' + )); + } + + clickClip(inst, e) { + console.log('clicked clip', inst, e); + } + + addResetButton(inst, tries = 0) { const outer = inst.props.containerRef || this.fine.getChildNode(inst), container = outer && outer.querySelector(RIGHT_CONTROLS), diff --git a/src/sites/twitch-twilight/modules/chat/emote_menu.jsx b/src/sites/twitch-twilight/modules/chat/emote_menu.jsx index 41e55fce..2219296b 100644 --- a/src/sites/twitch-twilight/modules/chat/emote_menu.jsx +++ b/src/sites/twitch-twilight/modules/chat/emote_menu.jsx @@ -2483,13 +2483,13 @@ export default class EmoteMenu extends Module { const key = `${emote_set.merge_source || fav_key}-${emote_set.merge_id || emote_set.id}`, pdata = t.emotes.providers.get(provider), - source = pdata && pdata.name ? - (pdata.i18n_key ? - t.i18n.t(pdata.i18n_key, pdata.name, pdata) : - pdata.name) : + source = pdata && pdata.menu_name ? + (pdata.menu_i18n_key ? + t.i18n.t(pdata.menu_i18n_key, pdata.menu_name, pdata) : + pdata.menu_name) : emote_set.source || 'FFZ', - title = (provider === 'main' || emote_set.title_is_channel) + title = (provider === 'ffz-main' || emote_set.title_is_channel) ? source_name ? t.i18n.t('emote-menu.source-set', '{channel}\'s Emotes', {channel: source_name}) : t.i18n.t('emote-menu.main-set', 'Channel Emotes') diff --git a/src/sites/twitch-twilight/modules/chat/viewer_card.jsx b/src/sites/twitch-twilight/modules/chat/viewer_card.jsx index 724da3fb..7fc9d668 100644 --- a/src/sites/twitch-twilight/modules/chat/viewer_card.jsx +++ b/src/sites/twitch-twilight/modules/chat/viewer_card.jsx @@ -78,7 +78,9 @@ export default class ViewerCards extends Module { else color = 'rgba(128,170,255,0.2)'; - this.css_tweaks.set('viewer-card-highlight', `body .chat-room .chat-line__message:not(.chat-line--inline):nth-child(1n+0)[data-user="${login}"] { + this.css_tweaks.set('viewer-card-highlight', ` +body .chat-room .chat-scrollable-area__message-container > div:nth-child(1n+0) > .chat-line__message:not(.chat-line--inline):not(.something-nonexistent)[data-user="${login}"], +body .chat-room .chat-line__message:not(.chat-line--inline):nth-child(1n+0)[data-user="${login}"] { background-color: ${color} !important; }`); } else @@ -92,4 +94,4 @@ export default class ViewerCards extends Module { unmountCard() { this.updateStyle(); } -} \ No newline at end of file +} diff --git a/src/sites/twitch-twilight/modules/directory/index.jsx b/src/sites/twitch-twilight/modules/directory/index.jsx index b66641a2..48aee535 100644 --- a/src/sites/twitch-twilight/modules/directory/index.jsx +++ b/src/sites/twitch-twilight/modules/directory/index.jsx @@ -660,6 +660,9 @@ export default class Directory extends Module { if ( ! props?.channelLogin ) props = react.return?.return?.return?.memoizedProps; + if ( ! props?.channelLogin ) + props = react.return?.return?.return?.return?.return?.memoizedProps; + if ( ! props?.channelLogin ) return; diff --git a/src/socket.js b/src/socket.js index d91ec355..cf4f9c6b 100644 --- a/src/socket.js +++ b/src/socket.js @@ -126,12 +126,12 @@ export default class SocketClient extends Module { onEnable() { + // We don't connect anymore. // For now, stop connecting to the sockets for people using the // API links experiment. - if ( this.experiments.getAssignment('api_links') ) - return; - - this.connect(); + //if ( this.experiments.getAssignment('api_links') ) + // return; + //this.connect(); } onDisable() { this.disconnect() } diff --git a/src/utilities/constants.ts b/src/utilities/constants.ts index 91dc1565..f8819a97 100644 --- a/src/utilities/constants.ts +++ b/src/utilities/constants.ts @@ -279,10 +279,10 @@ export const WS_CLUSTERS = { export const LINK_DATA_HOSTS = { - socket: { + /*socket: { title: 'Socket Cluster (Deprecated)', value: 'special:socket' - }, + },*/ localhost: { title: 'Local Dev Server (Port 8002)', value: 'https://localhost:8002', @@ -292,17 +292,17 @@ export const LINK_DATA_HOSTS = { title: 'Local Dev Worker (Wrangler, Port 8787)', value: 'https://localhost:8787' }, - test: { + /*test: { title: 'API Test Server', value: 'https://api-test.frankerfacez.com/v2/link' }, 'test-cf': { title: 'Cloudflare Test Worker', value: 'https://link-service.workers.frankerfacez.com' - }, + },*/ Production: { title: 'Production', - value: 'https://api.frankerfacez.com/v2/link' + value: 'https://link-service.workers.frankerfacez.com' } }; diff --git a/src/utilities/module.ts b/src/utilities/module.ts index a6d71ee0..f333959d 100644 --- a/src/utilities/module.ts +++ b/src/utilities/module.ts @@ -225,6 +225,13 @@ export class Module< get addon_id() { return this.__addon_id } /** If this module is part of an add-on, the add-on's root module. */ get addon_root() { return this.__addon_root } + /** If this module is part of an add-on, the add-on's manifest. */ + get addon_manifest() { + if (this.__addon_id) + return this.resolve('addons')?.getAddon(this.__addon_id!) ?? + (this.__addon_root?.constructor as typeof Addon)?.info ?? + undefined; + } /** A Logger instance for this module. */ get log(): Logger { diff --git a/src/utilities/object.ts b/src/utilities/object.ts index 6e3dfa4c..323692b0 100644 --- a/src/utilities/object.ts +++ b/src/utilities/object.ts @@ -1121,6 +1121,8 @@ export function generateHex(length: number = 40) { } +export type StringSortFn = (first: string, second: string) => number; + /** * SourcedSet is similar to a Set, except that entries in the set are kept * track of based upon a `source`. This allows entries from a specific source @@ -1150,6 +1152,9 @@ export class SourcedSet { _cache: Set | T[]; private _sources?: Map; + private _sorted_sources?: T[][] | null; + + private _sourceSortFn?: StringSortFn | null; /** * Create a new SourcedSet. @@ -1159,9 +1164,52 @@ export class SourcedSet { * * @param T The type of object to hold. */ - constructor(use_set = false) { + constructor(use_set = false, source_sorter?: StringSortFn) { this._use_set = use_set; this._cache = use_set ? new Set : []; + this._sourceSortFn = source_sorter; + } + + + resortSources() { + this._sorted_sources = null; + + // We don't need to rebuild if we have less than 2 sources + // since sorting wouldn't change anything. + if (this._sources && this._sources.size > 1) + this._rebuild(); + } + + + setSortFunction(fn: StringSortFn | null) { + // If the method isn't changing, then we don't need + // to do anything. + if (this._sourceSortFn === fn) + return; + + // Set the new sort function. + this._sourceSortFn = fn; + + // And re-sort. + this.resortSources(); + } + + + _sortSources() { + if (!this._sources || this._sources.size === 0) { + this._sorted_sources = []; + return; + } + + if (this._sources.size === 1 || !this._sourceSortFn) { + this._sorted_sources = [...this._sources.values()]; + return; + } + + const sources = [...this._sources?.entries()]; + sources.sort((a, b) => this._sourceSortFn!(a[0], b[0])); + + this._sorted_sources = sources.map(entry => entry[1]); } /** @@ -1174,7 +1222,11 @@ export class SourcedSet { const use_set = this._use_set, cache = this._cache = use_set ? new Set : []; - for(const items of this._sources.values()) + + if (!this._sorted_sources) + this._sortSources(); + + for(const items of this._sorted_sources!) for(const i of items) if ( use_set ) (cache as Set).add(i); @@ -1210,6 +1262,7 @@ export class SourcedSet { delete(source: string) { if ( this._sources && this._sources.has(source) ) { this._sources.delete(source); + this._sorted_sources = null; this._rebuild(); } } @@ -1224,7 +1277,19 @@ export class SourcedSet { items = [...existing, ...items]; this._sources.set(source, items); - if ( existing ) + + // If we have a sort function and more than one source, then + // we need to do a rebuild when making modifications. + const need_sorting = this._sourceSortFn != null && this._sources.size > 1; + + // If this is a new source, we need to clear the cache for + // future rebuilds to include this. + if ( ! existing ) + this._sorted_sources = null; + + // If we need sorting, do a rebuild, otherwise go ahead + // and add the items normally. + if ( need_sorting ) this._rebuild(); else { const use_set = this._use_set, @@ -1256,7 +1321,20 @@ export class SourcedSet { const existing = this._sources.has(source); this._sources.set(source, items); - if ( existing ) + + // If we have a sort function and more than one source, then + // we need to do a rebuild when making modifications. + const need_sorting = this._sourceSortFn != null && this._sources.size > 1; + + // If this is a new source, we need to clear the cache for + // future rebuilds to include this. + if ( ! existing ) + this._sorted_sources = null; + + // If we need sorting, or if we replaced an existing source, + // then we need a rebuild. Otherwise, go ahead and add the + // items normally. + if ( existing || need_sorting ) this._rebuild(); else { const use_set = this._use_set, @@ -1284,7 +1362,14 @@ export class SourcedSet { return; existing.push(item); - if ( this._use_set ) + + // If we have a sort function and more than one source, then + // we need to do a rebuild when making modifications. + const need_sorting = this._sourceSortFn != null && this._sources.size > 1; + + if ( need_sorting ) + this._rebuild(); + else if ( this._use_set ) (this._cache as Set).add(item); else if ( ! (this._cache as T[]).includes(item) ) (this._cache as T[]).push(item); @@ -1302,6 +1387,8 @@ export class SourcedSet { return; (existing as T[]).splice(idx, 1); + + // Removal always requires a rebuild. this._rebuild(); } } diff --git a/src/utilities/pubsub.js b/src/utilities/pubsub.js deleted file mode 100644 index 89cab845..00000000 --- a/src/utilities/pubsub.js +++ /dev/null @@ -1,724 +0,0 @@ -import { EventEmitter } from "./events"; - -import { MqttClient, DISCONNECT } from './custom_denoflare_mqtt'; // "denoflare-mqtt"; -import { b64ToArrayBuffer, debounce, importRsaKey, make_enum, sleep } from "./object"; -import { EMQX_SERVERS } from "./constants"; - -// Only match 1-4 digit numbers, to avoid matching twitch IDs. -// 9999 gives us millions of clients on a topic, so we're never -// going to have more subtopics than 4 digits. -const SUBTOPIC_MATCHER = /\/(?:s(\d+)|(\d{1,4}))$/; - - -MqttClient.prototype.reschedulePing = function reschedulePing() { - this.clearPing(); - let delay = this.keepAliveSeconds; - if ( this.keepAliveOverride > 0 ) - delay = Math.min(delay, this.keepAliveOverride); - - this.pingTimeout = setTimeout(async () => { - try { - await this.ping(); - } catch(err) { /* no-op */ } - this.reschedulePing(); - }, delay * 1000); -} - - -export const State = make_enum( - 'Disconnected', - 'Connecting', - 'Connected' -); - - -export default class PubSubClient extends EventEmitter { - - constructor(server, options = {}) { - super(); - - this.server = server; - this.user = options?.user; - this.logger = options?.log ?? options?.logger; - - this._should_connect = false; - this._state = State.Disconnected; - - // Topics is a map of topics to sub-topic IDs. - this._topics = new Map; - - // Incorrect Topics is a map of every incorrect topic mapping - // we are currently subscribed to, for the purpose of unsubscribing - // from them. - this._incorrect_topics = new Map; - - // Live Topics is a set of every topic we have sent subscribe - // packets to the server for. - this._live_topics = new Set; - - // Active Topics is a set of every topic we SHOULD be subscribed to. - this._active_topics = new Set; - - // Pending Topics is a set of topics that we should be subscribed to - // but that we don't yet have a sub-topic assignment. - this._pending_topics = new Set; - - // Debounce a few things. - this.scheduleHeartbeat = this.scheduleHeartbeat.bind(this); - this._sendHeartbeat = this._sendHeartbeat.bind(this); - this.scheduleResub = this.scheduleResub.bind(this); - this._sendResub = this._sendResub.bind(this); - - this._fetchNewTopics = this._fetchNewTopics.bind(this); - this._sendSubscribes = debounce(this._sendSubscribes, 250); - this._sendUnsubscribes = debounce(this._sendUnsubscribes, 250); - } - - // ======================================================================== - // Properties - // ======================================================================== - - get id() { return this._data?.client_id ?? null } - - get topics() { return [...this._active_topics] } - - get disconnected() { - return this._state === State.Disconnected; - }; - - get connecting() { - return this._state === State.Connecting; - } - - get connected() { - return this._state === State.Connected; - } - - - // ======================================================================== - // Data Loading - // ======================================================================== - - loadData(force = false) { - // If we have all the data we need, don't do anything. - if ( ! force && this._data && ! this._pending_topics.size ) - return Promise.resolve(this._data); - - if ( ! this._data_loader ) - this._data_loader = this._loadData() - .finally(() => this._data_loader = null); - - return this._data_loader; - } - - async _loadData() { - let response, data; - - if ( this.server === 'emqx-test' ) { - // Hard-coded data. - const server = EMQX_SERVERS[Math.floor(Math.random() * EMQX_SERVERS.length)]; - - data = { - require_signing: false, - endpoint: `wss://${server}/mqtt`, - - username: 'anonymous', - password: `eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbm9ueW1vdXMifQ.5DZP1bScMz4-MV_jGZveUKq4pFy9x_PJF9gSzAvj-wA`, - topics: Object.fromEntries(this.topics.map(x => [x, 0])) - } - - } else { - try { - // TODO: Send removed topics. - response = await fetch(this.server, { - method: 'POST', - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify({ - id: this.id, - user: this.user ?? null, - topics: this.topics - }) - }); - - if ( response.ok ) - data = await response.json(); - - } catch(err) { - throw new Error( - 'Unable to load PubSub data from server.', - { - cause: err - } - ); - } - } - - if ( ! data?.endpoint ) - throw new Error('Received invalid PubSub data from server.'); - - // If there's a signing key, parse it. - if ( data.public_key ) - try { - data.public_key = await importRsaKey(data.public_key); - } catch(err) { - throw new Error('Received invalid public key from server.', { - cause: err - }); - } - - else - data.public_key = null; - - if ( data.require_signing && ! data.public_key ) - throw new Error('Server requires signing but did not provide public key.'); - - // If we already had a password, preserve it. - if ( this._data?.password && ! data.password ) - data.password = this._data.password; - - // Record all the topic mappings we just got. - // TODO: Check for subtopic mismatches. - // TODO: Check for removed subtopic assignments. - if ( data.topics ) { - const removed = new Set(this._topics.keys()); - - for(const [key, val] of Object.entries(data.topics)) { - removed.delete(key); - const existing = this._topics.get(key); - if ( existing != null && existing != val ) - this._incorrect_topics.set(key, existing); - this._topics.set(key, val); - } - - for(const key of removed) { - const existing = this._topics.get(key); - this._topics.delete(key); - this._incorrect_topics.set(key, existing); - } - - // If we have a mismatch, handle it. - if ( this._incorrect_topics.size ) - this._sendUnsubscribes() - .then(() => this._sendSubscribes()); - } - - // Update the heartbeat timer. - this.scheduleHeartbeat(); - - this._data = data; - return data; - } - - async _fetchNewTopics(attempts = 0) { - this._fetch_timer = null; - let needs_fetch = false; - for(const topic of [...this._pending_topics]) { - if ( ! this._topics.has(topic) ) - needs_fetch = true; - } - - if ( needs_fetch ) - try { - await this.loadData(); - } catch(err) { - if ( attempts > 10 ) { - this._fetch_timer = null; - throw err; - } - - let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000; - if ( delay > 60000 ) - delay = (Math.floor(Math.random() * 60) + 30) * 1000; - - return sleep(delay).then(() => this._fetchNewTopics(attempts + 1)); - } - - if ( this._client ) - this._sendSubscribes(); - } - - - // ======================================================================== - // Connecting - // ======================================================================== - - connect() { - return this._connect(); - } - - async _connect(attempts = 0) { - if ( this._state === State.Connected ) - return; - - this._state = State.Connecting; - - let data; - try { - data = await this.loadData(); - } catch(err) { - if ( attempts > 10 ) { - this._state = State.Disconnected; - throw err; - } - - let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000; - if ( delay > 60000 ) - delay = (Math.floor(Math.random() * 60) + 30) * 1000; - - return sleep(delay).then(() => this._connect(attempts + 1)); - } - - if ( this.logger ) - this.logger.debug('Received Configuration', data); - - // We have our configuration. Now, create our client. - this._should_connect = true; - this._createClient(data); - - // Set up a heartbeat to keep us alive. - this.scheduleHeartbeat(); - } - - disconnect() { - this._should_connect = false; - this._destroyClient(); - this._state = State.Disconnected; - - this.clearHeartbeat(); - - // Reset all our state except active topics. - this._data = null; - this._live_topics.clear(); - this._topics.clear(); - this._pending_topics.clear(); - - for(const topic of this._active_topics) - this._pending_topics.add(topic); - } - - subscribe(topic) { - if ( Array.isArray(topic) ) { - for(const item of topic) - this.subscribe(item); - return; - } - - // If this is already an active topic, there's nothing - // else to do. - if ( this._active_topics.has(topic) ) - return; - - this._active_topics.add(topic); - - // If we don't have a sub-topic mapping, then we need to - // request a new one. Mark this topic as pending. - if ( ! this._topics.has(topic) ) - this._pending_topics.add(topic); - - // If we have a client, and we have pending topics, and there - // isn't a pending fetch, then schedule a fetch. - if ( this._client && this._pending_topics.size && ! this._fetch_timer ) - this._fetch_timer = setTimeout(this._fetchNewTopics, 5000); - - // Finally, if we have a client, send out a subscribe packet. - // This method is debounced by 250ms. - if ( this._client ) - this._sendSubscribes(); - } - - unsubscribe(topic) { - if ( Array.isArray(topic) ) { - for(const item of topic) - this.unsubscribe(item); - return; - } - - // If this topic isn't an active topic, we have nothing to do. - if ( ! this._active_topics.has(topic) ) - return; - - // Remove the topic from the active and pending topics. Don't - // remove it from the topic map though, since our client is - // still live. - this._active_topics.delete(topic); - this._pending_topics.delete(topic); - - if ( this._client ) - this._sendUnsubscribes(); - } - - // ======================================================================== - // Keep Alives - // ======================================================================== - - clearHeartbeat() { - if ( this._heartbeat ) - clearTimeout(this._heartbeat); - } - - scheduleHeartbeat() { - if ( this._heartbeat ) - clearTimeout(this._heartbeat); - this._heartbeat = setTimeout(this._sendHeartbeat, 5 * 60 * 1000); - } - - _sendHeartbeat() { - if ( ! this._data?.client_id ) - return this.scheduleHeartbeat(); - - this.loadData(true) - .finally(this.scheduleHeartbeat); - } - - - clearResubTimer() { - if ( this._resub_timer ) { - clearTimeout(this._resub_timer); - this._resub_timer = null; - } - } - - scheduleResub() { - if ( this._resub_timer ) - clearTimeout(this._resub_timer); - - // Send a resubscription every 30 minutes. - this._resub_timer = setTimeout(this._sendResub, 30 * 60 * 1000); - } - - _sendResub() { - this._resendSubscribes() - .finally(this.scheduleResub); - } - - - // ======================================================================== - // Client Management - // ======================================================================== - - _destroyClient() { - if ( ! this._client ) - return; - - this.clearResubTimer(); - - try { - this._client.disconnect().catch(() => {}); - } catch(err) { /* no-op */ } - this._client = null; - - this._live_topics.clear(); - } - - _createClient(data) { - // If there is an existing client, destroy it first. - if ( this._client ) - this._destroyClient(); - - // Now, create a new instance of our client. - // This requires a parsed URL because the dumb client doesn't - // take URLs like every other client ever. - const url = new URL(data.endpoint); - - this._live_topics.clear(); - this._state = State.Connecting; - - const client = this._client = new MqttClient({ - hostname: url.hostname, - port: url.port ?? undefined, - pathname: url.pathname ?? undefined, - protocol: 'wss', - maxMessagesPerSecond: 10 - }); - - let disconnected = false; - - this._client.onMqttMessage = message => { - if ( message.type === DISCONNECT ) { - if ( disconnected ) - return; - - disconnected = true; - this.emit('disconnect', message); - this._destroyClient(); - - if ( this._should_connect ) - this._createClient(data); - } - } - - this._client.onReceive = async message => { - // Get the topic, and remove the subtopic from it. - let topic = message.topic; - const match = SUBTOPIC_MATCHER.exec(topic); - if ( match ) - topic = topic.slice(0, match.index); - - if ( ! this._active_topics.has(topic) ) { - if ( this.logger ) - this.logger.debug('Received message for unsubscribed topic:', topic); - return; - } - - let payload = message.payload; - if ( payload instanceof Uint8Array ) - payload = new TextDecoder().decode(payload); - - let msg; - try { - msg = JSON.parse(payload); - } catch(err) { - if ( this.logger ) - this.logger.warn(`Error decoding PubSub message on topic "${topic}":`, err); - return; - } - - if ( data.require_signing ) { - let valid = false; - const sig = msg.sig; - delete msg.sig; - - if ( sig ) - try { - const encoded = new TextEncoder().encode(JSON.stringify(msg)); - - valid = await crypto.subtle.verify( - { - name: "RSA-PSS", - saltLength: 32 - }, - data.public_key, - b64ToArrayBuffer(sig), - encoded - ); - - } catch(err) { - if ( this.logger ) - this.logger.warn('Error attempting to verify signature for message.', err); - return; - } - - if ( ! valid ) { - msg.sig = sig; - if ( this.logger ) - this.logger.debug(`Received message on topic "${topic}" that failed signature verification:`, msg); - return; - } - } - - this.emit('message', { topic, data: msg }); - } - - // We want to send a keep-alive every 60 seconds, despite - // requesting a keepAlive of 120 to the server. We do this - // because of how background tabs are throttled by browsers. - client.keepAliveOverride = 60; - - return this._client.connect({ - clientId: data.client_id, - username: data.username, - password: data.password, - keepAlive: 120, - clean: true - }).then(msg => { - this._conn_failures = 0; - this._state = State.Connected; - this.emit('connect', msg); - - // Periodically re-send our subscriptions. This - // is done because the broker seems to be forgetful - // for long-lived connections. - this.scheduleResub(); - - // Reconnect when this connection ends. - if ( client.connectionCompletion ) - client.connectionCompletion.finally(() => { - if ( disconnected ) - return; - - disconnected = true; - this.emit('disconnect', null); - this._destroyClient(); - - if ( this._should_connect ) - this._createClient(data); - }); - - return this._sendSubscribes() - }).catch(err => { - if ( this.logger ) - this.logger.debug('Error connecting to MQTT.', err); - - disconnected = true; - this.emit('disconnect', null); - - this._destroyClient(); - - if ( ! this._should_connect ) - return; - - this._conn_failures = (this._conn_failures || 0) + 1; - let delay = (this._conn_failures * Math.floor(Math.random() * 10) + 2) * 1000; - if ( delay > 60000 ) - delay = (Math.floor(Math.random() * 60) + 30) * 1000; - - if ( delay <= 2000 ) - delay = 2000; - - return sleep(delay).then(() => { - if ( this._should_connect && ! this._client ) - this._createClient(data); - }) - }); - } - - _resendSubscribes() { - if ( ! this._client ) - return Promise.resolve(); - - const topics = [], - batch = []; - - for(const topic of this._live_topics) { - const subtopic = this._topics.get(topic); - if ( subtopic != null ) { - if ( subtopic === 0 ) - topics.push(topic); - else - topics.push(`${topic}/s${subtopic}`); - - batch.push(topic); - } - } - - if ( ! topics.length ) - return Promise.resolve(); - - return this._client.subscribe({topicFilter: topics}) - .catch(() => { - // If there was an error, we did NOT subscribe. - for(const topic of batch) - this._live_topics.delete(topic); - - if ( this._live_topics.size != this._active_topics.size ) - return sleep(2000).then(() => this._sendSubscribes()); - }); - } - - _sendSubscribes() { - if ( ! this._client ) - return Promise.resolve(); - - const topics = [], - batch = []; - - for(const topic of this._active_topics) { - if ( this._live_topics.has(topic) ) - continue; - - const subtopic = this._topics.get(topic); - if ( subtopic != null ) { - // Make sure this topic isn't considered pending. - this._pending_topics.delete(topic); - - if ( subtopic === 0 ) - topics.push(topic); - else - topics.push(`${topic}/s${subtopic}`); - - // Make a note, we're subscribing to this topic. - this._live_topics.add(topic); - batch.push(topic); - } - } - - if ( ! topics.length ) - return Promise.resolve(); - - return this._client.subscribe({topicFilter: topics }) - .then(() => { - // Success. Reset the subscribe failures count. - this._sub_failures = 0; - }) - .catch(msg => { - // TODO: Check the reason why. - if ( this.logger ) - this.logger.debug('Subscribe failure for topics:', batch.join(', '), ' reason:', msg); - - // If there was an error, we did NOT subscribe. - for(const topic of batch) - this._live_topics.delete(topic); - - // See if we have more work. - if ( this._live_topics.size >= this._active_topics.size ) - return; - - // Call sendSubscribes again after a bit. - this._sub_failures = (this._sub_failures || 0) + 1; - - let delay = (this._sub_failures * Math.floor(Math.random() * 10) + 2) * 1000; - if ( delay > 60000 ) - delay = (Math.floor(Math.random() * 60) + 30) * 1000; - - if ( delay <= 2000 ) - delay = 2000; - - return sleep(delay).then(() => this._sendSubscribes()); - }); - } - - _sendUnsubscribes() { - if ( ! this._client ) - return Promise.resolve(); - - const topics = [], - batch = new Set; - - // iterate over a copy to support removal - for(const topic of [...this._live_topics]) { - if ( this._active_topics.has(topic) ) - continue; - - // Should never be null, but be safe. - const subtopic = this._topics.get(topic); - if ( subtopic == null ) - continue; - - let real_topic; - if ( subtopic === 0 ) - real_topic = topic; - else - real_topic = `${topic}/s${subtopic}`; - - topics.push(real_topic); - batch.add(topic); - this._live_topics.delete(topic); - } - - // handle incorrect topics - for(const [topic, subtopic] of this._incorrect_topics) { - if ( batch.has(topic) ) - continue; - - batch.add(topic); - if ( subtopic === 0 ) - topics.push(topic); - else - topics.push(`${topic}/s${subtopic}`); - - this._live_topics.delete(topic); - } - - if ( ! topics.length ) - return Promise.resolve(); - - return this._client.unsubscribe({topicFilter: topics}) - .catch(error => { - if ( this.logger ) - this.logger.warn('Received error when unsubscribing from topics:', error); - }); - } - -}