Rename the module to git.jadud.com/jadudm/grosbeak and rework the liteq queue runner.

* go.mod and every internal import move from git.jadud.com/grosbeak to
  git.jadud.com/jadudm/grosbeak.
* cmd/api: runWorkers becomes runQ, which restarts Consume in a loop for a
  named queue; database and consumer start-up move from main into setupLiteQ;
  Entre logs the deduping tag and reports QueueJob errors.
* liteq: the dedupe_ignore unique index and the matching ON CONFLICT clause are
  keyed on deduping_key alone instead of (deduping_key, job_status).
* liteq: Consume logs context cancellation, reset failures and worker failures.
* Ignore SQLite database files; `make generate` also runs `make schema-const`.

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d715cec
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.db*
+*.sqlite*
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 7783707..8779f28 100644
--- a/Makefile
+++ b/Makefile
@@ -3,4 +3,5 @@ test: generate
 
 generate:
 	cd internal/domain64 ; make generate
-	cd internal/liteq ; sqlc generate
\ No newline at end of file
+	cd internal/liteq ; sqlc generate
+	cd internal/liteq ; make schema-const
diff --git a/cmd/api/main.go b/cmd/api/main.go
index 0e3c730..b8cbf32 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -12,9 +12,12 @@ import (
 
 	_ "modernc.org/sqlite"
 
-	liteq "git.jadud.com/grosbeak/internal/liteq"
+	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
 )
 
+// queueWorker is the signature of a function that processes a single liteq job.
+type queueWorker func(ctx context.Context, job *liteq.Job) error
+
 func Fetch(ctx context.Context, job *liteq.Job) error {
 	n := rand.Intn(50)
 
@@ -23,38 +26,56 @@ func Fetch(ctx context.Context, job *liteq.Job) error {
 	return nil
 }
 
-func runWorkers(queue *liteq.JobQueue) {
-	go queue.Consume(context.Background(), liteq.ConsumeParams{
-		Queue:             "fetch",
-		PoolSize:          3,
-		VisibilityTimeout: 200000,
-		Worker:            Fetch,
-	})
+// runQ consumes jobs from queueName with worker, calling Consume again each
+// time it returns. It never returns; run it in its own goroutine.
+func runQ(queue *liteq.JobQueue, queueName string, worker queueWorker) {
+	for {
+		err := queue.Consume(context.Background(), liteq.ConsumeParams{
+			Queue:             queueName,
+			PoolSize:          3,
+			VisibilityTimeout: 20,
+			Worker:            worker,
+		})
+		if err != nil {
+			log.Printf("runQ/%s: %v", queueName, err)
+			time.Sleep(2 * time.Second)
+		}
+		time.Sleep(1 * time.Second)
+	}
 }
 
 func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
 	ctx := context.Background()
 	for {
 		url := <-chUrl
-		log.Println("Entre", url)
-		// Don't duplicate jobs on the same day of the year.
 		n := time.Now()
-		queue.QueueJob(ctx, liteq.QueueJobParams{
+		ignoreTag := fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())
+		log.Println("entre", url, ignoreTag)
+		// Don't duplicate jobs on the same day of the year.
+		err := queue.QueueJob(ctx, liteq.QueueJobParams{
 			Queue: "fetch",
 			// This only works for things in the `queued` state
-			DedupingKey: liteq.IgnoreDuplicate(fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())),
+			DedupingKey: liteq.IgnoreDuplicate(ignoreTag),
 			Job:         url,
 		})
+		if err != nil {
+			log.Println("entre err", err.Error())
+		}
 	}
 }
 
-func main() {
-	// Don't let `main()` exit
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
+// setupLiteQ opens the liteq SQLite database, starts one runQ goroutine per
+// configured queue, and returns the job queue.
+func setupLiteQ() *liteq.JobQueue {
 
 	// FIXME: This path needs to come from the env.
 	liteqDB, err := sql.Open("sqlite", "liteq.db")
+	// sql.Open returns a nil *sql.DB when it fails, so only configure the
+	// pool on success; the error itself is handled just below.
+	if err == nil {
+		liteqDB.SetMaxOpenConns(1)
+	}
+
 	if err != nil {
 		fmt.Println(err)
 		os.Exit(1)
@@ -63,22 +84,36 @@ func main() {
 
 	queue := liteq.New(liteqDB)
 	// The queue processes as long as this context is not cancelled.
-	log.Println("Setting up workers...")
-	go runWorkers(queue)
+	log.Println("Setting up worker queues...")
+	queues := []struct {
+		queueName string
+		worker    queueWorker
+	}{
+		{"fetch", Fetch},
+	}
+	for _, q := range queues {
+		go runQ(queue, q.queueName, q.worker)
+	}
+	return queue
+}
+
+func main() {
+	// Don't let `main()` exit
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	queue := setupLiteQ()
 
-	log.Println("Building network...")
 	// Create the network for the search engine.
 	chUrl := make(chan string)
 	go Entre(queue, chUrl)
 
-	for {
+	for range 5 {
 		chUrl <- "https://jadud.com/"
-		time.Sleep(2 * time.Second)
 		chUrl <- "https://berea.us/"
-		time.Sleep(2 * time.Second)
 	}
 
 	// Don't exit.
-	log.Println("Waiting...")
+	log.Println("Waiting for godot...")
 	wg.Wait()
 }
diff --git a/go.mod b/go.mod
index cf508c1..b2a91dd 100644
--- a/go.mod
+++ b/go.mod
@@ -1,4 +1,4 @@
-module git.jadud.com/grosbeak
+module git.jadud.com/jadudm/grosbeak
 
 go 1.24.0
 
diff --git a/internal/domain64/database.go b/internal/domain64/database.go
index 4f4850c..5165df4 100644
--- a/internal/domain64/database.go
+++ b/internal/domain64/database.go
@@ -8,7 +8,7 @@ import (
 	"slices"
 	"sync"
 
-	sqlc "git.jadud.com/grosbeak/internal/domain64/sqlc"
+	sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
 	"github.com/jpillora/go-tld"
 )
 
diff --git a/internal/liteq/bench/main.go b/internal/liteq/bench/main.go
index be2d26f..00faefb 100644
--- a/internal/liteq/bench/main.go
+++ b/internal/liteq/bench/main.go
@@ -7,7 +7,7 @@ import (
 	"math/rand"
 	"time"
 
-	liteq "git.jadud.com/grosbeak/internal/liteq"
+	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
 	_ "modernc.org/sqlite"
 )
 
diff --git a/internal/liteq/db/queries.sql b/internal/liteq/db/queries.sql
index 680fff0..90a257a 100644
--- a/internal/liteq/db/queries.sql
+++ b/internal/liteq/db/queries.sql
@@ -20,10 +20,9 @@ VALUES
         unixepoch(),
         ?,
         ?
-    ) ON CONFLICT (deduping_key, job_status)
+    ) ON CONFLICT (deduping_key)
 WHERE
-    deduping_key != ''
-    AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING;
+    deduping_key != '' DO NOTHING;
 
 -- name: doQueueJobReplaceDupe :exec
 INSERT INTO
diff --git a/internal/liteq/db/schema.sql b/internal/liteq/db/schema.sql
index 51f3c52..671c8db 100644
--- a/internal/liteq/db/schema.sql
+++ b/internal/liteq/db/schema.sql
@@ -21,10 +21,9 @@ WHERE
     job_status = 'queued'
     OR job_status = 'fetched';
 
-CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status)
+CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
 WHERE
-    deduping_key != ''
-    AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
+    deduping_key != '';
 
 CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
 WHERE
diff --git a/internal/liteq/internal/jobs_test.go b/internal/liteq/internal/jobs_test.go
index 7e75966..de2349f 100644
--- a/internal/liteq/internal/jobs_test.go
+++ b/internal/liteq/internal/jobs_test.go
@@ -10,7 +10,7 @@ import (
 	"testing"
 	"time"
 
-	"git.jadud.com/grosbeak/internal/liteq/internal"
+	"git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
 	"github.com/matryer/is"
 	_ "modernc.org/sqlite"
 )
diff --git a/internal/liteq/internal/methods.go b/internal/liteq/internal/methods.go
index b9c3402..ee1ed77 100644
--- a/internal/liteq/internal/methods.go
+++ b/internal/liteq/internal/methods.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"log"
 	"time"
 
 	"database/sql/driver"
@@ -110,6 +111,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
 	for {
 		// If the context gets canceled for example, stop consuming
 		if ctx.Err() != nil {
+			log.Println("context cancelled in Consume()")
 			return nil
 		}
 
@@ -120,6 +122,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
 			})
 
 			if err != nil {
+				log.Println("error resetting jobs", err.Error())
 				return fmt.Errorf("error resetting jobs: %w", err)
 			}
 		}
@@ -143,6 +146,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
 			workers.Submit(func() {
 				err := params.Worker(ctx, job)
 				if err != nil {
+					log.Println("worker failed", err.Error())
 					q.FailJob(ctx, FailJobParams{
 						ID:     job.ID,
 						Errors: ErrorList(append(job.Errors, err.Error())),
diff --git a/internal/liteq/internal/queries.sql.go b/internal/liteq/internal/queries.sql.go
index 09d07f5..3f7bd93 100644
--- a/internal/liteq/internal/queries.sql.go
+++ b/internal/liteq/internal/queries.sql.go
@@ -205,10 +205,9 @@ VALUES
         unixepoch(),
         ?,
         ?
-    ) ON CONFLICT (deduping_key, job_status)
+    ) ON CONFLICT (deduping_key)
 WHERE
-    deduping_key != ''
-    AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING
+    deduping_key != '' DO NOTHING
 `
 
 type doQueueJobIgnoreDupeParams struct {
diff --git a/internal/liteq/internal/schema.go b/internal/liteq/internal/schema.go
index f7dc22a..684cdbe 100644
--- a/internal/liteq/internal/schema.go
+++ b/internal/liteq/internal/schema.go
@@ -25,10 +25,9 @@ WHERE
     job_status = 'queued'
     OR job_status = 'fetched';
 
-CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status)
+CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
 WHERE
-    deduping_key != ''
-    AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
+    deduping_key != '';
 
 CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
 WHERE
diff --git a/internal/liteq/liteq.go b/internal/liteq/liteq.go
index e8a8f43..2c2223e 100644
--- a/internal/liteq/liteq.go
+++ b/internal/liteq/liteq.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"database/sql"
 
-	internal "git.jadud.com/grosbeak/internal/liteq/internal"
+	internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
 )
 
 // Creates the db file with the tables and indexes