mirror of
https://github.com/miniflux/v2.git
synced 2025-09-15 18:57:04 +00:00
feat(storage): improve BatchBuilder
logging
This commit is contained in:
parent
a2229198ae
commit
598d4d4f51
3 changed files with 18 additions and 15 deletions
|
@ -33,15 +33,9 @@ func refreshFeeds(store *storage.Storage) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nbJobs := len(jobs)
|
|
||||||
|
|
||||||
slog.Info("Created a batch of feeds",
|
|
||||||
slog.Int("nb_jobs", nbJobs),
|
|
||||||
slog.Int("batch_size", config.Opts.BatchSize()),
|
|
||||||
)
|
|
||||||
|
|
||||||
slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
|
slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
|
||||||
|
|
||||||
|
nbJobs := len(jobs)
|
||||||
var jobQueue = make(chan model.Job, nbJobs)
|
var jobQueue = make(chan model.Job, nbJobs)
|
||||||
|
|
||||||
slog.Info("Starting a pool of workers",
|
slog.Info("Starting a pool of workers",
|
||||||
|
|
|
@ -43,9 +43,6 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
|
||||||
if jobs, err := batchBuilder.FetchJobs(); err != nil {
|
if jobs, err := batchBuilder.FetchJobs(); err != nil {
|
||||||
slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
|
slog.Error("Unable to fetch jobs from database", slog.Any("error", err))
|
||||||
} else if len(jobs) > 0 {
|
} else if len(jobs) > 0 {
|
||||||
slog.Info("Created a batch of feeds",
|
|
||||||
slog.Int("nb_jobs", len(jobs)),
|
|
||||||
)
|
|
||||||
slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
|
slog.Debug("Feed URLs in this batch", slog.Any("feed_urls", jobs.FeedURLs()))
|
||||||
pool.Push(jobs)
|
pool.Push(jobs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ type BatchBuilder struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
args []any
|
args []any
|
||||||
conditions []string
|
conditions []string
|
||||||
limit int
|
batchSize int
|
||||||
limitPerHost int
|
limitPerHost int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ func (s *Storage) NewBatchBuilder() *BatchBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BatchBuilder) WithBatchSize(batchSize int) *BatchBuilder {
|
func (b *BatchBuilder) WithBatchSize(batchSize int) *BatchBuilder {
|
||||||
b.limit = batchSize
|
b.batchSize = batchSize
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,8 +81,8 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
|
||||||
|
|
||||||
query += " ORDER BY feed_url, next_check_at ASC"
|
query += " ORDER BY feed_url, next_check_at ASC"
|
||||||
|
|
||||||
if b.limit > 0 {
|
if b.batchSize > 0 {
|
||||||
query += fmt.Sprintf(" LIMIT %d", b.limit)
|
query += fmt.Sprintf(" LIMIT %d", b.batchSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := b.db.Query(query, b.args...)
|
rows, err := b.db.Query(query, b.args...)
|
||||||
|
@ -91,8 +91,10 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
jobs := make(model.JobList, 0, b.limit)
|
jobs := make(model.JobList, 0, b.batchSize)
|
||||||
hosts := make(map[string]int)
|
hosts := make(map[string]int)
|
||||||
|
nbRows := 0
|
||||||
|
nbSkippedFeeds := 0
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var job model.Job
|
var job model.Job
|
||||||
|
@ -100,6 +102,8 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
|
||||||
return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
|
return nil, fmt.Errorf(`store: unable to fetch job record: %v`, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nbRows++
|
||||||
|
|
||||||
if b.limitPerHost > 0 {
|
if b.limitPerHost > 0 {
|
||||||
feedHostname := urllib.Domain(job.FeedURL)
|
feedHostname := urllib.Domain(job.FeedURL)
|
||||||
if hosts[feedHostname] >= b.limitPerHost {
|
if hosts[feedHostname] >= b.limitPerHost {
|
||||||
|
@ -109,6 +113,7 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
|
||||||
slog.Int("limit_per_host", b.limitPerHost),
|
slog.Int("limit_per_host", b.limitPerHost),
|
||||||
slog.Int("current", hosts[feedHostname]),
|
slog.Int("current", hosts[feedHostname]),
|
||||||
)
|
)
|
||||||
|
nbSkippedFeeds++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
hosts[feedHostname]++
|
hosts[feedHostname]++
|
||||||
|
@ -121,5 +126,12 @@ func (b *BatchBuilder) FetchJobs() (model.JobList, error) {
|
||||||
return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
|
return nil, fmt.Errorf(`store: error iterating on job records: %v`, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.Info("Created a batch of feeds",
|
||||||
|
slog.Int("batch_size", b.batchSize),
|
||||||
|
slog.Int("rows_count", nbRows),
|
||||||
|
slog.Int("skipped_feeds_count", nbSkippedFeeds),
|
||||||
|
slog.Int("jobs_count", len(jobs)),
|
||||||
|
)
|
||||||
|
|
||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue