diff --git a/internal/api/category.go b/internal/api/category.go
index 3ae2cdc4..ae543ab7 100644
--- a/internal/api/category.go
+++ b/internal/api/category.go
@@ -143,6 +143,7 @@ func (h *handler) refreshCategory(w http.ResponseWriter, r *http.Request) {
 	batchBuilder.WithUserID(userID)
 	batchBuilder.WithCategoryID(categoryID)
 	batchBuilder.WithNextCheckExpired()
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
diff --git a/internal/api/feed.go b/internal/api/feed.go
index 26ce4d50..89b7d249 100644
--- a/internal/api/feed.go
+++ b/internal/api/feed.go
@@ -76,6 +76,7 @@ func (h *handler) refreshAllFeeds(w http.ResponseWriter, r *http.Request) {
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithNextCheckExpired()
 	batchBuilder.WithUserID(userID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
diff --git a/internal/cli/refresh_feeds.go b/internal/cli/refresh_feeds.go
index fa7e3a0e..a0834232 100644
--- a/internal/cli/refresh_feeds.go
+++ b/internal/cli/refresh_feeds.go
@@ -25,6 +25,7 @@ func refreshFeeds(store *storage.Storage) {
 	batchBuilder.WithErrorLimit(config.Opts.PollingParsingErrorLimit())
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithNextCheckExpired()
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
@@ -39,6 +40,8 @@ func refreshFeeds(store *storage.Storage) {
 		slog.Int("batch_size", config.Opts.BatchSize()),
 	)
 
+	slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
+
 	var jobQueue = make(chan model.Job, nbJobs)
 
 	slog.Info("Starting a pool of workers",
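Note on the call sites: every refresh entry point — the API handlers and CLI refresh above, and the scheduler and UI handlers below — now threads config.Opts.PollingLimitPerHost() into the batch builder. A sketch of the shared pattern as a hypothetical helper (this function does not exist in the diff; its name and placement are illustrative only):

    package ui // hypothetical placement

    import (
    	"miniflux.app/v2/internal/config"
    	"miniflux.app/v2/internal/storage"
    )

    // newRefreshBatchBuilder repeats the setup used by each call site: the
    // per-host cap from POLLING_LIMIT_PER_HOST is applied to every batch, so
    // API-, UI-, CLI-, and scheduler-triggered refreshes behave consistently.
    func newRefreshBatchBuilder(store *storage.Storage, userID int64) *storage.BatchBuilder {
    	batchBuilder := store.NewBatchBuilder()
    	batchBuilder.WithoutDisabledFeeds()
    	batchBuilder.WithNextCheckExpired()
    	batchBuilder.WithUserID(userID)
    	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost()) // no-op when the option is 0
    	return batchBuilder
    }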
diff --git a/internal/cli/scheduler.go b/internal/cli/scheduler.go
index de838e16..ee2b2d00 100644
--- a/internal/cli/scheduler.go
+++ b/internal/cli/scheduler.go
@@ -21,6 +21,7 @@ func runScheduler(store *storage.Storage, pool *worker.Pool) {
 		config.Opts.PollingFrequency(),
 		config.Opts.BatchSize(),
 		config.Opts.PollingParsingErrorLimit(),
+		config.Opts.PollingLimitPerHost(),
 	)
 
 	go cleanupScheduler(
 		store,
 		config.Opts.CleanupFrequencyHours(),
 	)
 }
 
@@ -29,7 +30,7 @@
-func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize, errorLimit int) {
+func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize, errorLimit, limitPerHost int) {
 	for range time.Tick(time.Duration(frequency) * time.Minute) {
 		// Generate a batch of feeds for any user that has feeds to refresh.
 		batchBuilder := store.NewBatchBuilder()
@@ -37,6 +38,7 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
 		batchBuilder.WithErrorLimit(errorLimit)
 		batchBuilder.WithoutDisabledFeeds()
 		batchBuilder.WithNextCheckExpired()
+		batchBuilder.WithLimitPerHost(limitPerHost)
 
 		if jobs, err := batchBuilder.FetchJobs(); err != nil {
 			slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
@@ -44,6 +46,7 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
 			slog.Info("Created a batch of feeds",
 				slog.Int("nb_jobs", len(jobs)),
 			)
+			slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
 			pool.Push(jobs)
 		}
 	}
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
index 890359f0..a3e55a71 100644
--- a/internal/config/config_test.go
+++ b/internal/config/config_test.go
@@ -2104,3 +2104,36 @@ func TestInvalidHTTPClientProxy(t *testing.T) {
 		t.Fatalf(`Expected error for invalid HTTP_CLIENT_PROXY value, but got none`)
 	}
 }
+
+func TestDefaultPollingLimitPerHost(t *testing.T) {
+	os.Clearenv()
+
+	parser := NewParser()
+	opts, err := parser.ParseEnvironmentVariables()
+	if err != nil {
+		t.Fatalf(`Parsing failure: %v`, err)
+	}
+
+	expected := 0
+	result := opts.PollingLimitPerHost()
+	if result != expected {
+		t.Fatalf(`Unexpected default PollingLimitPerHost value, got %v instead of %v`, result, expected)
+	}
+}
+
+func TestCustomPollingLimitPerHost(t *testing.T) {
+	os.Clearenv()
+	os.Setenv("POLLING_LIMIT_PER_HOST", "10")
+
+	parser := NewParser()
+	opts, err := parser.ParseEnvironmentVariables()
+	if err != nil {
+		t.Fatalf(`Parsing failure: %v`, err)
+	}
+
+	expected := 10
+	result := opts.PollingLimitPerHost()
+	if result != expected {
+		t.Fatalf(`Unexpected custom PollingLimitPerHost value, got %v instead of %v`, result, expected)
+	}
+}
diff --git a/internal/config/options.go b/internal/config/options.go
index 4563dc1c..b57f3128 100644
--- a/internal/config/options.go
+++ b/internal/config/options.go
@@ -130,16 +130,17 @@ type options struct {
 	cleanupArchiveUnreadDays           int
 	cleanupArchiveBatchSize            int
 	cleanupRemoveSessionsDays          int
-	pollingFrequency                   int
 	forceRefreshInterval               int
 	batchSize                          int
-	pollingScheduler                   string
 	schedulerEntryFrequencyMinInterval int
 	schedulerEntryFrequencyMaxInterval int
 	schedulerEntryFrequencyFactor      int
 	schedulerRoundRobinMinInterval     int
 	schedulerRoundRobinMaxInterval     int
+	pollingFrequency                   int
+	pollingLimitPerHost                int
 	pollingParsingErrorLimit           int
+	pollingScheduler                   string
 	workerPoolSize                     int
 	createAdmin                        bool
 	adminUsername                      string
@@ -390,11 +391,6 @@ func (o *options) WorkerPoolSize() int {
 	return o.workerPoolSize
 }
 
-// PollingFrequency returns the interval to refresh feeds in the background.
-func (o *options) PollingFrequency() int {
-	return o.pollingFrequency
-}
-
 // ForceRefreshInterval returns the force refresh interval
 func (o *options) ForceRefreshInterval() int {
 	return o.forceRefreshInterval
@@ -405,6 +401,22 @@ func (o *options) BatchSize() int {
 	return o.batchSize
 }
 
+// PollingFrequency returns the interval to refresh feeds in the background.
+func (o *options) PollingFrequency() int {
+	return o.pollingFrequency
+}
+
+// PollingLimitPerHost returns the limit of concurrent requests per host.
+// Set to zero to disable.
+func (o *options) PollingLimitPerHost() int {
+	return o.pollingLimitPerHost
+}
+
+// PollingParsingErrorLimit returns the limit of errors when to stop polling.
+func (o *options) PollingParsingErrorLimit() int {
+	return o.pollingParsingErrorLimit
+}
+
 // PollingScheduler returns the scheduler used for polling feeds.
 func (o *options) PollingScheduler() string {
 	return o.pollingScheduler
 }
@@ -433,11 +445,6 @@ func (o *options) SchedulerRoundRobinMaxInterval() int {
 	return o.schedulerRoundRobinMaxInterval
 }
 
-// PollingParsingErrorLimit returns the limit of errors when to stop polling.
-func (o *options) PollingParsingErrorLimit() int {
-	return o.pollingParsingErrorLimit
-}
-
 // IsOAuth2UserCreationAllowed returns true if user creation is allowed for OAuth2 users.
 func (o *options) IsOAuth2UserCreationAllowed() bool {
 	return o.oauth2UserCreationAllowed
 }
@@ -762,8 +769,9 @@ func (o *options) SortedOptions(redactSecret bool) []*option {
 		"OAUTH2_REDIRECT_URL":             o.oauth2RedirectURL,
 		"OAUTH2_USER_CREATION":            o.oauth2UserCreationAllowed,
 		"DISABLE_LOCAL_AUTH":              o.disableLocalAuth,
-		"POLLING_FREQUENCY":               o.pollingFrequency,
 		"FORCE_REFRESH_INTERVAL":          o.forceRefreshInterval,
+		"POLLING_FREQUENCY":               o.pollingFrequency,
+		"POLLING_LIMIT_PER_HOST":          o.pollingLimitPerHost,
 		"POLLING_PARSING_ERROR_LIMIT":     o.pollingParsingErrorLimit,
 		"POLLING_SCHEDULER":               o.pollingScheduler,
 		"MEDIA_PROXY_HTTP_CLIENT_TIMEOUT": o.mediaProxyHTTPClientTimeout,
diff --git a/internal/config/parser.go b/internal/config/parser.go
index 14b01dae..a1afd65f 100644
--- a/internal/config/parser.go
+++ b/internal/config/parser.go
@@ -137,12 +137,16 @@ func (p *parser) parseLines(lines []string) (err error) {
 			p.opts.cleanupRemoveSessionsDays = parseInt(value, defaultCleanupRemoveSessionsDays)
 		case "WORKER_POOL_SIZE":
 			p.opts.workerPoolSize = parseInt(value, defaultWorkerPoolSize)
-		case "POLLING_FREQUENCY":
-			p.opts.pollingFrequency = parseInt(value, defaultPollingFrequency)
 		case "FORCE_REFRESH_INTERVAL":
 			p.opts.forceRefreshInterval = parseInt(value, defaultForceRefreshInterval)
 		case "BATCH_SIZE":
 			p.opts.batchSize = parseInt(value, defaultBatchSize)
+		case "POLLING_FREQUENCY":
+			p.opts.pollingFrequency = parseInt(value, defaultPollingFrequency)
+		case "POLLING_LIMIT_PER_HOST":
+			p.opts.pollingLimitPerHost = parseInt(value, 0)
+		case "POLLING_PARSING_ERROR_LIMIT":
+			p.opts.pollingParsingErrorLimit = parseInt(value, defaultPollingParsingErrorLimit)
 		case "POLLING_SCHEDULER":
 			p.opts.pollingScheduler = strings.ToLower(parseString(value, defaultPollingScheduler))
 		case "SCHEDULER_ENTRY_FREQUENCY_MAX_INTERVAL":
@@ -155,8 +159,6 @@ func (p *parser) parseLines(lines []string) (err error) {
 			p.opts.schedulerRoundRobinMinInterval = parseInt(value, defaultSchedulerRoundRobinMinInterval)
 		case "SCHEDULER_ROUND_ROBIN_MAX_INTERVAL":
 			p.opts.schedulerRoundRobinMaxInterval = parseInt(value, defaultSchedulerRoundRobinMaxInterval)
-		case "POLLING_PARSING_ERROR_LIMIT":
-			p.opts.pollingParsingErrorLimit = parseInt(value, defaultPollingParsingErrorLimit)
 		case "MEDIA_PROXY_HTTP_CLIENT_TIMEOUT":
 			p.opts.mediaProxyHTTPClientTimeout = parseInt(value, defaultMediaProxyHTTPClientTimeout)
 		case "MEDIA_PROXY_MODE":
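One detail in the parser change above: POLLING_LIMIT_PER_HOST is parsed with a literal fallback of 0 rather than a named default constant, and 0 leaves the cap disabled. A standalone sketch of the assumed parseInt semantics (parseInt is miniflux's unexported helper; this equivalent is illustrative, not the actual implementation):

    package main

    import (
    	"fmt"
    	"os"
    	"strconv"
    )

    // parseIntOr mirrors the assumed behavior of parseInt(value, fallback):
    // empty or non-numeric input yields the fallback value.
    func parseIntOr(value string, fallback int) int {
    	if value == "" {
    		return fallback
    	}
    	n, err := strconv.Atoi(value)
    	if err != nil {
    		return fallback
    	}
    	return n
    }

    func main() {
    	os.Setenv("POLLING_LIMIT_PER_HOST", "10")
    	fmt.Println(parseIntOr(os.Getenv("POLLING_LIMIT_PER_HOST"), 0)) // 10
    	fmt.Println(parseIntOr(os.Getenv("POLLING_LIMIT_UNSET"), 0))    // 0 (cap disabled)
    }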
diff --git a/internal/model/job.go b/internal/model/job.go
index 39bcc222..d7a82d75 100644
--- a/internal/model/job.go
+++ b/internal/model/job.go
@@ -5,9 +5,20 @@ package model // import "miniflux.app/v2/internal/model"
 
 // Job represents a payload sent to the processing queue.
 type Job struct {
-	UserID int64
-	FeedID int64
+	UserID  int64
+	FeedID  int64
+	FeedURL string
 }
 
 // JobList represents a list of jobs.
 type JobList []Job
+
+// FeedURLs returns a list of feed URLs from the job list.
+// This is useful for logging or debugging purposes to see which feeds are being processed.
+func (jl *JobList) FeedURLs() []string {
+	feedURLs := make([]string, len(*jl))
+	for i, job := range *jl {
+		feedURLs[i] = job.FeedURL
+	}
+	return feedURLs
+}
diff --git a/internal/storage/batch.go b/internal/storage/batch.go
index be7e43c0..56f1d771 100644
--- a/internal/storage/batch.go
+++ b/internal/storage/batch.go
@@ -6,16 +6,19 @@ package storage // import "miniflux.app/v2/internal/storage"
 import (
 	"database/sql"
 	"fmt"
+	"log/slog"
 	"strings"
 
 	"miniflux.app/v2/internal/model"
+	"miniflux.app/v2/internal/urllib"
 )
 
 type BatchBuilder struct {
-	db         *sql.DB
-	args       []any
-	conditions []string
-	limit      int
+	db           *sql.DB
+	args         []any
+	conditions   []string
+	limit        int
+	limitPerHost int
 }
 
 func (s *Storage) NewBatchBuilder() *BatchBuilder {
@@ -59,15 +62,27 @@ func (b *BatchBuilder) WithoutDisabledFeeds() *BatchBuilder {
 	return b
 }
 
+func (b *BatchBuilder) WithLimitPerHost(limit int) *BatchBuilder {
+	if limit > 0 {
+		b.limitPerHost = limit
+	}
+	return b
+}
+
+// FetchJobs retrieves a batch of jobs based on the conditions set in the builder.
+// It ensures that each job is unique by feed URL to avoid making too many concurrent requests to the same website.
+// When limitPerHost is set, it limits the number of jobs per feed hostname to prevent overwhelming a single host.
 func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
-	query := `SELECT id, user_id FROM feeds`
+	query := `SELECT DISTINCT ON (feed_url) id, user_id, feed_url FROM feeds`
 
 	if len(b.conditions) > 0 {
 		query += " WHERE " + strings.Join(b.conditions, " AND ")
 	}
 
+	query += " ORDER BY feed_url, next_check_at ASC"
+
 	if b.limit > 0 {
-		query += fmt.Sprintf(" ORDER BY next_check_at ASC LIMIT %d", b.limit)
+		query += fmt.Sprintf(" LIMIT %d", b.limit)
 	}
 
 	rows, err := b.db.Query(query, b.args...)
@@ -77,15 +92,34 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 	defer rows.Close()
 
 	jobs := make(model.JobList, 0, b.limit)
+	hosts := make(map[string]int)
 
 	for rows.Next() {
 		var job model.Job
-		if err := rows.Scan(&job.FeedID, &job.UserID); err != nil {
-			return nil, fmt.Errorf(`store: unable to fetch job: %v`, err)
+		if err := rows.Scan(&job.FeedID, &job.UserID, &job.FeedURL); err != nil {
+			return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
+		}
+
+		if b.limitPerHost > 0 {
+			feedHostname := urllib.Domain(job.FeedURL)
+			if hosts[feedHostname] >= b.limitPerHost {
+				slog.Debug("Feed host limit reached for this batch",
+					slog.String("feed_url", job.FeedURL),
+					slog.String("feed_hostname", feedHostname),
+					slog.Int("limit_per_host", b.limitPerHost),
+					slog.Int("current", hosts[feedHostname]),
+				)
+				continue
+			}
+			hosts[feedHostname]++
 		}
 
 		jobs = append(jobs, job)
 	}
 
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
+	}
+
 	return jobs, nil
 }
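Two mechanisms combine in FetchJobs above. First, PostgreSQL's SELECT DISTINCT ON (feed_url), paired with ORDER BY feed_url, next_check_at ASC, keeps exactly one row per feed_url — the one with the earliest next_check_at — so duplicate subscriptions to the same feed collapse into a single job; note that the batch is therefore ordered by feed_url rather than by next_check_at alone. Second, the scan loop counts jobs per hostname and drops rows once a host reaches the cap. A self-contained sketch of that second step (illustrative: the real code uses urllib.Domain and applies the cap while scanning database rows):

    package main

    import (
    	"fmt"
    	"net/url"
    )

    // capPerHost keeps at most limit entries per hostname, preserving order,
    // mirroring the counting logic in FetchJobs.
    func capPerHost(feedURLs []string, limit int) []string {
    	if limit <= 0 {
    		return feedURLs // cap disabled, matching WithLimitPerHost
    	}
    	hosts := make(map[string]int)
    	kept := make([]string, 0, len(feedURLs))
    	for _, rawURL := range feedURLs {
    		u, err := url.Parse(rawURL)
    		if err != nil {
    			continue // skip unparsable URLs in this sketch
    		}
    		if hosts[u.Hostname()] >= limit {
    			continue // this host already filled its quota for the batch
    		}
    		hosts[u.Hostname()]++
    		kept = append(kept, rawURL)
    	}
    	return kept
    }

    func main() {
    	batch := []string{
    		"https://example.org/a.xml",
    		"https://example.org/b.xml",
    		"https://example.org/c.xml",
    		"https://example.com/feed.xml",
    	}
    	// With a limit of 2, the third example.org feed is dropped:
    	fmt.Println(capPerHost(batch, 2))
    }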
diff --git a/internal/ui/category_refresh.go b/internal/ui/category_refresh.go
index c0ca39b8..6eae71ab 100644
--- a/internal/ui/category_refresh.go
+++ b/internal/ui/category_refresh.go
@@ -43,6 +43,7 @@ func (h *handler) refreshCategory(w http.ResponseWriter, r *http.Request) int64
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithUserID(userID)
 	batchBuilder.WithCategoryID(categoryID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
diff --git a/internal/ui/feed_refresh.go b/internal/ui/feed_refresh.go
index 861f4f86..ca8f2c1a 100644
--- a/internal/ui/feed_refresh.go
+++ b/internal/ui/feed_refresh.go
@@ -47,6 +47,7 @@ func (h *handler) refreshAllFeeds(w http.ResponseWriter, r *http.Request) {
 	batchBuilder := h.store.NewBatchBuilder()
 	batchBuilder.WithoutDisabledFeeds()
 	batchBuilder.WithUserID(userID)
+	batchBuilder.WithLimitPerHost(config.Opts.PollingLimitPerHost())
 
 	jobs, err := batchBuilder.FetchJobs()
 	if err != nil {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index ad7f00dd..c6094316 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -32,6 +32,7 @@ func (w *worker) Run(c <-chan model.Job) {
 			slog.Int("worker_id", w.id),
 			slog.Int64("user_id", job.UserID),
 			slog.Int64("feed_id", job.FeedID),
+			slog.String("feed_url", job.FeedURL),
 		)
 
 		startTime := time.Now()
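A sizing consequence that follows from FetchJobs: the SQL LIMIT is applied before the per-host cap, and rows skipped by the cap are not backfilled. With BATCH_SIZE=100 and POLLING_LIMIT_PER_HOST=10, for example, if 40 of the 100 selected rows point at a single hostname, 30 of them are dropped and the batch holds 70 jobs rather than 100. Dropped feeds keep their expired next_check_at, so they stay eligible and are picked up by later batches (for instance, the next scheduler tick).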
diff --git a/miniflux.1 b/miniflux.1
index 13f7c5f3..3538128c 100644
--- a/miniflux.1
+++ b/miniflux.1
@@ -1,5 +1,5 @@
 .\" Manpage for miniflux.
-.TH "MINIFLUX" "1" "June 23, 2025" "\ \&" "\ \&"
+.TH "MINIFLUX" "1" "August 8, 2025" "\ \&" "\ \&"
 
 .SH NAME
 miniflux \- Minimalist and opinionated feed reader
@@ -490,19 +490,32 @@ Refresh interval in minutes for feeds\&.
 .br
 Default is 60 minutes\&.
 .TP
+.B POLLING_LIMIT_PER_HOST
+Limits the number of concurrent requests to the same hostname when polling feeds.
+.br
+This helps prevent overwhelming a single server during batch processing by the worker pool.
+.br
+Default is 0 (disabled)\&.
+.TP
 .B POLLING_PARSING_ERROR_LIMIT
-The maximum number of parsing errors that the program will try before stopping polling a feed. Once the limit is reached, the user must refresh the feed manually. Set to 0 for unlimited.
+The maximum number of parsing errors that the program will try before stopping polling a feed.
+.br
+Once the limit is reached, the user must refresh the feed manually. Set to 0 for unlimited.
 .br
 Default is 3\&.
 .TP
 .B POLLING_SCHEDULER
-Scheduler used for polling feeds. Possible values are "round_robin" or "entry_frequency"\&.
+Determines the strategy used to schedule feed polling.
 .br
-The maximum number of feeds polled for a given period is subject to POLLING_FREQUENCY and BATCH_SIZE\&.
+Supported values are "round_robin" and "entry_frequency".
 .br
-When "entry_frequency" is selected, the refresh interval for a given feed is equal to the average updating interval of the last week of the feed\&.
+- "round_robin": Feeds are polled in a fixed, rotating order.
 .br
-The actual number of feeds polled will not exceed the maximum number of feeds that could be polled for a given period\&.
+- "entry_frequency": The polling interval for each feed is based on the average update frequency over the past week.
+.br
+The number of feeds polled in a given period is limited by the POLLING_FREQUENCY and BATCH_SIZE settings.
+.br
+Regardless of the scheduler used, the total number of polled feeds will not exceed the maximum allowed per polling cycle.
 .br
 Default is "round_robin"\&.
 .TP
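For operators, enabling the cap is a single setting. The snippet below is illustrative (default polling values plus the new option, with debug logging switched on to surface the new "Feed URLs in this batch" messages); it is not a recommendation:

    # Illustrative miniflux configuration
    POLLING_FREQUENCY=60
    BATCH_SIZE=100
    POLLING_LIMIT_PER_HOST=10
    LOG_LEVEL=debug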