Integrated, working.

This integrates the liteq, and it prevents duplicates in a way that
matches my use-case.

I might try and push things back out to a separate module, but for now,
this will do.
This commit is contained in:
Matt Jadud
2025-11-30 18:01:35 -05:00
parent d212a354fe
commit 06cdc68be7
13 changed files with 70 additions and 41 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.db*
*.sqlite*

View File

@@ -3,4 +3,5 @@ test: generate
generate: generate:
cd internal/domain64 ; make generate cd internal/domain64 ; make generate
cd internal/liteq ; sqlc generate cd internal/liteq ; sqlc generate
cd internal/liteq ; make schema-const

View File

@@ -12,9 +12,11 @@ import (
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
liteq "git.jadud.com/grosbeak/internal/liteq" liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
) )
type queueWorker func(ctx context.Context, job *liteq.Job) error
func Fetch(ctx context.Context, job *liteq.Job) error { func Fetch(ctx context.Context, job *liteq.Job) error {
n := rand.Intn(50) n := rand.Intn(50)
@@ -23,38 +25,48 @@ func Fetch(ctx context.Context, job *liteq.Job) error {
return nil return nil
} }
func runWorkers(queue *liteq.JobQueue) { func runQ(queue *liteq.JobQueue, queueName string, worker queueWorker) {
go queue.Consume(context.Background(), liteq.ConsumeParams{ for {
Queue: "fetch", err := queue.Consume(context.Background(), liteq.ConsumeParams{
PoolSize: 3, Queue: queueName,
VisibilityTimeout: 200000, PoolSize: 3,
Worker: Fetch, VisibilityTimeout: 20,
}) Worker: worker,
})
if err != nil {
log.Printf("runQ/%s: %w", queueName, err.Error())
time.Sleep(2 * time.Second)
}
time.Sleep(1 * time.Second)
}
} }
func Entre(queue *liteq.JobQueue, chUrl <-chan string) { func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
ctx := context.Background() ctx := context.Background()
for { for {
url := <-chUrl url := <-chUrl
log.Println("Entre", url)
// Don't duplicate jobs on the same day of the year.
n := time.Now() n := time.Now()
queue.QueueJob(ctx, liteq.QueueJobParams{ ignore_tag := fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())
log.Println("entre", url, ignore_tag)
// Don't duplicate jobs on the same day of the year.
err := queue.QueueJob(ctx, liteq.QueueJobParams{
Queue: "fetch", Queue: "fetch",
// This only works for things in the `queued` state // This only works for things in the `queued` state
DedupingKey: liteq.IgnoreDuplicate(fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())), DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
Job: url, Job: url,
}) })
if err != nil {
log.Println("entre err", err.Error())
}
} }
} }
func main() { func setupLiteQ() *liteq.JobQueue {
// Don't let `main()` exit
wg := &sync.WaitGroup{}
wg.Add(1)
// FIXME: This path needs to come from the env. // FIXME: This path needs to come from the env.
liteqDB, err := sql.Open("sqlite", "liteq.db") liteqDB, err := sql.Open("sqlite", "liteq.db")
liteqDB.SetMaxOpenConns(1)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@@ -63,22 +75,36 @@ func main() {
queue := liteq.New(liteqDB) queue := liteq.New(liteqDB)
// The queue processes as long as this context is not cancelled. // The queue processes as long as this context is not cancelled.
log.Println("Setting up workers...") log.Println("Setting up worker queues...")
go runWorkers(queue) queues := []struct {
queueName string
worker queueWorker
}{
{"fetch", Fetch},
}
for _, q := range queues {
go runQ(queue, q.queueName, q.worker)
}
return queue
}
func main() {
// Don't let `main()` exit
wg := &sync.WaitGroup{}
wg.Add(1)
queue := setupLiteQ()
log.Println("Building network...")
// Create the network for the search engine. // Create the network for the search engine.
chUrl := make(chan string) chUrl := make(chan string)
go Entre(queue, chUrl) go Entre(queue, chUrl)
for { for range 5 {
chUrl <- "https://jadud.com/" chUrl <- "https://jadud.com/"
time.Sleep(2 * time.Second)
chUrl <- "https://berea.us/" chUrl <- "https://berea.us/"
time.Sleep(2 * time.Second)
} }
// Don't exit. // Don't exit.
log.Println("Waiting...") log.Println("Waiting for godot...")
wg.Wait() wg.Wait()
} }

2
go.mod
View File

@@ -1,4 +1,4 @@
module git.jadud.com/grosbeak module git.jadud.com/jadudm/grosbeak
go 1.24.0 go 1.24.0

View File

@@ -8,7 +8,7 @@ import (
"slices" "slices"
"sync" "sync"
sqlc "git.jadud.com/grosbeak/internal/domain64/sqlc" sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
"github.com/jpillora/go-tld" "github.com/jpillora/go-tld"
) )

