
feat: add POLLING_LIMIT_PER_HOST to limit concurrent requests per host

Each batch of feeds sent to the worker pool is now guaranteed to contain unique feed URLs.

When `POLLING_LIMIT_PER_HOST` is set, an additional limit is applied to the number of concurrent requests per hostname, helping to prevent overloading a single server.

Note: additional requests may still be made during a feed refresh, for example to fetch feed icons or when the web scraper is enabled for a particular feed.
Frédéric Guillot 2025-08-08 12:19:01 -07:00
parent a4f672b589
commit 34499b887b
13 changed files with 146 additions and 34 deletions
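
The idea behind the per-host cap is simple: while assembling a batch, count jobs per hostname and skip any feed whose host has already reached the limit. Below is a minimal, self-contained sketch of that counting logic. It uses net/url instead of Miniflux's internal urllib.Domain helper, and the job type and function names are illustrative stand-ins, not the actual implementation (which lives in the BatchBuilder diff further down):

package main

import (
	"fmt"
	"net/url"
)

type job struct {
	FeedURL string
}

// hostOf extracts the hostname from a feed URL. The real code uses
// Miniflux's urllib.Domain helper; net/url is used here to keep the
// sketch dependency-free.
func hostOf(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}

// capPerHost keeps at most limitPerHost jobs for any single hostname.
// A limit of zero (the default) disables the cap entirely.
func capPerHost(candidates []job, limitPerHost int) []job {
	if limitPerHost <= 0 {
		return candidates
	}
	counts := make(map[string]int)
	kept := make([]job, 0, len(candidates))
	for _, j := range candidates {
		host := hostOf(j.FeedURL)
		if counts[host] >= limitPerHost {
			continue // this host already has enough jobs in the batch
		}
		counts[host]++
		kept = append(kept, j)
	}
	return kept
}

func main() {
	batch := []job{
		{"https://example.org/a.xml"},
		{"https://example.org/b.xml"},
		{"https://example.org/c.xml"},
		{"https://other.net/feed"},
	}
	// With a limit of 2, only two example.org feeds survive.
	for _, j := range capPerHost(batch, 2) {
		fmt.Println(j.FeedURL)
	}
}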


@@ -143,6 +143,7 @@ func (h *handler) refreshCategory(w http.ResponseWriter, r *http.Request) {
 	batchBuilder.WithUserID(userID)
 	batchBuilder.WithCategoryID(categoryID)
 	batchBuilder.WithNextCheckExpired()
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {


@@ -76,6 +76,7 @@ func (h *handler) refreshAllFeeds(w http.ResponseWriter, r *http.Request) {
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithNextCheckExpired()
 	batchBuilder.WithUserID(userID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {


@@ -25,6 +25,7 @@ func refreshFeeds(store *storage.Storage) {
 	batchBuilder.WithErrorLimit(config.Opts.PollingParsingErrorLimit())
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithNextCheckExpired()
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
@@ -39,6 +40,8 @@ func refreshFeeds(store *storage.Storage) {
 		slog.Int("batch_size", config.Opts.BatchSize()),
 	)
 
+	slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
+
 	var jobQueue = make(chan model.Job, nbJobs)
 
 	slog.Info("Starting a pool of workers",


@@ -21,6 +21,7 @@ func runScheduler(store *storage.Storage, pool *worker.Pool) {
 		config.Opts.PollingFrequency(),
 		config.Opts.BatchSize(),
 		config.Opts.PollingParsingErrorLimit(),
+		config.Opts.PollingLimitPerHost(),
 	)
 
 	go cleanupScheduler(
@@ -29,7 +30,7 @@ func runScheduler(store *storage.Storage, pool *worker.Pool) {
 	)
 }
 
-func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize, errorLimit int) {
+func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize, errorLimit, limitPerHost int) {
 	for range time.Tick(time.Duration(frequency) * time.Minute) {
 		// Generate a batch of feeds for any user that has feeds to refresh.
 		batchBuilder := store.NewBatchBuilder()
@@ -37,6 +38,7 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
 		batchBuilder.WithErrorLimit(errorLimit)
 		batchBuilder.WithoutDisabledFeeds()
 		batchBuilder.WithNextCheckExpired()
+		batchBuilder.WithLimitPerHost(limitPerHost)
 
 		if jobs, err := batchBuilder.FetchJobs(); err != nil {
 			slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
@@ -44,6 +46,7 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
 			slog.Info("Created a batch of feeds",
 				slog.Int("nb_jobs", len(jobs)),
 			)
+			slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
 			pool.Push(jobs)
 		}
 	}


@@ -2104,3 +2104,36 @@ func TestInvalidHTTPClientProxy(t *testing.T) {
 		t.Fatalf(`Expected error for invalid HTTP_CLIENT_PROXY value, but got none`)
 	}
 }
+
+func TestDefaultPollingLimitPerHost(t *testing.T) {
+	os.Clearenv()
+
+	parser := NewParser()
+	opts, err := parser.ParseEnvironmentVariables()
+	if err != nil {
+		t.Fatalf(`Parsing failure: %v`, err)
+	}
+
+	expected := 0
+	result := opts.PollingLimitPerHost()
+	if result != expected {
+		t.Fatalf(`Unexpected default PollingLimitPerHost value, got %v instead of %v`, result, expected)
+	}
+}
+
+func TestCustomPollingLimitPerHost(t *testing.T) {
+	os.Clearenv()
+	os.Setenv("POLLING_LIMIT_PER_HOST", "10")
+
+	parser := NewParser()
+	opts, err := parser.ParseEnvironmentVariables()
+	if err != nil {
+		t.Fatalf(`Parsing failure: %v`, err)
+	}
+
+	expected := 10
+	result := opts.PollingLimitPerHost()
+	if result != expected {
+		t.Fatalf(`Unexpected custom PollingLimitPerHost value, got %v instead of %v`, result, expected)
+	}
+}


@@ -130,16 +130,17 @@ type options struct {
 	cleanupArchiveUnreadDays int
 	cleanupArchiveBatchSize int
 	cleanupRemoveSessionsDays int
-	pollingFrequency int
 	forceRefreshInterval int
 	batchSize int
-	pollingScheduler string
 	schedulerEntryFrequencyMinInterval int
 	schedulerEntryFrequencyMaxInterval int
 	schedulerEntryFrequencyFactor int
 	schedulerRoundRobinMinInterval int
 	schedulerRoundRobinMaxInterval int
+	pollingFrequency int
+	pollingLimitPerHost int
 	pollingParsingErrorLimit int
+	pollingScheduler string
 	workerPoolSize int
 	createAdmin bool
 	adminUsername string
@@ -390,11 +391,6 @@ func (o *options) WorkerPoolSize() int {
 	return o.workerPoolSize
 }
 
-// PollingFrequency returns the interval to refresh feeds in the background.
-func (o *options) PollingFrequency() int {
-	return o.pollingFrequency
-}
-
 // ForceRefreshInterval returns the force refresh interval
 func (o *options) ForceRefreshInterval() int {
 	return o.forceRefreshInterval
@@ -405,6 +401,22 @@ func (o *options) BatchSize() int {
 	return o.batchSize
 }
 
+// PollingFrequency returns the interval to refresh feeds in the background.
+func (o *options) PollingFrequency() int {
+	return o.pollingFrequency
+}
+
+// PollingLimitPerHost returns the limit of concurrent requests per host.
+// Set to zero to disable.
+func (o *options) PollingLimitPerHost() int {
+	return o.pollingLimitPerHost
+}
+
+// PollingParsingErrorLimit returns the limit of errors when to stop polling.
+func (o *options) PollingParsingErrorLimit() int {
+	return o.pollingParsingErrorLimit
+}
+
 // PollingScheduler returns the scheduler used for polling feeds.
 func (o *options) PollingScheduler() string {
 	return o.pollingScheduler
@@ -433,11 +445,6 @@ func (o *options) SchedulerRoundRobinMaxInterval() int {
 	return o.schedulerRoundRobinMaxInterval
 }
 
-// PollingParsingErrorLimit returns the limit of errors when to stop polling.
-func (o *options) PollingParsingErrorLimit() int {
-	return o.pollingParsingErrorLimit
-}
-
 // IsOAuth2UserCreationAllowed returns true if user creation is allowed for OAuth2 users.
 func (o *options) IsOAuth2UserCreationAllowed() bool {
 	return o.oauth2UserCreationAllowed
@@ -762,8 +769,9 @@ func (o *options) SortedOptions(redactSecret bool) []*option {
 		"OAUTH2_REDIRECT_URL": o.oauth2RedirectURL,
 		"OAUTH2_USER_CREATION": o.oauth2UserCreationAllowed,
 		"DISABLE_LOCAL_AUTH": o.disableLocalAuth,
-		"POLLING_FREQUENCY": o.pollingFrequency,
 		"FORCE_REFRESH_INTERVAL": o.forceRefreshInterval,
+		"POLLING_FREQUENCY": o.pollingFrequency,
+		"POLLING_LIMIT_PER_HOST": o.pollingLimitPerHost,
 		"POLLING_PARSING_ERROR_LIMIT": o.pollingParsingErrorLimit,
 		"POLLING_SCHEDULER": o.pollingScheduler,
 		"MEDIA_PROXY_HTTP_CLIENT_TIMEOUT": o.mediaProxyHTTPClientTimeout,


@@ -137,12 +137,16 @@ func (p *parser) parseLines(lines []string) (err error) {
 			p.opts.cleanupRemoveSessionsDays = parseInt(value, defaultCleanupRemoveSessionsDays)
 		case "WORKER_POOL_SIZE":
 			p.opts.workerPoolSize = parseInt(value, defaultWorkerPoolSize)
-		case "POLLING_FREQUENCY":
-			p.opts.pollingFrequency = parseInt(value, defaultPollingFrequency)
 		case "FORCE_REFRESH_INTERVAL":
 			p.opts.forceRefreshInterval = parseInt(value, defaultForceRefreshInterval)
 		case "BATCH_SIZE":
 			p.opts.batchSize = parseInt(value, defaultBatchSize)
+		case "POLLING_FREQUENCY":
+			p.opts.pollingFrequency = parseInt(value, defaultPollingFrequency)
+		case "POLLING_LIMIT_PER_HOST":
+			p.opts.pollingLimitPerHost = parseInt(value, 0)
+		case "POLLING_PARSING_ERROR_LIMIT":
+			p.opts.pollingParsingErrorLimit = parseInt(value, defaultPollingParsingErrorLimit)
 		case "POLLING_SCHEDULER":
 			p.opts.pollingScheduler = strings.ToLower(parseString(value, defaultPollingScheduler))
 		case "SCHEDULER_ENTRY_FREQUENCY_MAX_INTERVAL":
@@ -155,8 +159,6 @@ func (p *parser) parseLines(lines []string) (err error) {
 			p.opts.schedulerRoundRobinMinInterval = parseInt(value, defaultSchedulerRoundRobinMinInterval)
 		case "SCHEDULER_ROUND_ROBIN_MAX_INTERVAL":
 			p.opts.schedulerRoundRobinMaxInterval = parseInt(value, defaultSchedulerRoundRobinMaxInterval)
-		case "POLLING_PARSING_ERROR_LIMIT":
-			p.opts.pollingParsingErrorLimit = parseInt(value, defaultPollingParsingErrorLimit)
 		case "MEDIA_PROXY_HTTP_CLIENT_TIMEOUT":
 			p.opts.mediaProxyHTTPClientTimeout = parseInt(value, defaultMediaProxyHTTPClientTimeout)
 		case "MEDIA_PROXY_MODE":


@@ -5,9 +5,20 @@ package model // import "miniflux.app/v2/internal/model"
 
 // Job represents a payload sent to the processing queue.
 type Job struct {
-	UserID int64
-	FeedID int64
+	UserID  int64
+	FeedID  int64
+	FeedURL string
 }
 
 // JobList represents a list of jobs.
 type JobList []Job
+
+// FeedURLs returns a list of feed URLs from the job list.
+// This is useful for logging or debugging purposes to see which feeds are being processed.
+func (jl *JobList) FeedURLs() []string {
+	feedURLs := make([]string, len(*jl))
+	for i, job := range *jl {
+		feedURLs[i] = job.FeedURL
+	}
+	return feedURLs
+}
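
As a usage note, the new FeedURLs helper exists so call sites can log a whole batch in one debug statement, as the cli and scheduler changes above do. A small sketch, assuming it lives inside the Miniflux module (model is an internal package and cannot be imported from outside it):

package sketch

import (
	"log/slog"

	"miniflux.app/v2/internal/model"
)

func logBatch(jobs model.JobList) {
	// FeedURLs flattens the batch into a []string, which slog.Any renders
	// as a single structured attribute instead of one log line per feed.
	slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
}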


@@ -6,16 +6,19 @@ package storage // import "miniflux.app/v2/internal/storage"
 
 import (
 	"database/sql"
 	"fmt"
+	"log/slog"
 	"strings"
 
 	"miniflux.app/v2/internal/model"
+	"miniflux.app/v2/internal/urllib"
 )
 
 type BatchBuilder struct {
-	db         *sql.DB
-	args       []any
-	conditions []string
-	limit      int
+	db           *sql.DB
+	args         []any
+	conditions   []string
+	limit        int
+	limitPerHost int
 }
 
 func (s *Storage) NewBatchBuilder() *BatchBuilder {
@@ -59,15 +62,27 @@ func (b *BatchBuilder) WithoutDisabledFeeds() *BatchBuilder {
 	return b
 }
 
+func (b *BatchBuilder) WithLimitPerHost(limit int) *BatchBuilder {
+	if limit > 0 {
+		b.limitPerHost = limit
+	}
+	return b
+}
+
+// FetchJobs retrieves a batch of jobs based on the conditions set in the builder.
+// It ensures that each job is unique by feed URL to avoid making too many concurrent requests to the same website.
+// When limitPerHost is set, it limits the number of jobs per feed hostname to prevent overwhelming a single host.
 func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
-	query := `SELECT id, user_id FROM feeds`
+	query := `SELECT DISTINCT ON (feed_url) id, user_id, feed_url FROM feeds`
 
 	if len(b.conditions) > 0 {
 		query += " WHERE " + strings.Join(b.conditions, " AND ")
 	}
 
+	query += " ORDER BY feed_url, next_check_at ASC"
+
 	if b.limit > 0 {
-		query += fmt.Sprintf(" ORDER BY next_check_at ASC LIMIT %d", b.limit)
+		query += fmt.Sprintf(" LIMIT %d", b.limit)
 	}
 
 	rows, err := b.db.Query(query, b.args...)
@@ -77,15 +92,34 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 	defer rows.Close()
 
 	jobs := make(model.JobList, 0, b.limit)
+	hosts := make(map[string]int)
+
 	for rows.Next() {
 		var job model.Job
-		if err := rows.Scan(&job.FeedID, &job.UserID); err != nil {
-			return nil, fmt.Errorf(`store: unable to fetch job: %v`, err)
+		if err := rows.Scan(&job.FeedID, &job.UserID, &job.FeedURL); err != nil {
+			return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
 		}
+
+		if b.limitPerHost > 0 {
+			feedHostname := urllib.Domain(job.FeedURL)
+			if hosts[feedHostname] >= b.limitPerHost {
+				slog.Debug("Feed host limit reached for this batch",
+					slog.String("feed_url", job.FeedURL),
+					slog.String("feed_hostname", feedHostname),
+					slog.Int("limit_per_host", b.limitPerHost),
+					slog.Int("current", hosts[feedHostname]),
+				)
+				continue
+			}
+			hosts[feedHostname]++
+		}
+
 		jobs = append(jobs, job)
 	}
 
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
+	}
+
 	return jobs, nil
 }
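
Taken together, a typical call site after this commit looks like the sketch below, modeled on the handler and scheduler hunks in this diff. The function name refreshBatch is illustrative, and the imports assume code living inside the Miniflux module:

package sketch

import (
	"log/slog"

	"miniflux.app/v2/internal/config"
	"miniflux.app/v2/internal/storage"
	"miniflux.app/v2/internal/worker"
)

func refreshBatch(store *storage.Storage, pool *worker.Pool, userID int64) {
	batchBuilder := store.NewBatchBuilder()
	batchBuilder.WithErrorLimit(config.Opts.PollingParsingErrorLimit())
	batchBuilder.WithoutDisabledFeeds()
	batchBuilder.WithNextCheckExpired()
	batchBuilder.WithUserID(userID)
	// New in this commit: cap the number of jobs per hostname in the batch
	// (POLLING_LIMIT_PER_HOST; zero keeps the cap disabled).
	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())

	jobs, err := batchBuilder.FetchJobs()
	if err != nil {
		slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
		return
	}

	slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
	pool.Push(jobs)
}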


@@ -43,6 +43,7 @@ func (h *handler) refreshCategory(w http.ResponseWriter, r *http.Request) int64
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithUserID(userID)
 	batchBuilder.WithCategoryID(categoryID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {


@@ -47,6 +47,7 @@ func (h *handler) refreshAllFeeds(w http.ResponseWriter, r *http.Request) {
 	batchBuilder := h.store.NewBatchBuilder()
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithUserID(userID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {


@@ -32,6 +32,7 @@ func (w *worker) Run(c <-chan model.Job) {
 			slog.Int("worker_id", w.id),
 			slog.Int64("user_id", job.UserID),
 			slog.Int64("feed_id", job.FeedID),
+			slog.String("feed_url", job.FeedURL),
 		)
 
 		startTime := time.Now()