1
0
Fork 0
mirror of https://github.com/miniflux/v2.git synced 2025-08-01 17:38:37 +00:00

Refactor Batch Builder and prevent accidental and excessive refreshes from the web ui

This commit is contained in:
Frédéric Guillot 2023-10-20 15:12:02 -07:00
parent 95ee1c423b
commit 4cc99881d8
32 changed files with 251 additions and 176 deletions

91
internal/storage/batch.go Normal file
View file

@ -0,0 +1,91 @@
// SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package storage // import "miniflux.app/v2/internal/storage"
import (
"database/sql"
"fmt"
"strings"
"miniflux.app/v2/internal/model"
)
type BatchBuilder struct {
db *sql.DB
args []any
conditions []string
limit int
}
func (s *Storage) NewBatchBuilder() *BatchBuilder {
return &BatchBuilder{
db: s.db,
}
}
func (b *BatchBuilder) WithBatchSize(batchSize int) *BatchBuilder {
b.limit = batchSize
return b
}
func (b *BatchBuilder) WithUserID(userID int64) *BatchBuilder {
b.conditions = append(b.conditions, fmt.Sprintf("user_id = $%d", len(b.args)+1))
b.args = append(b.args, userID)
return b
}
func (b *BatchBuilder) WithCategoryID(categoryID int64) *BatchBuilder {
b.conditions = append(b.conditions, fmt.Sprintf("category_id = $%d", len(b.args)+1))
b.args = append(b.args, categoryID)
return b
}
func (b *BatchBuilder) WithErrorLimit(limit int) *BatchBuilder {
if limit > 0 {
b.conditions = append(b.conditions, fmt.Sprintf("parsing_error_count < $%d", len(b.args)+1))
b.args = append(b.args, limit)
}
return b
}
func (b *BatchBuilder) WithNextCheckExpired() *BatchBuilder {
b.conditions = append(b.conditions, "next_check_at < now()")
return b
}
func (b *BatchBuilder) WithoutDisabledFeeds() *BatchBuilder {
b.conditions = append(b.conditions, "disabled is false")
return b
}
func (b *BatchBuilder) FetchJobs() (jobs model.JobList, err error) {
var parts []string
parts = append(parts, `SELECT id, user_id FROM feeds`)
if len(b.conditions) > 0 {
parts = append(parts, fmt.Sprintf("WHERE %s", strings.Join(b.conditions, " AND ")))
}
if b.limit > 0 {
parts = append(parts, fmt.Sprintf("ORDER BY next_check_at ASC LIMIT %d", b.limit))
}
query := strings.Join(parts, " ")
rows, err := b.db.Query(query, b.args...)
if err != nil {
return nil, fmt.Errorf(`store: unable to fetch batch of jobs: %v`, err)
}
defer rows.Close()
for rows.Next() {
var job model.Job
if err := rows.Scan(&job.FeedID, &job.UserID); err != nil {
return nil, fmt.Errorf(`store: unable to fetch job: %v`, err)
}
jobs = append(jobs, job)
}
return jobs, nil
}

View file

@ -87,17 +87,6 @@ func (s *Storage) CountAllFeeds() map[string]int64 {
return results
}
// CountFeeds returns the number of feeds that belongs to the given user.
func (s *Storage) CountFeeds(userID int64) int {
var result int
err := s.db.QueryRow(`SELECT count(*) FROM feeds WHERE user_id=$1`, userID).Scan(&result)
if err != nil {
return 0
}
return result
}
// CountUserFeedsWithErrors returns the number of feeds with parsing errors that belong to the given user.
func (s *Storage) CountUserFeedsWithErrors(userID int64) int {
pollingParsingErrorLimit := config.Opts.PollingParsingErrorLimit()

View file

@ -1,81 +0,0 @@
// SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package storage // import "miniflux.app/v2/internal/storage"
import (
"fmt"
"miniflux.app/v2/internal/config"
"miniflux.app/v2/internal/model"
)
// NewBatch returns a series of jobs.
func (s *Storage) NewBatch(batchSize int) (jobs model.JobList, err error) {
pollingParsingErrorLimit := config.Opts.PollingParsingErrorLimit()
query := `
SELECT
id,
user_id
FROM
feeds
WHERE
disabled is false AND next_check_at < now() AND
CASE WHEN $1 > 0 THEN parsing_error_count < $1 ELSE parsing_error_count >= 0 END
ORDER BY next_check_at ASC LIMIT $2
`
return s.fetchBatchRows(query, pollingParsingErrorLimit, batchSize)
}
// NewUserBatch returns a series of jobs but only for a given user.
func (s *Storage) NewUserBatch(userID int64, batchSize int) (jobs model.JobList, err error) {
// We do not take the error counter into consideration when the given
// user refresh manually all his feeds to force a refresh.
query := `
SELECT
id,
user_id
FROM
feeds
WHERE
user_id=$1 AND disabled is false AND next_check_at < now()
ORDER BY next_check_at ASC LIMIT %d
`
return s.fetchBatchRows(fmt.Sprintf(query, batchSize), userID)
}
// NewCategoryBatch returns a series of jobs but only for a given category.
func (s *Storage) NewCategoryBatch(userID int64, categoryID int64, batchSize int) (jobs model.JobList, err error) {
// We do not take the error counter into consideration when the given
// user refresh manually all his feeds to force a refresh.
query := `
SELECT
id,
user_id
FROM
feeds
WHERE
user_id=$1 AND category_id=$2 AND disabled is false AND next_check_at < now()
ORDER BY next_check_at ASC LIMIT %d
`
return s.fetchBatchRows(fmt.Sprintf(query, batchSize), userID, categoryID)
}
func (s *Storage) fetchBatchRows(query string, args ...interface{}) (jobs model.JobList, err error) {
rows, err := s.db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf(`store: unable to fetch batch of jobs: %v`, err)
}
defer rows.Close()
for rows.Next() {
var job model.Job
if err := rows.Scan(&job.FeedID, &job.UserID); err != nil {
return nil, fmt.Errorf(`store: unable to fetch job: %v`, err)
}
jobs = append(jobs, job)
}
return jobs, nil
}