View File

@@ -7,7 +7,7 @@ import (
"math/rand" "math/rand"
"time" "time"
liteq "git.jadud.com/grosbeak/internal/liteq" liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
) )

View File

@@ -20,10 +20,9 @@ VALUES
unixepoch(), unixepoch(),
?, ?,
? ?
) ON CONFLICT (deduping_key, job_status) ) ON CONFLICT (deduping_key)
WHERE WHERE
deduping_key != '' deduping_key != '' DO NOTHING;
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING;
-- name: doQueueJobReplaceDupe :exec -- name: doQueueJobReplaceDupe :exec
INSERT INTO INSERT INTO

View File

@@ -21,10 +21,9 @@ WHERE
job_status = 'queued' job_status = 'queued'
OR job_status = 'fetched'; OR job_status = 'fetched';
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status) CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE WHERE
deduping_key != '' deduping_key != '';
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status) CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE WHERE

View File

@@ -10,7 +10,7 @@ import (
"testing" "testing"
"time" "time"
"git.jadud.com/grosbeak/internal/liteq/internal" "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
"github.com/matryer/is" "github.com/matryer/is"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
) )

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"time" "time"
"database/sql/driver" "database/sql/driver"
@@ -110,6 +111,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
for { for {
// If the context gets canceled for example, stop consuming // If the context gets canceled for example, stop consuming
if ctx.Err() != nil { if ctx.Err() != nil {
log.Println("context cancelled in Consume()")
return nil return nil
} }
@@ -120,6 +122,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
}) })
if err != nil { if err != nil {
log.Println("error resetting jobs", err.Error())
return fmt.Errorf("error resetting jobs: %w", err) return fmt.Errorf("error resetting jobs: %w", err)
} }
} }
@@ -143,6 +146,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
workers.Submit(func() { workers.Submit(func() {
err := params.Worker(ctx, job) err := params.Worker(ctx, job)
if err != nil { if err != nil {
log.Println("worker failled", err.Error())
q.FailJob(ctx, FailJobParams{ q.FailJob(ctx, FailJobParams{
ID: job.ID, ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())), Errors: ErrorList(append(job.Errors, err.Error())),

View File

@@ -205,10 +205,9 @@ VALUES
unixepoch(), unixepoch(),
?, ?,
? ?
) ON CONFLICT (deduping_key, job_status) ) ON CONFLICT (deduping_key)
WHERE WHERE
deduping_key != '' deduping_key != '' DO NOTHING
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING
` `
type doQueueJobIgnoreDupeParams struct { type doQueueJobIgnoreDupeParams struct {

View File

@@ -25,10 +25,9 @@ WHERE
job_status = 'queued' job_status = 'queued'
OR job_status = 'fetched'; OR job_status = 'fetched';
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status) CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE WHERE
deduping_key != '' deduping_key != '';
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status) CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE WHERE

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
internal "git.jadud.com/grosbeak/internal/liteq/internal" internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
) )
// Creates the db file with the tables and indexes // Creates the db file with the tables and indexes