diff --git a/internal/cli/refresh_feeds.go b/internal/cli/refresh_feeds.go
index a0834232..0a594d96 100644
--- a/internal/cli/refresh_feeds.go
+++ b/internal/cli/refresh_feeds.go
@@ -33,15 +33,9 @@ func refreshFeeds(store *storage.Storage) {
 		return
 	}
 
-	nbJobs := len(jobs)
-
-	slog.Info("Created a batch of feeds",
-		slog.Int("nb_jobs", nbJobs),
-		slog.Int("batch_size", config.Opts.BatchSize()),
-	)
-	slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
+	nbJobs := len(jobs)
 
 	var jobQueue = make(chan model.Job, nbJobs)
 
 	slog.Info("Starting a pool of workers",
diff --git a/internal/cli/scheduler.go b/internal/cli/scheduler.go
index ee2b2d00..a0ab9ad5 100644
--- a/internal/cli/scheduler.go
+++ b/internal/cli/scheduler.go
@@ -43,9 +43,6 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
 		if jobs, err := batchBuilder.FetchJobs(); err != nil {
 			slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
 		} else if len(jobs) > 0 {
-			slog.Info("Created a batch of feeds",
-				slog.Int("nb_jobs", len(jobs)),
-			)
 			slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
 			pool.Push(jobs)
 		}
diff --git a/internal/storage/batch.go b/internal/storage/batch.go
index 56f1d771..2a720cb7 100644
--- a/internal/storage/batch.go
+++ b/internal/storage/batch.go
@@ -17,7 +17,7 @@ type BatchBuilder struct {
 	db           *sql.DB
 	args         []any
 	conditions   []string
-	limit        int
+	batchSize    int
 	limitPerHost int
 }
 
@@ -28,7 +28,7 @@ func (s *Storage) NewBatchBuilder() *BatchBuilder {
 }
 
 func (b *BatchBuilder) WithBatchSize(batchSize int) *BatchBuilder {
-	b.limit = batchSize
+	b.batchSize = batchSize
 	return b
 }
 
@@ -81,8 +81,8 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 	query += " ORDER BY feed_url, next_check_at ASC"
 
-	if b.limit > 0 {
-		query += fmt.Sprintf(" LIMIT %d", b.limit)
+	if b.batchSize > 0 {
+		query += fmt.Sprintf(" LIMIT %d", b.batchSize)
 	}
 
 	rows, err := b.db.Query(query, b.args...)
@@ -91,8 +91,10 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 	}
 	defer rows.Close()
 
-	jobs := make(model.JobList, 0, b.limit)
+	jobs := make(model.JobList, 0, b.batchSize)
 	hosts := make(map[string]int)
+	nbRows := 0
+	nbSkippedFeeds := 0
 
 	for rows.Next() {
 		var job model.Job
@@ -100,6 +102,8 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 			return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
 		}
 
+		nbRows++
+
 		if b.limitPerHost > 0 {
 			feedHostname := urllib.Domain(job.FeedURL)
 			if hosts[feedHostname] >= b.limitPerHost {
@@ -109,6 +113,7 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 					slog.Int("limit_per_host", b.limitPerHost),
 					slog.Int("current", hosts[feedHostname]),
 				)
+				nbSkippedFeeds++
 				continue
 			}
 			hosts[feedHostname]++
@@ -121,5 +126,12 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
 		return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
 	}
 
+	slog.Info("Created a batch of feeds",
+		slog.Int("batch_size", b.batchSize),
+		slog.Int("rows_count", nbRows),
+		slog.Int("skipped_feeds_count", nbSkippedFeeds),
+		slog.Int("jobs_count", len(jobs)),
+	)
+
	return jobs, nil
 }
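For reviewers skimming the hunks: the net effect is that the "Created a batch of feeds" log message moves out of the CLI callers (`refresh_feeds.go`, `scheduler.go`) and into `BatchBuilder.FetchJobs()`, which now also reports how many rows the query returned and how many feeds the per-host limit skipped. The snippet below is a minimal standalone sketch of that bookkeeping, not the Miniflux code itself; `job`, `buildBatch`, and the `net/url` parsing are stand-ins for `model.Job`, `FetchJobs()`, and `urllib.Domain()`.

```go
// Sketch of the counting logic this diff adds to FetchJobs, using an
// in-memory slice instead of database rows. Names are illustrative.
package main

import (
	"log/slog"
	"net/url"
)

// job stands in for model.Job; only the feed URL matters here.
type job struct {
	FeedURL string
}

// buildBatch counts every candidate row, skips feeds once a host has
// reached limitPerHost, and logs one consolidated summary at the end.
func buildBatch(rows []job, batchSize, limitPerHost int) []job {
	jobs := make([]job, 0, batchSize)
	hosts := make(map[string]int)
	nbRows := 0
	nbSkippedFeeds := 0

	for _, j := range rows {
		nbRows++

		if limitPerHost > 0 {
			u, err := url.Parse(j.FeedURL)
			if err != nil {
				continue
			}
			host := u.Hostname()
			if hosts[host] >= limitPerHost {
				nbSkippedFeeds++
				continue
			}
			hosts[host]++
		}

		jobs = append(jobs, j)
	}

	// One log line summarizes the whole batch, replacing the per-caller
	// "Created a batch of feeds" messages removed from the CLI layer.
	slog.Info("Created a batch of feeds",
		slog.Int("batch_size", batchSize),
		slog.Int("rows_count", nbRows),
		slog.Int("skipped_feeds_count", nbSkippedFeeds),
		slog.Int("jobs_count", len(jobs)),
	)

	return jobs
}

func main() {
	rows := []job{
		{FeedURL: "https://example.org/feed-1.xml"},
		{FeedURL: "https://example.org/feed-2.xml"},
		{FeedURL: "https://example.com/feed.xml"},
	}
	buildBatch(rows, 100, 1)
}
```

With the log emitted where the skipping happens, `jobs_count` can legitimately be smaller than `rows_count`, which the old caller-side message (logged before any per-host filtering was visible) could not express.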