mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-06-27 12:55:55 +00:00
4.76.0
* Added: Setting to control the priority of third-party emote providers. This lets you, as an example, prioritize emotes from 7TV over BetterTTV. Note that these priorities are still secondary to emote source priorities (personal emotes > channel emotes > global emotes). (Closes #1587) * Added: Setting to replace the native player 'Clip' button with a custom button that better fits the style of the other buttons. (Closes #1580) * Fixed: Highlighting messages from users with open viewer cards not working correctly with alternating background colors enabled. (Closes #1581) * Fixed: Various directory features not working correctly. (Closes #1588) * Fixed: Emotes unlocked with channel points appearing on FFZ emote cards with the source 'CHANNEL_POINTS'. * Fixed: The Experiments UI failing to load correctly if an experiment has no name. * Experiments: Preparing for a new PubSub experiment. * API Added: `emotes.setProvider(provider: string, data: {name: string, icon: string, description: string})` for registering an emote provider. This is used by the new setting to let users customize emote provider priorities. * API Added: All modules now have a `addon_manifest` property which will contain the add-on's manifest if the module belongs to an add-on.
This commit is contained in:
parent
6f8c20640f
commit
1d046fca2b
22 changed files with 622 additions and 894 deletions
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"name": "frankerfacez",
|
||||
"author": "Dan Salvato LLC",
|
||||
"version": "4.75.7",
|
||||
"version": "4.76.0",
|
||||
"description": "FrankerFaceZ is a Twitch enhancement suite.",
|
||||
"private": true,
|
||||
"license": "Apache-2.0",
|
||||
|
|
|
@ -346,7 +346,7 @@ export default class AddonManager extends Module<'addons'> {
|
|||
}
|
||||
|
||||
getAddon(id: string) {
|
||||
const addon = this.addons[id];
|
||||
const addon = this.addons[id] ?? null;
|
||||
return Array.isArray(addon) ? null : addon;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,18 +27,67 @@
|
|||
type: 'ffz_injecting'
|
||||
});
|
||||
|
||||
// Set up the extension message bridge.
|
||||
window.addEventListener('message', evt => {
|
||||
if (evt.source !== window)
|
||||
// Set up a bridge for connections, since Firefox
|
||||
// doesn't support externally_connectable.
|
||||
const connections = new Map;
|
||||
|
||||
function handleConnect(id) {
|
||||
if ( connections.has(id) )
|
||||
return;
|
||||
|
||||
if (evt.data && evt.data.type === 'ffz_to_ext')
|
||||
browser.runtime.sendMessage(evt.data.data, resp => {
|
||||
if (resp?.type === 'ffz_to_page')
|
||||
window.postMessage(resp.data, '*');
|
||||
const port = browser.runtime.connect();
|
||||
connections.set(id, port);
|
||||
|
||||
port.onMessage.addListener(msg => {
|
||||
window.postMessage({
|
||||
type: 'ffz-con-message',
|
||||
id,
|
||||
payload: msg
|
||||
})
|
||||
});
|
||||
|
||||
port.onDisconnect.addListener(() => {
|
||||
connections.delete(id);
|
||||
window.postMessage({
|
||||
type: 'ffz-con-disconnect',
|
||||
id
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function handleDisconnect(id) {
|
||||
const port = connections.get(id);
|
||||
if ( port ) {
|
||||
connections.delete(id);
|
||||
port.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
function handleMessage(id, payload) {
|
||||
const port = connection.get(id);
|
||||
if ( port ) {
|
||||
port.postMessage(payload);
|
||||
}
|
||||
}
|
||||
|
||||
window.addEventListener('message', evt => {
|
||||
if (evt.source !== window || ! evt.data )
|
||||
return;
|
||||
|
||||
const { type, id, payload } = evt.data;
|
||||
|
||||
if ( type === 'ffz-con-connect' )
|
||||
handleConnect(id);
|
||||
|
||||
else if ( type === 'ffz-con-message' )
|
||||
handleMessage(id, payload);
|
||||
|
||||
else if ( type === 'ffz-con-disconnect' )
|
||||
handleDisconnect(id);
|
||||
});
|
||||
|
||||
|
||||
// Let the extension send messages to the page directly.
|
||||
browser.runtime.onMessage.addListener((msg, sender) => {
|
||||
if (msg?.type === 'ffz_to_page')
|
||||
window.postMessage(msg.data, '*');
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
{
|
||||
"api_load": {
|
||||
"in_use": false,
|
||||
"name": "New API Stress Testing",
|
||||
"description": "Send duplicate requests to the new API server for load testing.",
|
||||
"groups": [
|
||||
|
@ -8,25 +9,27 @@
|
|||
]
|
||||
},
|
||||
"api_links": {
|
||||
"name": "API-Based Link Lookups",
|
||||
"description": "Use the new API to look up links instead of the socket cluster.",
|
||||
"in_use": false,
|
||||
"groups": [
|
||||
{"value": true, "weight": 0},
|
||||
{"value": "cf", "weight": 100},
|
||||
{"value": false, "weight": 0}
|
||||
{"value": false, "weight": 100}
|
||||
]
|
||||
},
|
||||
"emqx_pubsub": {
|
||||
"name": "EMQX MQTT-Based PubSub",
|
||||
"description": "An experimental pubsub system running on an EMQX cluster, to see how that performs.",
|
||||
"in_use": false,
|
||||
"groups": [
|
||||
{"value": false, "weight": 100}
|
||||
]
|
||||
},
|
||||
"cf_pubsub": {
|
||||
"in_use": false,
|
||||
"groups": [
|
||||
{"value": true, "weight": 0},
|
||||
{"value": false, "weight": 100}
|
||||
]
|
||||
},
|
||||
"cf_pubsub": {
|
||||
"name": "CF MQTT-Based PubSub",
|
||||
"description": "An experimental new pubsub system that should be more reliable than the existing socket cluster.",
|
||||
"worker_pubsub": {
|
||||
"name": "Worker PubSub",
|
||||
"description": "Whether or not to connect to the new Cloudflare Worker based PubSub system.",
|
||||
"groups": [
|
||||
{"value": true, "weight": 0},
|
||||
{"value": false, "weight": 100}
|
||||
|
|
|
@ -93,8 +93,6 @@ export default class LoadTracker extends Module<'load_tracker', LoadEvents> {
|
|||
|
||||
/** @internal */
|
||||
onEnable() {
|
||||
this.emit('load_tracker:schedule', 'test', 'fish');
|
||||
|
||||
this.on(':schedule', this.schedule);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
import Module, { buildAddonProxy } from 'utilities/module';
|
||||
import {ManagedStyle} from 'utilities/dom';
|
||||
|
||||
import {get, has, timeout, SourcedSet, make_enum_flags, makeAddonIdChecker} from 'utilities/object';
|
||||
import {get, has, timeout, SourcedSet, make_enum_flags, makeAddonIdChecker, deep_copy} from 'utilities/object';
|
||||
import {NEW_API, IS_OSX, EmoteTypes, TWITCH_GLOBAL_SETS, TWITCH_POINTS_SETS, TWITCH_PRIME_SETS, DEBUG} from 'utilities/constants';
|
||||
|
||||
import GET_EMOTE from './emote_info.gql';
|
||||
|
@ -463,16 +463,31 @@ export default class Emotes extends Module {
|
|||
|
||||
this.providers = new Map;
|
||||
|
||||
this.providers.set('featured', {
|
||||
name: 'Featured',
|
||||
i18n_key: 'emote-menu.featured',
|
||||
this.setProvider('ffz', {
|
||||
name: 'FrankerFaceZ',
|
||||
font_icon: 'ffz-i-zreknarf',
|
||||
//icon: 'https://cdn.frankerfacez.com/badge/4/4/solid'
|
||||
});
|
||||
|
||||
/*this.providers.set('ffz-featured', {
|
||||
menu_name: 'Featured',
|
||||
menu_i18n_key: 'emote-menu.featured',
|
||||
sort_key: 75
|
||||
})
|
||||
});*/
|
||||
|
||||
this.emote_sets = {};
|
||||
this._set_refs = {};
|
||||
this._set_timers = {};
|
||||
|
||||
this.settings.add('chat.emotes.source-priorities', {
|
||||
default: null,
|
||||
ui: {
|
||||
path: 'Chat > Emote Priorities',
|
||||
component: 'emote-priorities',
|
||||
data: () => deep_copy(this.providers)
|
||||
}
|
||||
});
|
||||
|
||||
this.settings.add('chat.emotes.enabled', {
|
||||
default: 2,
|
||||
ui: {
|
||||
|
@ -631,9 +646,25 @@ export default class Emotes extends Module {
|
|||
return this.addFilter(filter);
|
||||
}
|
||||
|
||||
overrides.setProvider = (provider, data) => {
|
||||
if ( is_dev && ! id_checker.test(provider) )
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.setProvider did not include addon ID in provider:', provider);
|
||||
|
||||
if ( data )
|
||||
data.__source = addon_id;
|
||||
|
||||
return this.setProvider(provider, data);
|
||||
}
|
||||
|
||||
overrides.addDefaultSet = (provider, set_id, data) => {
|
||||
if ( is_dev && ! id_checker.test(provider) )
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet did not include addon ID in provider:', provider);
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet did not include addon ID in provider:', provider);
|
||||
|
||||
if ( ! this.providers.has(provider) ) {
|
||||
this.inferProvider(provider, addon_id);
|
||||
if ( is_dev )
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addDefaultSet for provider that has not been registered with emotes.setProvider:', provider);
|
||||
}
|
||||
|
||||
if ( data ) {
|
||||
if ( is_dev && ! id_checker.test(set_id) )
|
||||
|
@ -647,7 +678,13 @@ export default class Emotes extends Module {
|
|||
|
||||
overrides.addSubSet = (provider, set_id, data) => {
|
||||
if ( is_dev && ! id_checker.test(provider) )
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addSubSet did not include addon ID in provider:', provider);
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addSubSet did not include addon ID in provider:', provider);
|
||||
|
||||
if ( ! this.providers.has(provider) ) {
|
||||
this.inferProvider(provider, addon_id);
|
||||
if ( is_dev )
|
||||
module.log.warn('[DEV-CHECK] Call to emotes.addSubSet for provider that has not been registered with emotes.setProvider:', provider);
|
||||
}
|
||||
|
||||
if ( data ) {
|
||||
if ( is_dev && ! id_checker.test(set_id) )
|
||||
|
@ -713,6 +750,8 @@ export default class Emotes extends Module {
|
|||
// Generate the base filter CSS.
|
||||
this.base_effect_css = generateBaseFilterCss();
|
||||
|
||||
this.parent.context.getChanges('chat.emotes.source-priorities', this.updatePriorities, this);
|
||||
|
||||
this.parent.context.on('changed:chat.effects.enable', this.updateEffects, this);
|
||||
for(const input of EFFECT_STYLES)
|
||||
if ( input.setting && ! Array.isArray(input.setting) )
|
||||
|
@ -807,6 +846,13 @@ export default class Emotes extends Module {
|
|||
}
|
||||
}
|
||||
|
||||
for(const [key, data] of this.providers.entries()) {
|
||||
if ( data?.__source === addon_id ) {
|
||||
removed++;
|
||||
this.setProvider(key, null);
|
||||
}
|
||||
}
|
||||
|
||||
if ( removed ) {
|
||||
this.log.debug(`Cleaned up ${removed} entries when unloading addon:`, addon_id);
|
||||
// TODO: Debounced retokenize all chat messages.
|
||||
|
@ -817,6 +863,62 @@ export default class Emotes extends Module {
|
|||
}
|
||||
|
||||
|
||||
// ========================================================================
|
||||
// Providers
|
||||
// ========================================================================
|
||||
|
||||
updatePriorities(priorities) {
|
||||
const l = priorities?.length;
|
||||
|
||||
if (! l || l <= 0)
|
||||
this.sourceSortFn = null;
|
||||
else
|
||||
this.sourceSortFn = (first, second) => {
|
||||
if (first.startsWith('ffz-'))
|
||||
first = 'ffz';
|
||||
if (second.startsWith('ffz-'))
|
||||
second = 'ffz';
|
||||
|
||||
let first_priority = priorities.indexOf(first),
|
||||
second_priority = priorities.indexOf(second);
|
||||
|
||||
if (first_priority === -1) first_priority = l;
|
||||
if (second_priority === -1) second_priority = l;
|
||||
|
||||
return first_priority - second_priority;
|
||||
};
|
||||
|
||||
// Update all existing sourced sets now.
|
||||
this.default_sets.setSortFunction(this.sourceSortFn);
|
||||
|
||||
this.emit(':update-priorities', this.sourceSortFn);
|
||||
}
|
||||
|
||||
inferProvider(provider, addon_id) {
|
||||
if ( this.providers.has(provider) )
|
||||
return;
|
||||
|
||||
const data = this.resolve('addons')?.getAddon(addon_id);
|
||||
if ( data )
|
||||
this.setProvider(provider, {
|
||||
name: data.name,
|
||||
i18n_key: data.name_i18n,
|
||||
icon: data.icon,
|
||||
description: provider,
|
||||
__source: addon_id
|
||||
});
|
||||
}
|
||||
|
||||
setProvider(provider, data) {
|
||||
if ( ! data )
|
||||
this.providers.delete(provider);
|
||||
else {
|
||||
data.id = provider;
|
||||
this.providers.set(provider, data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ========================================================================
|
||||
// Emote Filtering
|
||||
// ========================================================================
|
||||
|
@ -1183,17 +1285,17 @@ export default class Emotes extends Module {
|
|||
emote_sets = room.emote_sets,
|
||||
providers = emote_sets && emote_sets._sources;
|
||||
|
||||
if ( providers && providers.has('featured') )
|
||||
for(const item of providers.get('featured')) {
|
||||
if ( providers && providers.has('ffz-featured') )
|
||||
for(const item of providers.get('ffz-featured')) {
|
||||
const idx = new_sets.indexOf(item);
|
||||
if ( idx === -1 )
|
||||
room.removeSet('featured', item);
|
||||
room.removeSet('ffz-featured', item);
|
||||
else
|
||||
new_sets.splice(idx, 1);
|
||||
}
|
||||
|
||||
for(const set_id of new_sets) {
|
||||
room.addSet('featured', set_id);
|
||||
room.addSet('ffz-featured', set_id);
|
||||
|
||||
if ( ! this.emote_sets[set_id] )
|
||||
this.loadSet(set_id);
|
||||
|
|
|
@ -135,17 +135,7 @@ export default class Chat extends Module {
|
|||
|
||||
this.settings.add('debug.link-resolver.source', {
|
||||
process: (ctx, val) => {
|
||||
if ( val == null ) {
|
||||
const exp = this.experiments.getAssignment('api_links');
|
||||
if ( exp === 'cf' )
|
||||
val = 'test-cf';
|
||||
else if ( exp )
|
||||
val = 'test';
|
||||
else
|
||||
val = 'socket';
|
||||
}
|
||||
|
||||
return LINK_DATA_HOSTS[val] ?? LINK_DATA_HOSTS.test;
|
||||
return LINK_DATA_HOSTS[val] ?? LINK_DATA_HOSTS.Production;
|
||||
},
|
||||
|
||||
default: null,
|
||||
|
@ -1326,6 +1316,12 @@ export default class Chat extends Module {
|
|||
if ( is_dev && ! id_checker.test(provider) )
|
||||
module.log.warn('[DEV-CHECK] Call to getUser().addSet() did not include addon ID in provider:', provider);
|
||||
|
||||
if ( ! this.manager.emotes.providers.has(provider) ) {
|
||||
this.manager.emotes.inferProvider(provider, addon_id);
|
||||
if ( is_dev )
|
||||
module.log.warn('[DEV-CHECK] Call to getUser().addSet() for provider that has not been registered with emotes.setProvider:', provider);
|
||||
}
|
||||
|
||||
if ( data ) {
|
||||
if ( is_dev && ! id_checker.test(set_id) )
|
||||
module.log.warn('[DEV-CHECK] Call to getUser().addSet() loaded set data but did not include addon ID in set ID:', set_id);
|
||||
|
@ -1366,6 +1362,12 @@ export default class Chat extends Module {
|
|||
if ( is_dev && ! id_checker.test(provider) )
|
||||
module.log.warn('[DEV-CHECK] Call to getRoom().addSet() did not include addon ID in provider:', provider);
|
||||
|
||||
if ( ! this.manager.emotes.providers.has(provider) ) {
|
||||
this.manager.emotes.inferProvider(provider, addon_id);
|
||||
if ( is_dev )
|
||||
module.log.warn('[DEV-CHECK] Call to getRoom().addSet() for provider that has not been registered with emotes.setProvider:', provider);
|
||||
}
|
||||
|
||||
if ( data ) {
|
||||
if ( is_dev && ! id_checker.test(set_id) )
|
||||
module.log.warn('[DEV-CHECK] Call to getRoom().addSet() loaded set data but did not include addon ID in set ID:', set_id);
|
||||
|
@ -1524,6 +1526,12 @@ export default class Chat extends Module {
|
|||
this.settings.provider.on('changed', this.onProviderChange, this);
|
||||
|
||||
this.on('site.subpump:pubsub-message', this.onPubSub, this);
|
||||
this.on('chat.emotes:update-priorities', fn => {
|
||||
for(const thing of this.iterateAllRoomsAndUsers()) {
|
||||
if (thing.emote_sets)
|
||||
thing.emote_sets.setSortFunction(fn);
|
||||
}
|
||||
});
|
||||
|
||||
if ( this.context.get('chat.filtering.need-colors') )
|
||||
this.createColorCache().then(() => this.emit(':update-line-tokens'));
|
||||
|
|
|
@ -369,10 +369,10 @@ export default class Room {
|
|||
|
||||
this.data = d;
|
||||
|
||||
this.removeAllSets('main');
|
||||
this.removeAllSets('ffz-main');
|
||||
|
||||
if ( d.set )
|
||||
this.addSet('main', d.set);
|
||||
this.addSet('ffz-main', d.set);
|
||||
|
||||
if ( data.sets )
|
||||
for(const set_id in data.sets)
|
||||
|
@ -408,7 +408,7 @@ export default class Room {
|
|||
return;
|
||||
|
||||
if ( ! this.emote_sets )
|
||||
this.emote_sets = new SourcedSet;
|
||||
this.emote_sets = new SourcedSet(false, this.manager.emotes.sourceSortFn);
|
||||
|
||||
if ( typeof set_id === 'number' )
|
||||
set_id = `${set_id}`;
|
||||
|
|
|
@ -138,7 +138,7 @@ export default class User {
|
|||
data = {id: badge_id};
|
||||
|
||||
if ( ! this.badges )
|
||||
this.badges = new SourcedSet;
|
||||
this.badges = new SourcedSet(false, this.manager.emotes.sourceSortFn);
|
||||
|
||||
const existing = this.badges.get(provider);
|
||||
if ( existing )
|
||||
|
|
|
@ -24,6 +24,8 @@ function getEmoteTypeFromTwitchType(type) {
|
|||
return EmoteTypes.LimitedTime;
|
||||
if ( type === 'BITS_BADGE_TIERS' )
|
||||
return EmoteTypes.BitsTier;
|
||||
if ( type === 'CHANNEL_POINTS' )
|
||||
return EmoteTypes.ChannelPoints;
|
||||
if ( type === 'TWO_FACTOR' )
|
||||
return EmoteTypes.TwoFactor;
|
||||
if ( type === 'PRIME' )
|
||||
|
|
159
src/modules/main_menu/components/emote-priorities.vue
Normal file
159
src/modules/main_menu/components/emote-priorities.vue
Normal file
|
@ -0,0 +1,159 @@
|
|||
<template lang="html">
|
||||
<div class="ffz--emote-priorities tw-border-t tw-pd-y-1">
|
||||
<div
|
||||
v-if="source && source !== profile"
|
||||
class="tw-c-background-accent tw-c-text-overlay tw-pd-1 tw-mg-b-1"
|
||||
>
|
||||
<span class="ffz-i-info" />
|
||||
{{ t('setting.warn-inheritence', 'These values are being overridden by another profile and may not take effect.') }}
|
||||
</div>
|
||||
|
||||
<div class="tw-border-b tw-mg-b-1 tw-pd-b-1">
|
||||
<p>{{ t('setting.priorities.about', 'Here, you can change the priorities of different emote providers. Please note that the provider priority (FFZ, etc.) is still secondary to the source priority (personal emotes > room emotes > global emotes).') }}</p>
|
||||
</div>
|
||||
|
||||
<div class="tw-flex tw-align-items-center tw-pd-b-05">
|
||||
<div class="tw-flex-grow-1">
|
||||
{{ t('setting.priorities.drag', 'Drag providers to change their priority.') }}
|
||||
</div>
|
||||
<button
|
||||
v-if="val.length"
|
||||
class="tw-mg-l-1 tw-button tw-button--text ffz-il-tooltip__container"
|
||||
@click="clear"
|
||||
>
|
||||
<span class="tw-button__text ffz-i-trash">
|
||||
{{ t('setting.delete', 'Delete') }}
|
||||
</span>
|
||||
<span class="ffz-il-tooltip ffz-il-tooltip--down ffz-il-tooltip--align-right">
|
||||
{{ t('setting.priorities.delete', "Delete the priorities in this settings profile.") }}
|
||||
</span>
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div ref="list" class="ffz--action-list">
|
||||
<div v-if="! val.length" class="tw-c-text-alt-2 tw-font-size-4 tw-align-center tw-c-text-alt-2 tw-pd-1">
|
||||
{{ t('setting.priorities.none', 'no priorities are defined in this profile') }}
|
||||
|
||||
<div class="tw-mg-t-1">
|
||||
<button
|
||||
class="tw-button tw-button--text"
|
||||
@click="addPriorities"
|
||||
>
|
||||
<span class="tw-button__text ffz-i-add">
|
||||
{{ t('setting.priorities.add', 'Add Priorities') }}
|
||||
</span>
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<section v-for="provider in val" :key="provider.id">
|
||||
<div class="ffz--action tw-elevation-1 tw-c-background-base tw-border tw-pd-y-05 tw-pd-r-1 tw-mg-y-05 tw-flex tw-flex-nowrap tw-align-items-center">
|
||||
<div class="tw-flex tw-flex-shrink-0 tw-align-items-start handle tw-pd-x-05 tw-pd-t-1 tw-pd-b-05">
|
||||
<span class="ffz-i-ellipsis-vert" />
|
||||
</div>
|
||||
<figure v-if="provider.font_icon" class="tw-font-size-2 tw-mg-r-1" :class="provider.font_icon" />
|
||||
<div v-else-if="provider.icon" class="tw-flex-shrink-0 ffz-card-img--size-4 tw-overflow-hidden tw-mg-r-1">
|
||||
<img
|
||||
:src="provider.icon"
|
||||
class="tw-image"
|
||||
/>
|
||||
</div>
|
||||
<div>
|
||||
<h4 v-if="! provider.name">
|
||||
{{ t('emote-source.unknown', 'Unknown ({id})', provider) }}
|
||||
</h4>
|
||||
<h4 v-else>{{
|
||||
provider.i18n_key
|
||||
? t(provider.i18n_key, provider.name, provider)
|
||||
: provider.name
|
||||
}}</h4>
|
||||
<div v-if="provider.description">
|
||||
{{ provider.desc_i18n_key ? t(provider.desc_i18n_key, provider.description, provider) : provider.description }}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</section>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
|
||||
import settingMixin from '../setting-mixin';
|
||||
import Sortable from 'sortablejs';
|
||||
import { deep_copy } from 'utilities/object';
|
||||
|
||||
let last_id = 0;
|
||||
|
||||
export default {
|
||||
mixins: [settingMixin],
|
||||
props: ['item', 'context'],
|
||||
|
||||
data() {
|
||||
return {
|
||||
|
||||
}
|
||||
},
|
||||
|
||||
computed: {
|
||||
val() {
|
||||
if ( ! this.has_value )
|
||||
return [];
|
||||
|
||||
const missing = new Set(this.data.keys());
|
||||
const out = this.value.map(id => {
|
||||
missing.delete(id);
|
||||
const data = this.data.get(id);
|
||||
return data
|
||||
? data
|
||||
: {id};
|
||||
});
|
||||
|
||||
for(const key of missing) {
|
||||
const data = this.data.get(key);
|
||||
out.push(data);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
},
|
||||
|
||||
mounted() {
|
||||
this._sortable = Sortable.create(this.$refs.list, {
|
||||
draggable: 'section',
|
||||
filter: 'button',
|
||||
|
||||
onUpdate: event => {
|
||||
if ( event.newIndex === event.oldIndex )
|
||||
return;
|
||||
|
||||
const new_val = Array.from(this.val);
|
||||
new_val.splice(event.newIndex, 0, ...new_val.splice(event.oldIndex, 1));
|
||||
this.setValue(new_val);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
beforeDestroy() {
|
||||
if ( this._sortable )
|
||||
this._sortable.destroy();
|
||||
|
||||
this._sortable = null;
|
||||
},
|
||||
|
||||
methods: {
|
||||
setValue(input) {
|
||||
if (input?.length > 0)
|
||||
this.set(input.map(x => x.id));
|
||||
else
|
||||
this.clear();
|
||||
},
|
||||
|
||||
addPriorities() {
|
||||
this.set([...this.data.keys()]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
</script>
|
|
@ -60,7 +60,7 @@
|
|||
>
|
||||
<div class="tw-elevation-1 tw-c-background-base tw-border tw-pd-y-05 tw-pd-x-1 tw-mg-y-05 tw-flex tw-flex-nowrap">
|
||||
<div class="tw-flex-grow-1">
|
||||
<h4>{{ exp.name }}</h4>
|
||||
<h4>{{ exp.name ? exp.name : key }}</h4>
|
||||
<div v-if="exp.description" class="description">
|
||||
{{ exp.description }}
|
||||
</div>
|
||||
|
@ -359,8 +359,8 @@ export default {
|
|||
if ( a_r > b_r ) return 1;
|
||||
}
|
||||
|
||||
const a_n = a.exp.name.toLowerCase(),
|
||||
b_n = b.exp.name.toLowerCase();
|
||||
const a_n = a.exp.name?.toLowerCase() ?? a.key?.toLowerCase(),
|
||||
b_n = b.exp.name?.toLowerCase() ?? b.key?.toLowerCase();
|
||||
|
||||
if ( a_n < b_n ) return -1;
|
||||
if ( a_n > b_n ) return 1;
|
||||
|
|
|
@ -5,12 +5,10 @@
|
|||
// ============================================================================
|
||||
|
||||
import Module, { GenericModule } from 'utilities/module';
|
||||
import { PUBSUB_CLUSTERS } from 'utilities/constants';
|
||||
import { PubSubClient } from './client';
|
||||
import type ExperimentManager from '../experiments';
|
||||
import type SettingsManager from '../settings';
|
||||
import type PubSubClient from 'utilities/pubsub';
|
||||
import type { PubSubCommands } from 'utilities/types';
|
||||
import type { SettingUi_Select_Entry } from '../settings/types';
|
||||
|
||||
declare module 'utilities/types' {
|
||||
interface ModuleMap {
|
||||
|
@ -20,10 +18,10 @@ declare module 'utilities/types' {
|
|||
pubsub: PubSubEvents;
|
||||
}
|
||||
interface SettingsTypeMap {
|
||||
'pubsub.use-cluster': keyof typeof PUBSUB_CLUSTERS | null;
|
||||
'pubsub.enabled': boolean;
|
||||
}
|
||||
interface ExperimentTypeMap {
|
||||
cf_pubsub: boolean;
|
||||
worker_pubsub: boolean;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -54,81 +52,47 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> {
|
|||
_topics: Map<string, Set<unknown>>;
|
||||
_client: PubSubClient | null;
|
||||
|
||||
_mqtt?: typeof PubSubClient | null;
|
||||
_mqtt_loader?: Promise<typeof PubSubClient> | null;
|
||||
|
||||
constructor(name?: string, parent?: GenericModule) {
|
||||
super(name, parent);
|
||||
|
||||
this.inject('settings');
|
||||
this.inject('experiments');
|
||||
|
||||
this.settings.add('pubsub.use-cluster', {
|
||||
default: () => {
|
||||
if ( this.experiments.getAssignment('emqx_pubsub') )
|
||||
return 'EMQXTest';
|
||||
if ( this.experiments.getAssignment('cf_pubsub') )
|
||||
return 'Staging';
|
||||
return null;
|
||||
},
|
||||
this.settings.add('pubsub.enabled', {
|
||||
default: () => this.experiments.getAssignment('worker_pubsub') ?? false,
|
||||
|
||||
ui: {
|
||||
path: 'Debugging @{"expanded": false, "sort": 9999} > PubSub >> General',
|
||||
title: 'Server Cluster',
|
||||
description: 'Which server cluster to connect to. You can use this setting to disable PubSub if you want, but should otherwise leave this on the default value unless you know what you\'re doing.',
|
||||
title: 'Enable PubSub.',
|
||||
description: 'Whether or not you want your client to connect to FrankerFaceZ\'s PubSub system. This is still in testing and should be left alone unless you know what you\'re doing.',
|
||||
force_seen: true,
|
||||
|
||||
component: 'setting-select-box',
|
||||
|
||||
data: [{
|
||||
value: null,
|
||||
title: 'Disabled'
|
||||
} as SettingUi_Select_Entry<string | null>].concat(Object.keys(PUBSUB_CLUSTERS).map(x => ({
|
||||
value: x,
|
||||
title: x
|
||||
})))
|
||||
component: 'setting-check-box',
|
||||
},
|
||||
|
||||
changed: () => this.reconnect()
|
||||
changed: val => val ? this.connect() : this.disconnect()
|
||||
});
|
||||
|
||||
this._topics = new Map;
|
||||
this._client = null;
|
||||
}
|
||||
|
||||
loadPubSubClient() {
|
||||
if ( this._mqtt )
|
||||
return Promise.resolve(this._mqtt);
|
||||
|
||||
if ( ! this._mqtt_loader )
|
||||
this._mqtt_loader = import(/* webpackChunkName: 'pubsub' */ 'utilities/pubsub')
|
||||
.then(thing => {
|
||||
this._mqtt = thing.default;
|
||||
return thing.default;
|
||||
})
|
||||
.finally(() => this._mqtt_loader = null);
|
||||
|
||||
return this._mqtt_loader;
|
||||
}
|
||||
|
||||
onEnable() {
|
||||
this.on('experiments:changed:cf_pubsub', this._updateSetting, this);
|
||||
|
||||
this.subscribe(null, 'global');
|
||||
|
||||
this.connect();
|
||||
|
||||
this.on('experiments:changed:worker_pubsub', this._updateSetting, this);
|
||||
}
|
||||
|
||||
onDisable() {
|
||||
this.off('experiments:changed:worker_pubsub', this._updateSetting, this);
|
||||
|
||||
this.disconnect();
|
||||
|
||||
this.unsubscribe(null, 'global');
|
||||
|
||||
this.off('experiments:changed:cf_pubsub', this._updateSetting, this);
|
||||
}
|
||||
|
||||
_updateSetting() {
|
||||
this.settings.update('pubsub.use-cluster');
|
||||
this.settings.update('pubsub.enabled');
|
||||
}
|
||||
|
||||
|
||||
|
@ -137,18 +101,7 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> {
|
|||
// ========================================================================
|
||||
|
||||
get connected() {
|
||||
return this._client?.connected ?? false;
|
||||
}
|
||||
|
||||
|
||||
get connecting() {
|
||||
return this._client?.connecting ?? false;
|
||||
}
|
||||
|
||||
|
||||
get disconnected() {
|
||||
// If this is null, we have no client, so we aren't connected.
|
||||
return this._client?.disconnected ?? true;
|
||||
return this._client != null;
|
||||
}
|
||||
|
||||
|
||||
|
@ -162,47 +115,21 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> {
|
|||
}
|
||||
|
||||
async connect() {
|
||||
|
||||
if ( this._client )
|
||||
// If there's already a client, or PubSub is disabled
|
||||
// then we have nothing to do.
|
||||
if ( this._client || ! this.settings.get('pubsub.enabled') )
|
||||
return;
|
||||
|
||||
let cluster_id = this.settings.get('pubsub.use-cluster');
|
||||
if ( cluster_id === null )
|
||||
return;
|
||||
// Create a new instance of the PubSubClient class.
|
||||
const client = this._client = new PubSubClient();
|
||||
|
||||
let cluster = PUBSUB_CLUSTERS[cluster_id];
|
||||
|
||||
// If we didn't get a valid cluster, use production.
|
||||
if ( ! cluster?.length ) {
|
||||
cluster_id = 'Production';
|
||||
cluster = PUBSUB_CLUSTERS.Production;
|
||||
}
|
||||
|
||||
this.log.info(`Using PubSub: ${cluster_id} (${cluster})`);
|
||||
|
||||
const user = this.resolve('site')?.getUser?.();
|
||||
|
||||
// The client class handles everything for us. We only
|
||||
// maintain a separate list of topics in case topics are
|
||||
// subscribed when the client does not exist, or if the
|
||||
// client needs to be recreated.
|
||||
|
||||
const PubSubClient = await this.loadPubSubClient();
|
||||
|
||||
const client = this._client = new PubSubClient(cluster, {
|
||||
logger: this.log.get('client'),
|
||||
user: user?.id ? {
|
||||
provider: 'twitch',
|
||||
id: user.id
|
||||
} : null
|
||||
// Connect to the various events.
|
||||
client.on('connect', () => {
|
||||
this.log.info('Connected to PubSub.');
|
||||
});
|
||||
|
||||
client.on('connect', msg => {
|
||||
this.log.info('Connected to PubSub.', msg);
|
||||
});
|
||||
|
||||
client.on('disconnect', msg => {
|
||||
this.log.info('Disconnected from PubSub.', msg);
|
||||
client.on('disconnect', () => {
|
||||
this.log.info('Disconnected from PubSub.');
|
||||
});
|
||||
|
||||
client.on('error', err => {
|
||||
|
@ -225,7 +152,7 @@ export default class PubSub extends Module<'pubsub', PubSubEvents> {
|
|||
this.emit(`:command:${data.cmd}` as PubSubCommandKey, data.data, data);
|
||||
});
|
||||
|
||||
// Subscribe to topics.
|
||||
// Subscribe to our topics.
|
||||
const topics = [...this._topics.keys()];
|
||||
client.subscribe(topics);
|
||||
|
||||
|
|
|
@ -93,6 +93,34 @@ export default class PlayerBase extends Module {
|
|||
}
|
||||
});
|
||||
|
||||
this.settings.add('player.clip-button.hide-native', {
|
||||
default: null,
|
||||
requires: ['player.clip-button.custom'],
|
||||
process: (ctx, val) => val ?? ctx.get('player.clip-button.custom'),
|
||||
ui: {
|
||||
path: 'Player > General >> Appearance',
|
||||
component: 'setting-check-box',
|
||||
title: 'Hide the native Clip button.',
|
||||
description: 'By default, this is enabled when using the setting to add a custom Clip button.'
|
||||
},
|
||||
|
||||
changed: val => this.css_tweaks.toggle('player-hide-native-clip', val)
|
||||
});
|
||||
|
||||
this.settings.add('player.clip-button.custom', {
|
||||
default: false,
|
||||
ui: {
|
||||
path: 'Player > General >> Appearance',
|
||||
component: 'setting-check-box',
|
||||
title: 'Display a custom Clip button.',
|
||||
description: 'Add a custom Clip button to the player that better fits the style of the other buttons.'
|
||||
},
|
||||
changed: () => {
|
||||
for(const inst of this.Player.instances)
|
||||
this.addClipButton(inst);
|
||||
}
|
||||
});
|
||||
|
||||
this.settings.add('player.fade-pause-buffer', {
|
||||
default: false,
|
||||
ui: {
|
||||
|
@ -600,6 +628,7 @@ export default class PlayerBase extends Module {
|
|||
await this.settings.provider.awaitReady();
|
||||
|
||||
this.css_tweaks.toggleHide('player-gain-volume', this.settings.get('player.gain.no-volume'));
|
||||
this.css_tweaks.toggle('player-hide-native-clip', this.settings.get('player.clip-button.hide-native'));
|
||||
this.css_tweaks.toggle('player-volume', this.settings.get('player.volume-always-shown'));
|
||||
this.css_tweaks.toggle('player-ext-mouse', !this.settings.get('player.ext-interaction'));
|
||||
this.css_tweaks.toggle('player-hide-mouse', this.settings.get('player.hide-mouse'));
|
||||
|
@ -1237,6 +1266,7 @@ export default class PlayerBase extends Module {
|
|||
this.skipContentWarnings(inst);
|
||||
this.addPiPButton(inst);
|
||||
this.addResetButton(inst);
|
||||
this.addClipButton(inst);
|
||||
this.addCompressorButton(inst, false);
|
||||
this.addGainSlider(inst, false);
|
||||
this.addMetadata(inst);
|
||||
|
@ -1982,6 +2012,81 @@ export default class PlayerBase extends Module {
|
|||
}
|
||||
|
||||
|
||||
addClipButton(inst, tries = 0) {
|
||||
const outer = inst.props.containerRef || this.fine.getChildNode(inst),
|
||||
container = outer && outer.querySelector(RIGHT_CONTROLS),
|
||||
has_clip_button = this.settings.get('player.clip-button.custom');
|
||||
|
||||
if ( ! container ) {
|
||||
if ( ! has_clip_button )
|
||||
return;
|
||||
|
||||
if ( tries < 5 )
|
||||
return setTimeout(this.addClipButton.bind(this, inst, (tries || 0) + 1), 250);
|
||||
|
||||
return; // this.log.warn('Unable to find container element for Clip button.');
|
||||
}
|
||||
|
||||
let tip, btn, cont = container.querySelector('.ffz--player-clip');
|
||||
if ( ! has_clip_button ) {
|
||||
if ( cont )
|
||||
cont.remove();
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ! cont ) {
|
||||
// We need the native clip button, so we can dispatch a click.
|
||||
const native_clip = container.querySelector('button[aria-label*="alt+x"]');
|
||||
if ( ! native_clip )
|
||||
return;
|
||||
|
||||
const on_click = e => native_clip.click();
|
||||
|
||||
cont = (<div class="ffz--player-clip tw-inline-flex tw-relative ffz-il-tooltip__container">
|
||||
{btn = (<button
|
||||
class="tw-align-items-center tw-align-middle tw-border-bottom-left-radius-medium tw-border-bottom-right-radius-medium tw-border-top-left-radius-medium tw-border-top-right-radius-medium tw-button-icon tw-button-icon--overlay ffz-core-button ffz-core-button--border ffz-core-button--overlay tw-inline-flex tw-interactive tw-justify-content-center tw-overflow-hidden tw-relative"
|
||||
type="button"
|
||||
data-a-target="ffz-player-clip-button"
|
||||
onClick={on_click}
|
||||
>
|
||||
<div class="tw-align-items-center tw-flex tw-flex-grow-0">
|
||||
<div class="tw-button-icon__icon">
|
||||
<figure class="ffz-player-icon ffz-i-clip" />
|
||||
</div>
|
||||
</div>
|
||||
</button>)}
|
||||
{tip = (<div class="ffz-il-tooltip ffz-il-tooltip--align-right ffz-il-tooltip--up" role="tooltip" />)}
|
||||
</div>);
|
||||
|
||||
const thing = container.querySelector('.ffz--player-reset button') ||
|
||||
container.querySelector('.ffz--player-pip button') ||
|
||||
container.querySelector('button[data-a-target="player-theatre-mode-button"]') ||
|
||||
//container.querySelector('div:not(:has(.tw-tooltip)) button:not([data-a-target])') ||
|
||||
container.querySelector('button[aria-label*="Theat"]') ||
|
||||
container.querySelector('button[data-a-target="player-fullscreen-button"]');
|
||||
|
||||
if ( thing ) {
|
||||
container.insertBefore(cont, thing.parentElement);
|
||||
} else
|
||||
container.appendChild(cont);
|
||||
|
||||
} else {
|
||||
btn = cont.querySelector('button');
|
||||
tip = cont.querySelector('.ffz-il-tooltip');
|
||||
}
|
||||
|
||||
btn.setAttribute('aria-label',
|
||||
tip.textContent = this.i18n.t(
|
||||
'player.clip-button',
|
||||
'Clip (Alt+X)'
|
||||
));
|
||||
}
|
||||
|
||||
clickClip(inst, e) {
|
||||
console.log('clicked clip', inst, e);
|
||||
}
|
||||
|
||||
|
||||
addResetButton(inst, tries = 0) {
|
||||
const outer = inst.props.containerRef || this.fine.getChildNode(inst),
|
||||
container = outer && outer.querySelector(RIGHT_CONTROLS),
|
||||
|
|
|
@ -2483,13 +2483,13 @@ export default class EmoteMenu extends Module {
|
|||
|
||||
const key = `${emote_set.merge_source || fav_key}-${emote_set.merge_id || emote_set.id}`,
|
||||
pdata = t.emotes.providers.get(provider),
|
||||
source = pdata && pdata.name ?
|
||||
(pdata.i18n_key ?
|
||||
t.i18n.t(pdata.i18n_key, pdata.name, pdata) :
|
||||
pdata.name) :
|
||||
source = pdata && pdata.menu_name ?
|
||||
(pdata.menu_i18n_key ?
|
||||
t.i18n.t(pdata.menu_i18n_key, pdata.menu_name, pdata) :
|
||||
pdata.menu_name) :
|
||||
emote_set.source || 'FFZ',
|
||||
|
||||
title = (provider === 'main' || emote_set.title_is_channel)
|
||||
title = (provider === 'ffz-main' || emote_set.title_is_channel)
|
||||
? source_name
|
||||
? t.i18n.t('emote-menu.source-set', '{channel}\'s Emotes', {channel: source_name})
|
||||
: t.i18n.t('emote-menu.main-set', 'Channel Emotes')
|
||||
|
|
|
@ -78,7 +78,9 @@ export default class ViewerCards extends Module {
|
|||
else
|
||||
color = 'rgba(128,170,255,0.2)';
|
||||
|
||||
this.css_tweaks.set('viewer-card-highlight', `body .chat-room .chat-line__message:not(.chat-line--inline):nth-child(1n+0)[data-user="${login}"] {
|
||||
this.css_tweaks.set('viewer-card-highlight', `
|
||||
body .chat-room .chat-scrollable-area__message-container > div:nth-child(1n+0) > .chat-line__message:not(.chat-line--inline):not(.something-nonexistent)[data-user="${login}"],
|
||||
body .chat-room .chat-line__message:not(.chat-line--inline):nth-child(1n+0)[data-user="${login}"] {
|
||||
background-color: ${color} !important;
|
||||
}`);
|
||||
} else
|
||||
|
@ -92,4 +94,4 @@ export default class ViewerCards extends Module {
|
|||
unmountCard() {
|
||||
this.updateStyle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -660,6 +660,9 @@ export default class Directory extends Module {
|
|||
if ( ! props?.channelLogin )
|
||||
props = react.return?.return?.return?.memoizedProps;
|
||||
|
||||
if ( ! props?.channelLogin )
|
||||
props = react.return?.return?.return?.return?.return?.memoizedProps;
|
||||
|
||||
if ( ! props?.channelLogin )
|
||||
return;
|
||||
|
||||
|
|
|
@ -126,12 +126,12 @@ export default class SocketClient extends Module {
|
|||
|
||||
|
||||
onEnable() {
|
||||
// We don't connect anymore.
|
||||
// For now, stop connecting to the sockets for people using the
|
||||
// API links experiment.
|
||||
if ( this.experiments.getAssignment('api_links') )
|
||||
return;
|
||||
|
||||
this.connect();
|
||||
//if ( this.experiments.getAssignment('api_links') )
|
||||
// return;
|
||||
//this.connect();
|
||||
}
|
||||
onDisable() { this.disconnect() }
|
||||
|
||||
|
|
|
@ -279,10 +279,10 @@ export const WS_CLUSTERS = {
|
|||
|
||||
|
||||
export const LINK_DATA_HOSTS = {
|
||||
socket: {
|
||||
/*socket: {
|
||||
title: 'Socket Cluster (Deprecated)',
|
||||
value: 'special:socket'
|
||||
},
|
||||
},*/
|
||||
localhost: {
|
||||
title: 'Local Dev Server (Port 8002)',
|
||||
value: 'https://localhost:8002',
|
||||
|
@ -292,17 +292,17 @@ export const LINK_DATA_HOSTS = {
|
|||
title: 'Local Dev Worker (Wrangler, Port 8787)',
|
||||
value: 'https://localhost:8787'
|
||||
},
|
||||
test: {
|
||||
/*test: {
|
||||
title: 'API Test Server',
|
||||
value: 'https://api-test.frankerfacez.com/v2/link'
|
||||
},
|
||||
'test-cf': {
|
||||
title: 'Cloudflare Test Worker',
|
||||
value: 'https://link-service.workers.frankerfacez.com'
|
||||
},
|
||||
},*/
|
||||
Production: {
|
||||
title: 'Production',
|
||||
value: 'https://api.frankerfacez.com/v2/link'
|
||||
value: 'https://link-service.workers.frankerfacez.com'
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -225,6 +225,13 @@ export class Module<
|
|||
get addon_id() { return this.__addon_id }
|
||||
/** If this module is part of an add-on, the add-on's root module. */
|
||||
get addon_root() { return this.__addon_root }
|
||||
/** If this module is part of an add-on, the add-on's manifest. */
|
||||
get addon_manifest() {
|
||||
if (this.__addon_id)
|
||||
return this.resolve('addons')?.getAddon(this.__addon_id!) ??
|
||||
(this.__addon_root?.constructor as typeof Addon)?.info ??
|
||||
undefined;
|
||||
}
|
||||
|
||||
/** A Logger instance for this module. */
|
||||
get log(): Logger {
|
||||
|
|
|
@ -1121,6 +1121,8 @@ export function generateHex(length: number = 40) {
|
|||
}
|
||||
|
||||
|
||||
export type StringSortFn = (first: string, second: string) => number;
|
||||
|
||||
/**
|
||||
* SourcedSet is similar to a Set, except that entries in the set are kept
|
||||
* track of based upon a `source`. This allows entries from a specific source
|
||||
|
@ -1150,6 +1152,9 @@ export class SourcedSet<T> {
|
|||
_cache: Set<T> | T[];
|
||||
|
||||
private _sources?: Map<string, T[]>;
|
||||
private _sorted_sources?: T[][] | null;
|
||||
|
||||
private _sourceSortFn?: StringSortFn | null;
|
||||
|
||||
/**
|
||||
* Create a new SourcedSet.
|
||||
|
@ -1159,9 +1164,52 @@ export class SourcedSet<T> {
|
|||
*
|
||||
* @param T The type of object to hold.
|
||||
*/
|
||||
constructor(use_set = false) {
|
||||
constructor(use_set = false, source_sorter?: StringSortFn) {
|
||||
this._use_set = use_set;
|
||||
this._cache = use_set ? new Set : [];
|
||||
this._sourceSortFn = source_sorter;
|
||||
}
|
||||
|
||||
|
||||
resortSources() {
|
||||
this._sorted_sources = null;
|
||||
|
||||
// We don't need to rebuild if we have less than 2 sources
|
||||
// since sorting wouldn't change anything.
|
||||
if (this._sources && this._sources.size > 1)
|
||||
this._rebuild();
|
||||
}
|
||||
|
||||
|
||||
setSortFunction(fn: StringSortFn | null) {
|
||||
// If the method isn't changing, then we don't need
|
||||
// to do anything.
|
||||
if (this._sourceSortFn === fn)
|
||||
return;
|
||||
|
||||
// Set the new sort function.
|
||||
this._sourceSortFn = fn;
|
||||
|
||||
// And re-sort.
|
||||
this.resortSources();
|
||||
}
|
||||
|
||||
|
||||
_sortSources() {
|
||||
if (!this._sources || this._sources.size === 0) {
|
||||
this._sorted_sources = [];
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._sources.size === 1 || !this._sourceSortFn) {
|
||||
this._sorted_sources = [...this._sources.values()];
|
||||
return;
|
||||
}
|
||||
|
||||
const sources = [...this._sources?.entries()];
|
||||
sources.sort((a, b) => this._sourceSortFn!(a[0], b[0]));
|
||||
|
||||
this._sorted_sources = sources.map(entry => entry[1]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1174,7 +1222,11 @@ export class SourcedSet<T> {
|
|||
|
||||
const use_set = this._use_set,
|
||||
cache = this._cache = use_set ? new Set : [];
|
||||
for(const items of this._sources.values())
|
||||
|
||||
if (!this._sorted_sources)
|
||||
this._sortSources();
|
||||
|
||||
for(const items of this._sorted_sources!)
|
||||
for(const i of items)
|
||||
if ( use_set )
|
||||
(cache as Set<T>).add(i);
|
||||
|
@ -1210,6 +1262,7 @@ export class SourcedSet<T> {
|
|||
delete(source: string) {
|
||||
if ( this._sources && this._sources.has(source) ) {
|
||||
this._sources.delete(source);
|
||||
this._sorted_sources = null;
|
||||
this._rebuild();
|
||||
}
|
||||
}
|
||||
|
@ -1224,7 +1277,19 @@ export class SourcedSet<T> {
|
|||
items = [...existing, ...items];
|
||||
|
||||
this._sources.set(source, items);
|
||||
if ( existing )
|
||||
|
||||
// If we have a sort function and more than one source, then
|
||||
// we need to do a rebuild when making modifications.
|
||||
const need_sorting = this._sourceSortFn != null && this._sources.size > 1;
|
||||
|
||||
// If this is a new source, we need to clear the cache for
|
||||
// future rebuilds to include this.
|
||||
if ( ! existing )
|
||||
this._sorted_sources = null;
|
||||
|
||||
// If we need sorting, do a rebuild, otherwise go ahead
|
||||
// and add the items normally.
|
||||
if ( need_sorting )
|
||||
this._rebuild();
|
||||
else {
|
||||
const use_set = this._use_set,
|
||||
|
@ -1256,7 +1321,20 @@ export class SourcedSet<T> {
|
|||
|
||||
const existing = this._sources.has(source);
|
||||
this._sources.set(source, items);
|
||||
if ( existing )
|
||||
|
||||
// If we have a sort function and more than one source, then
|
||||
// we need to do a rebuild when making modifications.
|
||||
const need_sorting = this._sourceSortFn != null && this._sources.size > 1;
|
||||
|
||||
// If this is a new source, we need to clear the cache for
|
||||
// future rebuilds to include this.
|
||||
if ( ! existing )
|
||||
this._sorted_sources = null;
|
||||
|
||||
// If we need sorting, or if we replaced an existing source,
|
||||
// then we need a rebuild. Otherwise, go ahead and add the
|
||||
// items normally.
|
||||
if ( existing || need_sorting )
|
||||
this._rebuild();
|
||||
else {
|
||||
const use_set = this._use_set,
|
||||
|
@ -1284,7 +1362,14 @@ export class SourcedSet<T> {
|
|||
return;
|
||||
|
||||
existing.push(item);
|
||||
if ( this._use_set )
|
||||
|
||||
// If we have a sort function and more than one source, then
|
||||
// we need to do a rebuild when making modifications.
|
||||
const need_sorting = this._sourceSortFn != null && this._sources.size > 1;
|
||||
|
||||
if ( need_sorting )
|
||||
this._rebuild();
|
||||
else if ( this._use_set )
|
||||
(this._cache as Set<T>).add(item);
|
||||
else if ( ! (this._cache as T[]).includes(item) )
|
||||
(this._cache as T[]).push(item);
|
||||
|
@ -1302,6 +1387,8 @@ export class SourcedSet<T> {
|
|||
return;
|
||||
|
||||
(existing as T[]).splice(idx, 1);
|
||||
|
||||
// Removal always requires a rebuild.
|
||||
this._rebuild();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,724 +0,0 @@
|
|||
import { EventEmitter } from "./events";
|
||||
|
||||
import { MqttClient, DISCONNECT } from './custom_denoflare_mqtt'; // "denoflare-mqtt";
|
||||
import { b64ToArrayBuffer, debounce, importRsaKey, make_enum, sleep } from "./object";
|
||||
import { EMQX_SERVERS } from "./constants";
|
||||
|
||||
// Only match 1-4 digit numbers, to avoid matching twitch IDs.
|
||||
// 9999 gives us millions of clients on a topic, so we're never
|
||||
// going to have more subtopics than 4 digits.
|
||||
const SUBTOPIC_MATCHER = /\/(?:s(\d+)|(\d{1,4}))$/;
|
||||
|
||||
|
||||
MqttClient.prototype.reschedulePing = function reschedulePing() {
|
||||
this.clearPing();
|
||||
let delay = this.keepAliveSeconds;
|
||||
if ( this.keepAliveOverride > 0 )
|
||||
delay = Math.min(delay, this.keepAliveOverride);
|
||||
|
||||
this.pingTimeout = setTimeout(async () => {
|
||||
try {
|
||||
await this.ping();
|
||||
} catch(err) { /* no-op */ }
|
||||
this.reschedulePing();
|
||||
}, delay * 1000);
|
||||
}
|
||||
|
||||
|
||||
export const State = make_enum(
|
||||
'Disconnected',
|
||||
'Connecting',
|
||||
'Connected'
|
||||
);
|
||||
|
||||
|
||||
export default class PubSubClient extends EventEmitter {
|
||||
|
||||
constructor(server, options = {}) {
|
||||
super();
|
||||
|
||||
this.server = server;
|
||||
this.user = options?.user;
|
||||
this.logger = options?.log ?? options?.logger;
|
||||
|
||||
this._should_connect = false;
|
||||
this._state = State.Disconnected;
|
||||
|
||||
// Topics is a map of topics to sub-topic IDs.
|
||||
this._topics = new Map;
|
||||
|
||||
// Incorrect Topics is a map of every incorrect topic mapping
|
||||
// we are currently subscribed to, for the purpose of unsubscribing
|
||||
// from them.
|
||||
this._incorrect_topics = new Map;
|
||||
|
||||
// Live Topics is a set of every topic we have sent subscribe
|
||||
// packets to the server for.
|
||||
this._live_topics = new Set;
|
||||
|
||||
// Active Topics is a set of every topic we SHOULD be subscribed to.
|
||||
this._active_topics = new Set;
|
||||
|
||||
// Pending Topics is a set of topics that we should be subscribed to
|
||||
// but that we don't yet have a sub-topic assignment.
|
||||
this._pending_topics = new Set;
|
||||
|
||||
// Debounce a few things.
|
||||
this.scheduleHeartbeat = this.scheduleHeartbeat.bind(this);
|
||||
this._sendHeartbeat = this._sendHeartbeat.bind(this);
|
||||
this.scheduleResub = this.scheduleResub.bind(this);
|
||||
this._sendResub = this._sendResub.bind(this);
|
||||
|
||||
this._fetchNewTopics = this._fetchNewTopics.bind(this);
|
||||
this._sendSubscribes = debounce(this._sendSubscribes, 250);
|
||||
this._sendUnsubscribes = debounce(this._sendUnsubscribes, 250);
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Properties
|
||||
// ========================================================================
|
||||
|
||||
get id() { return this._data?.client_id ?? null }
|
||||
|
||||
get topics() { return [...this._active_topics] }
|
||||
|
||||
get disconnected() {
|
||||
return this._state === State.Disconnected;
|
||||
};
|
||||
|
||||
get connecting() {
|
||||
return this._state === State.Connecting;
|
||||
}
|
||||
|
||||
get connected() {
|
||||
return this._state === State.Connected;
|
||||
}
|
||||
|
||||
|
||||
// ========================================================================
|
||||
// Data Loading
|
||||
// ========================================================================
|
||||
|
||||
loadData(force = false) {
|
||||
// If we have all the data we need, don't do anything.
|
||||
if ( ! force && this._data && ! this._pending_topics.size )
|
||||
return Promise.resolve(this._data);
|
||||
|
||||
if ( ! this._data_loader )
|
||||
this._data_loader = this._loadData()
|
||||
.finally(() => this._data_loader = null);
|
||||
|
||||
return this._data_loader;
|
||||
}
|
||||
|
||||
async _loadData() {
|
||||
let response, data;
|
||||
|
||||
if ( this.server === 'emqx-test' ) {
|
||||
// Hard-coded data.
|
||||
const server = EMQX_SERVERS[Math.floor(Math.random() * EMQX_SERVERS.length)];
|
||||
|
||||
data = {
|
||||
require_signing: false,
|
||||
endpoint: `wss://${server}/mqtt`,
|
||||
|
||||
username: 'anonymous',
|
||||
password: `eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbm9ueW1vdXMifQ.5DZP1bScMz4-MV_jGZveUKq4pFy9x_PJF9gSzAvj-wA`,
|
||||
topics: Object.fromEntries(this.topics.map(x => [x, 0]))
|
||||
}
|
||||
|
||||
} else {
|
||||
try {
|
||||
// TODO: Send removed topics.
|
||||
response = await fetch(this.server, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: JSON.stringify({
|
||||
id: this.id,
|
||||
user: this.user ?? null,
|
||||
topics: this.topics
|
||||
})
|
||||
});
|
||||
|
||||
if ( response.ok )
|
||||
data = await response.json();
|
||||
|
||||
} catch(err) {
|
||||
throw new Error(
|
||||
'Unable to load PubSub data from server.',
|
||||
{
|
||||
cause: err
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if ( ! data?.endpoint )
|
||||
throw new Error('Received invalid PubSub data from server.');
|
||||
|
||||
// If there's a signing key, parse it.
|
||||
if ( data.public_key )
|
||||
try {
|
||||
data.public_key = await importRsaKey(data.public_key);
|
||||
} catch(err) {
|
||||
throw new Error('Received invalid public key from server.', {
|
||||
cause: err
|
||||
});
|
||||
}
|
||||
|
||||
else
|
||||
data.public_key = null;
|
||||
|
||||
if ( data.require_signing && ! data.public_key )
|
||||
throw new Error('Server requires signing but did not provide public key.');
|
||||
|
||||
// If we already had a password, preserve it.
|
||||
if ( this._data?.password && ! data.password )
|
||||
data.password = this._data.password;
|
||||
|
||||
// Record all the topic mappings we just got.
|
||||
// TODO: Check for subtopic mismatches.
|
||||
// TODO: Check for removed subtopic assignments.
|
||||
if ( data.topics ) {
|
||||
const removed = new Set(this._topics.keys());
|
||||
|
||||
for(const [key, val] of Object.entries(data.topics)) {
|
||||
removed.delete(key);
|
||||
const existing = this._topics.get(key);
|
||||
if ( existing != null && existing != val )
|
||||
this._incorrect_topics.set(key, existing);
|
||||
this._topics.set(key, val);
|
||||
}
|
||||
|
||||
for(const key of removed) {
|
||||
const existing = this._topics.get(key);
|
||||
this._topics.delete(key);
|
||||
this._incorrect_topics.set(key, existing);
|
||||
}
|
||||
|
||||
// If we have a mismatch, handle it.
|
||||
if ( this._incorrect_topics.size )
|
||||
this._sendUnsubscribes()
|
||||
.then(() => this._sendSubscribes());
|
||||
}
|
||||
|
||||
// Update the heartbeat timer.
|
||||
this.scheduleHeartbeat();
|
||||
|
||||
this._data = data;
|
||||
return data;
|
||||
}
|
||||
|
||||
async _fetchNewTopics(attempts = 0) {
|
||||
this._fetch_timer = null;
|
||||
let needs_fetch = false;
|
||||
for(const topic of [...this._pending_topics]) {
|
||||
if ( ! this._topics.has(topic) )
|
||||
needs_fetch = true;
|
||||
}
|
||||
|
||||
if ( needs_fetch )
|
||||
try {
|
||||
await this.loadData();
|
||||
} catch(err) {
|
||||
if ( attempts > 10 ) {
|
||||
this._fetch_timer = null;
|
||||
throw err;
|
||||
}
|
||||
|
||||
let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000;
|
||||
if ( delay > 60000 )
|
||||
delay = (Math.floor(Math.random() * 60) + 30) * 1000;
|
||||
|
||||
return sleep(delay).then(() => this._fetchNewTopics(attempts + 1));
|
||||
}
|
||||
|
||||
if ( this._client )
|
||||
this._sendSubscribes();
|
||||
}
|
||||
|
||||
|
||||
// ========================================================================
|
||||
// Connecting
|
||||
// ========================================================================
|
||||
|
||||
connect() {
|
||||
return this._connect();
|
||||
}
|
||||
|
||||
async _connect(attempts = 0) {
|
||||
if ( this._state === State.Connected )
|
||||
return;
|
||||
|
||||
this._state = State.Connecting;
|
||||
|
||||
let data;
|
||||
try {
|
||||
data = await this.loadData();
|
||||
} catch(err) {
|
||||
if ( attempts > 10 ) {
|
||||
this._state = State.Disconnected;
|
||||
throw err;
|
||||
}
|
||||
|
||||
let delay = (attempts + 1) * (Math.floor(Math.random() * 10) + 2) * 1000;
|
||||
if ( delay > 60000 )
|
||||
delay = (Math.floor(Math.random() * 60) + 30) * 1000;
|
||||
|
||||
return sleep(delay).then(() => this._connect(attempts + 1));
|
||||
}
|
||||
|
||||
if ( this.logger )
|
||||
this.logger.debug('Received Configuration', data);
|
||||
|
||||
// We have our configuration. Now, create our client.
|
||||
this._should_connect = true;
|
||||
this._createClient(data);
|
||||
|
||||
// Set up a heartbeat to keep us alive.
|
||||
this.scheduleHeartbeat();
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this._should_connect = false;
|
||||
this._destroyClient();
|
||||
this._state = State.Disconnected;
|
||||
|
||||
this.clearHeartbeat();
|
||||
|
||||
// Reset all our state except active topics.
|
||||
this._data = null;
|
||||
this._live_topics.clear();
|
||||
this._topics.clear();
|
||||
this._pending_topics.clear();
|
||||
|
||||
for(const topic of this._active_topics)
|
||||
this._pending_topics.add(topic);
|
||||
}
|
||||
|
||||
subscribe(topic) {
|
||||
if ( Array.isArray(topic) ) {
|
||||
for(const item of topic)
|
||||
this.subscribe(item);
|
||||
return;
|
||||
}
|
||||
|
||||
// If this is already an active topic, there's nothing
|
||||
// else to do.
|
||||
if ( this._active_topics.has(topic) )
|
||||
return;
|
||||
|
||||
this._active_topics.add(topic);
|
||||
|
||||
// If we don't have a sub-topic mapping, then we need to
|
||||
// request a new one. Mark this topic as pending.
|
||||
if ( ! this._topics.has(topic) )
|
||||
this._pending_topics.add(topic);
|
||||
|
||||
// If we have a client, and we have pending topics, and there
|
||||
// isn't a pending fetch, then schedule a fetch.
|
||||
if ( this._client && this._pending_topics.size && ! this._fetch_timer )
|
||||
this._fetch_timer = setTimeout(this._fetchNewTopics, 5000);
|
||||
|
||||
// Finally, if we have a client, send out a subscribe packet.
|
||||
// This method is debounced by 250ms.
|
||||
if ( this._client )
|
||||
this._sendSubscribes();
|
||||
}
|
||||
|
||||
unsubscribe(topic) {
|
||||
if ( Array.isArray(topic) ) {
|
||||
for(const item of topic)
|
||||
this.unsubscribe(item);
|
||||
return;
|
||||
}
|
||||
|
||||
// If this topic isn't an active topic, we have nothing to do.
|
||||
if ( ! this._active_topics.has(topic) )
|
||||
return;
|
||||
|
||||
// Remove the topic from the active and pending topics. Don't
|
||||
// remove it from the topic map though, since our client is
|
||||
// still live.
|
||||
this._active_topics.delete(topic);
|
||||
this._pending_topics.delete(topic);
|
||||
|
||||
if ( this._client )
|
||||
this._sendUnsubscribes();
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Keep Alives
|
||||
// ========================================================================
|
||||
|
||||
clearHeartbeat() {
|
||||
if ( this._heartbeat )
|
||||
clearTimeout(this._heartbeat);
|
||||
}
|
||||
|
||||
scheduleHeartbeat() {
|
||||
if ( this._heartbeat )
|
||||
clearTimeout(this._heartbeat);
|
||||
this._heartbeat = setTimeout(this._sendHeartbeat, 5 * 60 * 1000);
|
||||
}
|
||||
|
||||
_sendHeartbeat() {
|
||||
if ( ! this._data?.client_id )
|
||||
return this.scheduleHeartbeat();
|
||||
|
||||
this.loadData(true)
|
||||
.finally(this.scheduleHeartbeat);
|
||||
}
|
||||
|
||||
|
||||
clearResubTimer() {
|
||||
if ( this._resub_timer ) {
|
||||
clearTimeout(this._resub_timer);
|
||||
this._resub_timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
scheduleResub() {
|
||||
if ( this._resub_timer )
|
||||
clearTimeout(this._resub_timer);
|
||||
|
||||
// Send a resubscription every 30 minutes.
|
||||
this._resub_timer = setTimeout(this._sendResub, 30 * 60 * 1000);
|
||||
}
|
||||
|
||||
_sendResub() {
|
||||
this._resendSubscribes()
|
||||
.finally(this.scheduleResub);
|
||||
}
|
||||
|
||||
|
||||
// ========================================================================
|
||||
// Client Management
|
||||
// ========================================================================
|
||||
|
||||
_destroyClient() {
|
||||
if ( ! this._client )
|
||||
return;
|
||||
|
||||
this.clearResubTimer();
|
||||
|
||||
try {
|
||||
this._client.disconnect().catch(() => {});
|
||||
} catch(err) { /* no-op */ }
|
||||
this._client = null;
|
||||
|
||||
this._live_topics.clear();
|
||||
}
|
||||
|
||||
_createClient(data) {
|
||||
// If there is an existing client, destroy it first.
|
||||
if ( this._client )
|
||||
this._destroyClient();
|
||||
|
||||
// Now, create a new instance of our client.
|
||||
// This requires a parsed URL because the dumb client doesn't
|
||||
// take URLs like every other client ever.
|
||||
const url = new URL(data.endpoint);
|
||||
|
||||
this._live_topics.clear();
|
||||
this._state = State.Connecting;
|
||||
|
||||
const client = this._client = new MqttClient({
|
||||
hostname: url.hostname,
|
||||
port: url.port ?? undefined,
|
||||
pathname: url.pathname ?? undefined,
|
||||
protocol: 'wss',
|
||||
maxMessagesPerSecond: 10
|
||||
});
|
||||
|
||||
let disconnected = false;
|
||||
|
||||
this._client.onMqttMessage = message => {
|
||||
if ( message.type === DISCONNECT ) {
|
||||
if ( disconnected )
|
||||
return;
|
||||
|
||||
disconnected = true;
|
||||
this.emit('disconnect', message);
|
||||
this._destroyClient();
|
||||
|
||||
if ( this._should_connect )
|
||||
this._createClient(data);
|
||||
}
|
||||
}
|
||||
|
||||
this._client.onReceive = async message => {
|
||||
// Get the topic, and remove the subtopic from it.
|
||||
let topic = message.topic;
|
||||
const match = SUBTOPIC_MATCHER.exec(topic);
|
||||
if ( match )
|
||||
topic = topic.slice(0, match.index);
|
||||
|
||||
if ( ! this._active_topics.has(topic) ) {
|
||||
if ( this.logger )
|
||||
this.logger.debug('Received message for unsubscribed topic:', topic);
|
||||
return;
|
||||
}
|
||||
|
||||
let payload = message.payload;
|
||||
if ( payload instanceof Uint8Array )
|
||||
payload = new TextDecoder().decode(payload);
|
||||
|
||||
let msg;
|
||||
try {
|
||||
msg = JSON.parse(payload);
|
||||
} catch(err) {
|
||||
if ( this.logger )
|
||||
this.logger.warn(`Error decoding PubSub message on topic "${topic}":`, err);
|
||||
return;
|
||||
}
|
||||
|
||||
if ( data.require_signing ) {
|
||||
let valid = false;
|
||||
const sig = msg.sig;
|
||||
delete msg.sig;
|
||||
|
||||
if ( sig )
|
||||
try {
|
||||
const encoded = new TextEncoder().encode(JSON.stringify(msg));
|
||||
|
||||
valid = await crypto.subtle.verify(
|
||||
{
|
||||
name: "RSA-PSS",
|
||||
saltLength: 32
|
||||
},
|
||||
data.public_key,
|
||||
b64ToArrayBuffer(sig),
|
||||
encoded
|
||||
);
|
||||
|
||||
} catch(err) {
|
||||
if ( this.logger )
|
||||
this.logger.warn('Error attempting to verify signature for message.', err);
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ! valid ) {
|
||||
msg.sig = sig;
|
||||
if ( this.logger )
|
||||
this.logger.debug(`Received message on topic "${topic}" that failed signature verification:`, msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.emit('message', { topic, data: msg });
|
||||
}
|
||||
|
||||
// We want to send a keep-alive every 60 seconds, despite
|
||||
// requesting a keepAlive of 120 to the server. We do this
|
||||
// because of how background tabs are throttled by browsers.
|
||||
client.keepAliveOverride = 60;
|
||||
|
||||
return this._client.connect({
|
||||
clientId: data.client_id,
|
||||
username: data.username,
|
||||
password: data.password,
|
||||
keepAlive: 120,
|
||||
clean: true
|
||||
}).then(msg => {
|
||||
this._conn_failures = 0;
|
||||
this._state = State.Connected;
|
||||
this.emit('connect', msg);
|
||||
|
||||
// Periodically re-send our subscriptions. This
|
||||
// is done because the broker seems to be forgetful
|
||||
// for long-lived connections.
|
||||
this.scheduleResub();
|
||||
|
||||
// Reconnect when this connection ends.
|
||||
if ( client.connectionCompletion )
|
||||
client.connectionCompletion.finally(() => {
|
||||
if ( disconnected )
|
||||
return;
|
||||
|
||||
disconnected = true;
|
||||
this.emit('disconnect', null);
|
||||
this._destroyClient();
|
||||
|
||||
if ( this._should_connect )
|
||||
this._createClient(data);
|
||||
});
|
||||
|
||||
return this._sendSubscribes()
|
||||
}).catch(err => {
|
||||
if ( this.logger )
|
||||
this.logger.debug('Error connecting to MQTT.', err);
|
||||
|
||||
disconnected = true;
|
||||
this.emit('disconnect', null);
|
||||
|
||||
this._destroyClient();
|
||||
|
||||
if ( ! this._should_connect )
|
||||
return;
|
||||
|
||||
this._conn_failures = (this._conn_failures || 0) + 1;
|
||||
let delay = (this._conn_failures * Math.floor(Math.random() * 10) + 2) * 1000;
|
||||
if ( delay > 60000 )
|
||||
delay = (Math.floor(Math.random() * 60) + 30) * 1000;
|
||||
|
||||
if ( delay <= 2000 )
|
||||
delay = 2000;
|
||||
|
||||
return sleep(delay).then(() => {
|
||||
if ( this._should_connect && ! this._client )
|
||||
this._createClient(data);
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
_resendSubscribes() {
|
||||
if ( ! this._client )
|
||||
return Promise.resolve();
|
||||
|
||||
const topics = [],
|
||||
batch = [];
|
||||
|
||||
for(const topic of this._live_topics) {
|
||||
const subtopic = this._topics.get(topic);
|
||||
if ( subtopic != null ) {
|
||||
if ( subtopic === 0 )
|
||||
topics.push(topic);
|
||||
else
|
||||
topics.push(`${topic}/s${subtopic}`);
|
||||
|
||||
batch.push(topic);
|
||||
}
|
||||
}
|
||||
|
||||
if ( ! topics.length )
|
||||
return Promise.resolve();
|
||||
|
||||
return this._client.subscribe({topicFilter: topics})
|
||||
.catch(() => {
|
||||
// If there was an error, we did NOT subscribe.
|
||||
for(const topic of batch)
|
||||
this._live_topics.delete(topic);
|
||||
|
||||
if ( this._live_topics.size != this._active_topics.size )
|
||||
return sleep(2000).then(() => this._sendSubscribes());
|
||||
});
|
||||
}
|
||||
|
||||
_sendSubscribes() {
|
||||
if ( ! this._client )
|
||||
return Promise.resolve();
|
||||
|
||||
const topics = [],
|
||||
batch = [];
|
||||
|
||||
for(const topic of this._active_topics) {
|
||||
if ( this._live_topics.has(topic) )
|
||||
continue;
|
||||
|
||||
const subtopic = this._topics.get(topic);
|
||||
if ( subtopic != null ) {
|
||||
// Make sure this topic isn't considered pending.
|
||||
this._pending_topics.delete(topic);
|
||||
|
||||
if ( subtopic === 0 )
|
||||
topics.push(topic);
|
||||
else
|
||||
topics.push(`${topic}/s${subtopic}`);
|
||||
|
||||
// Make a note, we're subscribing to this topic.
|
||||
this._live_topics.add(topic);
|
||||
batch.push(topic);
|
||||
}
|
||||
}
|
||||
|
||||
if ( ! topics.length )
|
||||
return Promise.resolve();
|
||||
|
||||
return this._client.subscribe({topicFilter: topics })
|
||||
.then(() => {
|
||||
// Success. Reset the subscribe failures count.
|
||||
this._sub_failures = 0;
|
||||
})
|
||||
.catch(msg => {
|
||||
// TODO: Check the reason why.
|
||||
if ( this.logger )
|
||||
this.logger.debug('Subscribe failure for topics:', batch.join(', '), ' reason:', msg);
|
||||
|
||||
// If there was an error, we did NOT subscribe.
|
||||
for(const topic of batch)
|
||||
this._live_topics.delete(topic);
|
||||
|
||||
// See if we have more work.
|
||||
if ( this._live_topics.size >= this._active_topics.size )
|
||||
return;
|
||||
|
||||
// Call sendSubscribes again after a bit.
|
||||
this._sub_failures = (this._sub_failures || 0) + 1;
|
||||
|
||||
let delay = (this._sub_failures * Math.floor(Math.random() * 10) + 2) * 1000;
|
||||
if ( delay > 60000 )
|
||||
delay = (Math.floor(Math.random() * 60) + 30) * 1000;
|
||||
|
||||
if ( delay <= 2000 )
|
||||
delay = 2000;
|
||||
|
||||
return sleep(delay).then(() => this._sendSubscribes());
|
||||
});
|
||||
}
|
||||
|
||||
_sendUnsubscribes() {
|
||||
if ( ! this._client )
|
||||
return Promise.resolve();
|
||||
|
||||
const topics = [],
|
||||
batch = new Set;
|
||||
|
||||
// iterate over a copy to support removal
|
||||
for(const topic of [...this._live_topics]) {
|
||||
if ( this._active_topics.has(topic) )
|
||||
continue;
|
||||
|
||||
// Should never be null, but be safe.
|
||||
const subtopic = this._topics.get(topic);
|
||||
if ( subtopic == null )
|
||||
continue;
|
||||
|
||||
let real_topic;
|
||||
if ( subtopic === 0 )
|
||||
real_topic = topic;
|
||||
else
|
||||
real_topic = `${topic}/s${subtopic}`;
|
||||
|
||||
topics.push(real_topic);
|
||||
batch.add(topic);
|
||||
this._live_topics.delete(topic);
|
||||
}
|
||||
|
||||
// handle incorrect topics
|
||||
for(const [topic, subtopic] of this._incorrect_topics) {
|
||||
if ( batch.has(topic) )
|
||||
continue;
|
||||
|
||||
batch.add(topic);
|
||||
if ( subtopic === 0 )
|
||||
topics.push(topic);
|
||||
else
|
||||
topics.push(`${topic}/s${subtopic}`);
|
||||
|
||||
this._live_topics.delete(topic);
|
||||
}
|
||||
|
||||
if ( ! topics.length )
|
||||
return Promise.resolve();
|
||||
|
||||
return this._client.unsubscribe({topicFilter: topics})
|
||||
.catch(error => {
|
||||
if ( this.logger )
|
||||
this.logger.warn('Received error when unsubscribing from topics:', error);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue