commit 975217d1f7ce7a43bca4ea1733b94ebe6be6d7b3 Author: Seb Date: Thu Feb 15 21:43:08 2024 -0800 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e93df66 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.DS_Store +/bench/bench.db* +/TODO diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a9a3e34 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +default: + go run *.go || (go get ./... && go run *.go) + +watch: + (watchexec -e sql -- make schema-const) & \ + (watchexec -e sql -- sqlc generate) & \ + (watchexec -w sqlc.yaml -- sqlc generate) & \ + (watchexec -e go -c -r -- go test ./... -count 1) & \ + wait + # SQL Watcher done + +# Puts the schema in a constant in go so it can be used to create the table directly +schema-const: + echo "// Code generated by Makefile. DO NOT EDIT." > internal/schema.go + echo "package internal\n" >> internal/schema.go + echo "const Schema = \`" >> internal/schema.go + cat db/schema.sql >> internal/schema.go + echo "\`" >> internal/schema.go + + +.PHONY: bench +bench: + make cleanup + make schema-const + sqlc generate + cd bench && go run main.go + +cleanup: + rm -f bench/bench.db* diff --git a/README.md b/README.md new file mode 100644 index 0000000..135a6c4 --- /dev/null +++ b/README.md @@ -0,0 +1,172 @@ +# liteq + +Library to have a persistent job queue in Go backed by a SQLite DB. + +## Motivation + +I needed a way to have a persistent queue for a small app so it would survive restarts and allow me to schedule jobs in the future. +Since I already had SQLite as a dependency, I didn't want to add another dependency to make the app work. Especially not an external one like RabbitMQ, Redis, SQS or others. + +liteq allows you to run tens of thousands of jobs per second if needed. It can also be made to use more than a single DB file to keep growing the concurrency should you need it.
+ +## Usage + +### Install + +```sh +go get github.com/khepin/liteq +``` + +### Setup and DB creation + +```go +import ( + "github.com/khepin/liteq" + _ "github.com/mattn/go-sqlite3" +) + +func main() { + // Open the sqlite3 DB in the file "liteq.db" + liteqDb, err := sql.Open("sqlite3", "liteq.db") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + // Create the DB table if it doesn't exist + liteq.Setup(liteqDb) + // create a job queue + jqueue := liteq.New(liteqDb) +} +``` + +### Queuing a job + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "notify.email", + Job: `{"email_address": "bob@example.com", "content": "..."}`, +}) +``` + +This will send the job with the given payload on a queue called `notify.email`. + +### Consuming + +To consume jobs from the queue, you call the `Consume` method: + +```go +jqueue.Consume(context.Background(), liteq.ConsumeParams{ + Queue: "notify.email", + PoolSize: 3, + VisibilityTimeout: 20, + Worker: func (ctx context.Context, job *liteq.Job) error { + return sendEmail(job) + }, +}) +``` + +- `context.Background()` You can pass in a cancellable context and the queue will stop processing when the context is canceled +- `Queue` this is the name of the queue we want this consumer to consume from +- `PoolSize` this is the number of concurrent consumers we want for this queue +- `VisibilityTimeout` is the time, in seconds, that a job will remain reserved to a consumer. After that time has elapsed, if the job hasn't been marked either failed +or successful, it will be put back in the queue for others to consume. A `visibility timeout expired` error will be added to the job's error list and the number of remaining attempts will be +decreased. +- `Worker` A callback to process the job. When an error is returned, the job is either returned to the queue for processing with a decreased number of remaining attempts or marked as failed if no more attempts remain.
+ +### Multiple attempts + +When queueing a job, it is possible to decide how many times this job can be attempted in case of failures: + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "notify.email", + RemainingAttempts: 3, + Job: `{"email_address": "bob@example.com", "content": "..."}`, +}) +``` + +### Delayed jobs + +When queueing a job, you can decide to execute it at a later point in time: + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "notify.email", + ExecuteAfter: time.Now().Add(6*time.Minute).Unix(), + Job: `{"email_address": "bob@example.com", "content": "..."}`, +}) +``` + +In this case, the job won't run until the given time. + +### Deduplication + +Sometimes it can be useful to prevent the queueing of multiple messages that would essentially be performing the same task to avoid unnecessary work. + +This is possible in `liteq` via the `DedupingKey` job parameter. There are 2 types of deduping keys: + +- `IgnoreDuplicate` will ignore the new job that was sent and keep the one that was already on the queue +- `ReplaceDuplicate` will remove the job currently on the queue and use the new one instead + +Assuming we have the following consumer: + +```go +jqueue.Consume(context.Background(), liteq.ConsumeParams{ + Queue: "print", + VisibilityTimeout: 20, + Worker: func (ctx context.Context, job *liteq.Job) error { + fmt.Println(job.Payload) + return nil + }, +}) +``` + +And we send the following jobs: + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "print", + Job: `first`, + DedupingKey: liteq.IgnoreDuplicate("print.job"), +}) +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "print", + Job: `second`, + DedupingKey: liteq.IgnoreDuplicate("print.job"), +}) +``` + +Then the result would be a single output line: + +``` +first +``` + +If instead we use `liteq.ReplaceDuplicate` + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
+ Queue: "print", + Job: `third`, + DedupingKey: liteq.ReplaceDuplicate("print.job"), +}) +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "print", + Job: `fourth`, + DedupingKey: liteq.ReplaceDuplicate("print.job"), +}) +``` + +We will output `fourth`. + +If we think, for example, of the scenario of sending email or text notifications about an order to a customer, we could construct a deduping key like: + +```go +jqueue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "email.notify", + Job: `{"order_id": 123, "customer_id": "abc"}`, + DedupingKey: liteq.ReplaceDuplicate("email.notify:%s:%s", customer.ID, order.ID), +}) +``` + +That way, if the `Order Prepared` status update email hasn't been sent yet by the time we're ready to send the `Order Shipped` email, we can skip the `Order Prepared` one and only send the most recent update to the customer. diff --git a/bench/main.go b/bench/main.go new file mode 100644 index 0000000..2699f6b --- /dev/null +++ b/bench/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "time" + + "github.com/khepin/liteq" + _ "github.com/mattn/go-sqlite3" +) + +func main() { + howMany := 100_000 + + go func() { + db, err := sql.Open("sqlite3", "bench.db") + if err != nil { + panic(err) + } + liteq.Setup(db) + + jq := liteq.New(db) + + for i := 0; i < howMany; i++ { + jq.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: fmt.Sprintf("default-%d", i%30), + Job: "SendEmail", + }) + } + }() + + c := make(chan struct{}) + for i := 0; i < 30; i++ { + db, err := sql.Open("sqlite3", "bench.db") + if err != nil { + panic(err) + } + liteq.Setup(db) + + jq := liteq.New(db) + + go func(i int) { + err := jq.Consume(context.Background(), liteq.ConsumeParams{ + Queue: fmt.Sprintf("default-%d", i), + PoolSize: 10, + Worker: func(ctx context.Context, job *liteq.Job) error { + // random sleep + n := rand.Intn(50) + + time.Sleep(time.Duration(n) * time.Millisecond) + c
<- struct{}{} + return nil + }, + }) + fmt.Println(err) + }(i) + } + + rec := 0 + for range c { + rec++ + if rec%1000 == 0 { + fmt.Println(rec) + } + if rec == howMany { + return + } + } +} diff --git a/db/queries.sql b/db/queries.sql new file mode 100644 index 0000000..f8a1804 --- /dev/null +++ b/db/queries.sql @@ -0,0 +1,138 @@ +-- name: doQueueJobIgnoreDupe :exec +INSERT INTO + jobs ( + queue, + job, + execute_after, + job_status, + created_at, + updated_at, + remaining_attempts, + deduping_key + ) +VALUES + ( + ?, + ?, + ?, + 'queued', + unixepoch(), + unixepoch(), + ?, + ? + ) ON CONFLICT (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued' DO NOTHING; + +-- name: doQueueJobReplaceDupe :exec +INSERT INTO + jobs ( + queue, + job, + execute_after, + job_status, + created_at, + updated_at, + remaining_attempts, + deduping_key + ) +VALUES + ( + ?, + ?, + ?, + 'queued', + unixepoch(), + unixepoch(), + ?, + ? + ) ON CONFLICT (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued' DO +UPDATE +SET + job = EXCLUDED.job, + execute_after = EXCLUDED.execute_after, + updated_at = unixepoch(), + remaining_attempts = EXCLUDED.remaining_attempts; + +-- name: CompleteJob :exec +UPDATE + jobs +SET + job_status = 'completed', + finished_at = unixepoch(), + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = 0 +WHERE + id = ?; + +-- name: FailJob :exec +UPDATE + jobs +SET + job_status = CASE + WHEN remaining_attempts <= 1 THEN 'failed' + ELSE 'queued' + END, + finished_at = 0, + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = MAX(remaining_attempts - 1, 0), + errors = ? +WHERE + id = ?; + +-- name: MarkJobsForConsumer :many +UPDATE + jobs +SET + consumer_fetched_at = unixepoch(), + updated_at = unixepoch(), + job_status = 'fetched' +WHERE + jobs.job_status = 'queued' + AND jobs.remaining_attempts > 0 + AND jobs.id IN ( + SELECT + id + FROM + jobs js + WHERE + js.queue = ? 
+ AND js.job_status = 'queued' + AND js.execute_after <= ? + AND js.remaining_attempts > 0 + ORDER BY + execute_after ASC + LIMIT + ? + ) RETURNING *; + +-- name: ResetJobs :execrows +UPDATE + jobs +SET + job_status = CASE + WHEN remaining_attempts <= 1 THEN 'failed' + ELSE 'queued' + END, + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = MAX(remaining_attempts - 1, 0), + errors = json_insert(errors, '$[#]', 'visibility timeout expired') +WHERE + job_status = 'fetched' + AND queue = ? + AND consumer_fetched_at < ?; + +-- name: FindJob :one +SELECT + * +FROM + jobs +WHERE + id = ?; diff --git a/db/schema.sql b/db/schema.sql new file mode 100644 index 0000000..b2bfa93 --- /dev/null +++ b/db/schema.sql @@ -0,0 +1,27 @@ +PRAGMA journal_mode = WAL; + +CREATE TABLE jobs ( + id INTEGER NOT NULL, + queue TEXT NOT NULL, + job TEXT NOT NULL, + job_status TEXT NOT NULL DEFAULT 'queued', + execute_after INTEGER NOT NULL DEFAULT 0, + remaining_attempts INTEGER NOT NULL DEFAULT 1, + consumer_fetched_at INTEGER NOT NULL DEFAULT 0, + finished_at INTEGER NOT NULL DEFAULT 0, + deduping_key TEXT NOT NULL DEFAULT '', + errors TEXT NOT NULL DEFAULT '[]', -- JSON array of error messages + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX todo ON jobs (queue, job_status, execute_after) +WHERE + job_status = 'queued' + OR job_status = 'fetched'; + +CREATE UNIQUE INDEX dedupe ON jobs (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued'; diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..94be995 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/khepin/liteq + +go 1.22.0 + +require ( + github.com/alitto/pond v1.8.3 + github.com/matryer/is v1.4.1 + github.com/mattn/go-sqlite3 v1.14.22 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d4153b0 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
+github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= +github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= +github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/internal/db.go b/internal/db.go new file mode 100644 index 0000000..9e50c0c --- /dev/null +++ b/internal/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package internal + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/jobs_test.go b/internal/jobs_test.go new file mode 100644 index 0000000..439eb99 --- /dev/null +++ b/internal/jobs_test.go @@ -0,0 +1,434 @@ +package internal_test + +import ( + "context" + "database/sql" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/khepin/liteq/internal" + "github.com/matryer/is" + _ "github.com/mattn/go-sqlite3" +) + +func TestMain(m *testing.M) { + e := m.Run() + + removeFiles("jobs.db", "jobs.db-journal", "jobs.db-wal", "jobs.db-shm") + os.Exit(e) +} + +func removeFiles(files ...string) error { + for _, file := range files { + err := os.Remove(file) + if err != nil && !os.IsNotExist(err) { + return err + } + } + return nil +} + +func getDb(file string) (*sql.DB, error) { + if file != ":memory:" { + err := removeFiles(file, 
file+"-journal", file+"-wal", file+"-shm") + if err != nil && !os.IsNotExist(err) { + return nil, err + } + } + + db, err := sql.Open("sqlite3", file) + if err != nil { + return nil, err + } + schema, err := os.ReadFile("../db/schema.sql") + if err != nil { + return nil, err + } + _, err = db.Exec(string(schema)) + if err != nil { + return nil, err + } + return db, nil +} + +func getJqueue(file string) (*internal.Queries, error) { + sqlcdb, err := getDb(file) + if err != nil { + return nil, err + } + return internal.New(sqlcdb), nil +} + +func Test_QueueJob(t *testing.T) { + is := is.New(t) + sqlcdb, err := getDb("jobs.db") + is.NoErr(err) // error opening sqlite3 database + + jqueue := internal.New(sqlcdb) + jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}` + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: jobPaylod, + }) + is.NoErr(err) // error queuing job + + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job + is.Equal(jobs[0].Job, jobPaylod) +} + +func Test_FetchTwice(t *testing.T) { + is := is.New(t) + sqlcdb, err := getDb("jobs.db") + is.NoErr(err) // error opening sqlite3 database + + jqueue := internal.New(sqlcdb) + jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}` + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: jobPaylod, + }) + is.NoErr(err) // error queuing job + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job + is.Equal(jobs[0].Job, jobPaylod) + + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 0) // expected 0 
job +} + +// delay jobs, fetch jobs, check that we get no jobs, then check that we get the job after the delay +func Test_DelayedJob(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`, + ExecuteAfter: time.Now().Unix() + 1, + }) + is.NoErr(err) // error queuing job + + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 0) // expected 0 job + + time.Sleep(1 * time.Second) + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job +} + +func Test_PrefetchJobs(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue(":memory:") + is.NoErr(err) // error getting job queue + for i := 0; i < 10; i++ { + + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`, + }) + is.NoErr(err) // error queuing job + } + + // grab the first 2 + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + Count: 2, + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 2) // expected 2 jobs + + // the next 6 + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + Count: 6, + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 6) // expected 6 jobs + + // try for 5 but only 2 are left + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + Count: 5, + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 2) // expected 2 jobs +} + +func 
Test_Consume(t *testing.T) { + is := is.New(t) + // This somehow doesn't work well with in memory db + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + + fillq1 := make(chan struct{}, 1) + go func() { + for i := 0; i < 100; i++ { + jobPayload := fmt.Sprintf("q1-%d", i) + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "q1", + Job: jobPayload, + }) + + is.NoErr(err) // error queuing job + } + fillq1 <- struct{}{} + close(fillq1) + }() + + fillq2 := make(chan struct{}, 1) + go func() { + for i := 0; i < 100; i++ { + jobPayload := fmt.Sprintf("q2-%d", i) + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "q2", + Job: jobPayload, + }) + + is.NoErr(err) // error queuing job + } + fillq2 <- struct{}{} + close(fillq2) + }() + + q1 := make(chan string, 100) + q1done := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + jqueue.Consume(ctx, internal.ConsumeParams{ + Queue: "q1", + PoolSize: 10, + Worker: func(ctx context.Context, job *internal.Job) error { + q1 <- job.Job + if job.Job == "q1-99" { + q1done <- struct{}{} + close(q1done) + close(q1) + } + return nil + }, + }) + }() + + q2 := make(chan string, 100) + q2done := make(chan struct{}, 1) + ctx2, cancel2 := context.WithCancel(context.Background()) + go func() { + jqueue.Consume(ctx2, internal.ConsumeParams{ + Queue: "q2", + PoolSize: 7, + Worker: func(ctx context.Context, job *internal.Job) error { + q2 <- job.Job + if job.Job == "q2-99" { + q2done <- struct{}{} + close(q2done) + close(q2) + } + return nil + }, + }) + }() + + <-fillq1 + <-q1done + cancel() + <-fillq2 + <-q2done + cancel2() + + i := 0 + for pl := range q1 { + is.True(strings.HasPrefix(pl, "q1")) // expected q1-* + i++ + } + is.Equal(i, 100) // expected 100 jobs + + j := 0 + for pl := range q2 { + is.True(strings.HasPrefix(pl, "q2")) // expected q2-* + j++ + } + is.Equal(j, 100) // expected 100 jobs +} + +func 
Test_MultipleAttempts(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`, + RemainingAttempts: 3, + }) + is.NoErr(err) // error queuing job + + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job + + thejob := jobs[0] + is.Equal(thejob.RemainingAttempts, int64(3)) // expected 3 attempts + + // Fail and verify + err = jqueue.FailJob(context.Background(), internal.FailJobParams{ + ID: thejob.ID, + Errors: internal.ErrorList{"error1"}, + }) + is.NoErr(err) // error failing job + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job + is.Equal(jobs[0].RemainingAttempts, int64(2)) // expected 2 attempts + is.Equal(jobs[0].Errors, internal.ErrorList{"error1"}) + + // Fail again and verify + err = jqueue.FailJob(context.Background(), internal.FailJobParams{ + ID: thejob.ID, + Errors: internal.ErrorList{"error1", "error2"}, + }) + is.NoErr(err) // error failing job + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + }) + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected 1 job + is.Equal(jobs[0].RemainingAttempts, int64(1)) // expected 1 attempts + is.Equal(jobs[0].Errors, internal.ErrorList{"error1", "error2"}) + + // Fail again and verify + err = jqueue.FailJob(context.Background(), internal.FailJobParams{ + ID: thejob.ID, + Errors: internal.ErrorList{"error1", "error2", "error3"}, + }) + is.NoErr(err) // error failing job + jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: 
"", + }) + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 0) // expected no job since no more attempts +} + +func Test_VisibilityTimeout(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`, + }) + is.NoErr(err) // error queuing job + + ctx, cancel := context.WithCancel(context.Background()) + id := int64(0) + go func() { + jqueue.Consume(ctx, internal.ConsumeParams{ + Queue: "", + PoolSize: 6, + VisibilityTimeout: 1, + OnEmptySleep: 100 * time.Millisecond, + Worker: func(ctx context.Context, job *internal.Job) error { + id = job.ID + time.Sleep(3 * time.Second) + return nil + }, + }) + }() + + time.Sleep(2 * time.Second) + cancel() + job, err := jqueue.FindJob(context.Background(), id) + + is.NoErr(err) // error fetching job for consumer + is.Equal(job.JobStatus, "failed") // expected fetched + is.Equal(job.Errors, internal.ErrorList{"visibility timeout expired"}) + // Sleep to ensure enough time for the job to finish and avoid panics + time.Sleep(2 * time.Second) +} + +func Test_DedupeIgnore(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `job:1`, + DedupingKey: internal.IgnoreDuplicate("dedupe"), + }) + is.NoErr(err) // error queuing job + + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `job:2`, + DedupingKey: internal.IgnoreDuplicate("dedupe"), + }) + is.NoErr(err) // error queuing job + + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + Count: 10, + }) + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected only 1 job due to dedupe + 
is.Equal(jobs[0].Job, `job:1`) // expected job:1 +} + +func Test_DedupeReplace(t *testing.T) { + is := is.New(t) + jqueue, err := getJqueue("jobs.db") + is.NoErr(err) // error getting job queue + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `job:1`, + DedupingKey: internal.ReplaceDuplicate("dedupe"), + }) + is.NoErr(err) // error queuing job + + err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{ + Queue: "", + Job: `job:2`, + DedupingKey: internal.ReplaceDuplicate("dedupe"), + }) + is.NoErr(err) // error queuing job + + jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{ + Queue: "", + Count: 10, + }) + is.NoErr(err) // error fetching job for consumer + is.Equal(len(jobs), 1) // expected only 1 job due to dedupe + is.Equal(jobs[0].Job, `job:2`) // expected job:2, the replacement payload +} diff --git a/internal/methods.go b/internal/methods.go new file mode 100644 index 0000000..b9c3402 --- /dev/null +++ b/internal/methods.go @@ -0,0 +1,177 @@ +package internal + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "database/sql/driver" + + "github.com/alitto/pond" +) + +type QueueJobParams struct { + Queue string + Job string + ExecuteAfter int64 + RemainingAttempts int64 + DedupingKey DedupingKey +} + +type DedupingKey interface { + String() string + ReplaceDuplicate() bool +} + +type IgnoreDuplicate string + +func (i IgnoreDuplicate) String() string { + return string(i) +} +func (i IgnoreDuplicate) ReplaceDuplicate() bool { + return false +} + +type ReplaceDuplicate string + +func (r ReplaceDuplicate) String() string { + return string(r) +} +func (r ReplaceDuplicate) ReplaceDuplicate() bool { + return true +} + +func (q *Queries) QueueJob(ctx context.Context, params QueueJobParams) error { + if params.RemainingAttempts == 0 { + params.RemainingAttempts = 1 + } + + if params.DedupingKey == nil { + params.DedupingKey = IgnoreDuplicate("") + } + + doParams := doQueueJobIgnoreDupeParams{ + Queue:
params.Queue, + Job: params.Job, + ExecuteAfter: params.ExecuteAfter, + RemainingAttempts: params.RemainingAttempts, + DedupingKey: params.DedupingKey.String(), + } + + if params.DedupingKey.String() == "" { + return q.doQueueJobIgnoreDupe(ctx, doParams) + } + + if params.DedupingKey.ReplaceDuplicate() { + return q.doQueueJobReplaceDupe(ctx, doQueueJobReplaceDupeParams(doParams)) + } + + return q.doQueueJobIgnoreDupe(ctx, doParams) +} + +type GrabJobsParams struct { + Queue string + ExecuteAfter int64 + Count int64 +} + +func (q *Queries) GrabJobs(ctx context.Context, params GrabJobsParams) ([]*Job, error) { + executeAfter := time.Now().Unix() + if params.ExecuteAfter > 0 { + executeAfter = params.ExecuteAfter + } + limit := int64(1) + if params.Count > 0 { + limit = params.Count + } + + return q.MarkJobsForConsumer(ctx, MarkJobsForConsumerParams{ + Queue: params.Queue, + ExecuteAfter: executeAfter, + Limit: limit, + }) +} + +type ConsumeParams struct { + Queue string + PoolSize int + Worker func(context.Context, *Job) error + VisibilityTimeout int64 + OnEmptySleep time.Duration +} + +func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error { + workers := pond.New(params.PoolSize, params.PoolSize) + sleep := params.OnEmptySleep + if sleep == 0 { + sleep = 1 * time.Second + } + for { + // If the context gets canceled for example, stop consuming + if ctx.Err() != nil { + return nil + } + + if params.VisibilityTimeout > 0 { + _, err := q.ResetJobs(ctx, ResetJobsParams{ + Queue: params.Queue, + ConsumerFetchedAt: time.Now().Unix() - params.VisibilityTimeout, + }) + + if err != nil { + return fmt.Errorf("error resetting jobs: %w", err) + } + } + + jobs, err := q.GrabJobs(ctx, GrabJobsParams{ + Queue: params.Queue, + Count: int64(params.PoolSize), + }) + + if err != nil { + return fmt.Errorf("error grabbing jobs: %w", err) + } + + if len(jobs) == 0 { + time.Sleep(sleep) + continue + } + + for _, job := range jobs { + job := job + workers.Submit(func() { 
+ err := params.Worker(ctx, job) + if err != nil { + q.FailJob(ctx, FailJobParams{ + ID: job.ID, + Errors: ErrorList(append(job.Errors, err.Error())), + }) + return + } + + q.CompleteJob(ctx, job.ID) + }) + } + } +} + +type ErrorList []string + +func (e ErrorList) Value() (driver.Value, error) { + if len(e) == 0 { + return "[]", nil + } + return json.Marshal(e) +} + +func (e *ErrorList) Scan(src interface{}) error { + switch src := src.(type) { + case string: + return json.Unmarshal([]byte(src), e) + case []byte: + return json.Unmarshal(src, e) + default: + return fmt.Errorf("unsupported type: %T", src) + } +} diff --git a/internal/models.go b/internal/models.go new file mode 100644 index 0000000..b57c523 --- /dev/null +++ b/internal/models.go @@ -0,0 +1,22 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package internal + +import () + +type Job struct { + ID int64 + Queue string + Job string + JobStatus string + ExecuteAfter int64 + RemainingAttempts int64 + ConsumerFetchedAt int64 + FinishedAt int64 + DedupingKey string + Errors ErrorList + CreatedAt int64 + UpdatedAt int64 +} diff --git a/internal/queries.sql.go b/internal/queries.sql.go new file mode 100644 index 0000000..89848e5 --- /dev/null +++ b/internal/queries.sql.go @@ -0,0 +1,284 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: queries.sql + +package internal + +import ( + "context" +) + +const completeJob = `-- name: CompleteJob :exec +UPDATE + jobs +SET + job_status = 'completed', + finished_at = unixepoch(), + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = 0 +WHERE + id = ? 
+` + +func (q *Queries) CompleteJob(ctx context.Context, id int64) error { + _, err := q.db.ExecContext(ctx, completeJob, id) + return err +} + +const failJob = `-- name: FailJob :exec +UPDATE + jobs +SET + job_status = CASE + WHEN remaining_attempts <= 1 THEN 'failed' + ELSE 'queued' + END, + finished_at = 0, + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = MAX(remaining_attempts - 1, 0), + errors = ? +WHERE + id = ? +` + +type FailJobParams struct { + Errors ErrorList + ID int64 +} + +func (q *Queries) FailJob(ctx context.Context, arg FailJobParams) error { + _, err := q.db.ExecContext(ctx, failJob, arg.Errors, arg.ID) + return err +} + +const findJob = `-- name: FindJob :one +SELECT + id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at +FROM + jobs +WHERE + id = ? +` + +func (q *Queries) FindJob(ctx context.Context, id int64) (*Job, error) { + row := q.db.QueryRowContext(ctx, findJob, id) + var i Job + err := row.Scan( + &i.ID, + &i.Queue, + &i.Job, + &i.JobStatus, + &i.ExecuteAfter, + &i.RemainingAttempts, + &i.ConsumerFetchedAt, + &i.FinishedAt, + &i.DedupingKey, + &i.Errors, + &i.CreatedAt, + &i.UpdatedAt, + ) + return &i, err +} + +const markJobsForConsumer = `-- name: MarkJobsForConsumer :many +UPDATE + jobs +SET + consumer_fetched_at = unixepoch(), + updated_at = unixepoch(), + job_status = 'fetched' +WHERE + jobs.job_status = 'queued' + AND jobs.remaining_attempts > 0 + AND jobs.id IN ( + SELECT + id + FROM + jobs js + WHERE + js.queue = ? + AND js.job_status = 'queued' + AND js.execute_after <= ? + AND js.remaining_attempts > 0 + ORDER BY + execute_after ASC + LIMIT + ? 
+ ) RETURNING id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at +` + +type MarkJobsForConsumerParams struct { + Queue string + ExecuteAfter int64 + Limit int64 +} + +func (q *Queries) MarkJobsForConsumer(ctx context.Context, arg MarkJobsForConsumerParams) ([]*Job, error) { + rows, err := q.db.QueryContext(ctx, markJobsForConsumer, arg.Queue, arg.ExecuteAfter, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.ID, + &i.Queue, + &i.Job, + &i.JobStatus, + &i.ExecuteAfter, + &i.RemainingAttempts, + &i.ConsumerFetchedAt, + &i.FinishedAt, + &i.DedupingKey, + &i.Errors, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const resetJobs = `-- name: ResetJobs :execrows +UPDATE + jobs +SET + job_status = CASE + WHEN remaining_attempts <= 1 THEN 'failed' + ELSE 'queued' + END, + updated_at = unixepoch(), + consumer_fetched_at = 0, + remaining_attempts = MAX(remaining_attempts - 1, 0), + errors = json_insert(errors, '$[#]', 'visibility timeout expired') +WHERE + job_status = 'fetched' + AND queue = ? + AND consumer_fetched_at < ? 
+` + +type ResetJobsParams struct { + Queue string + ConsumerFetchedAt int64 +} + +func (q *Queries) ResetJobs(ctx context.Context, arg ResetJobsParams) (int64, error) { + result, err := q.db.ExecContext(ctx, resetJobs, arg.Queue, arg.ConsumerFetchedAt) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const doQueueJobIgnoreDupe = `-- name: doQueueJobIgnoreDupe :exec +INSERT INTO + jobs ( + queue, + job, + execute_after, + job_status, + created_at, + updated_at, + remaining_attempts, + deduping_key + ) +VALUES + ( + ?, + ?, + ?, + 'queued', + unixepoch(), + unixepoch(), + ?, + ? + ) ON CONFLICT (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued' DO NOTHING +` + +type doQueueJobIgnoreDupeParams struct { + Queue string + Job string + ExecuteAfter int64 + RemainingAttempts int64 + DedupingKey string +} + +func (q *Queries) doQueueJobIgnoreDupe(ctx context.Context, arg doQueueJobIgnoreDupeParams) error { + _, err := q.db.ExecContext(ctx, doQueueJobIgnoreDupe, + arg.Queue, + arg.Job, + arg.ExecuteAfter, + arg.RemainingAttempts, + arg.DedupingKey, + ) + return err +} + +const doQueueJobReplaceDupe = `-- name: doQueueJobReplaceDupe :exec +INSERT INTO + jobs ( + queue, + job, + execute_after, + job_status, + created_at, + updated_at, + remaining_attempts, + deduping_key + ) +VALUES + ( + ?, + ?, + ?, + 'queued', + unixepoch(), + unixepoch(), + ?, + ? 
+ ) ON CONFLICT (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued' DO +UPDATE +SET + job = EXCLUDED.job, + execute_after = EXCLUDED.execute_after, + updated_at = unixepoch(), + remaining_attempts = EXCLUDED.remaining_attempts +` + +type doQueueJobReplaceDupeParams struct { + Queue string + Job string + ExecuteAfter int64 + RemainingAttempts int64 + DedupingKey string +} + +func (q *Queries) doQueueJobReplaceDupe(ctx context.Context, arg doQueueJobReplaceDupeParams) error { + _, err := q.db.ExecContext(ctx, doQueueJobReplaceDupe, + arg.Queue, + arg.Job, + arg.ExecuteAfter, + arg.RemainingAttempts, + arg.DedupingKey, + ) + return err +} diff --git a/internal/schema.go b/internal/schema.go new file mode 100644 index 0000000..a863793 --- /dev/null +++ b/internal/schema.go @@ -0,0 +1,32 @@ +// Code generated by Makefile. DO NOT EDIT. +package internal + +const Schema = ` +PRAGMA journal_mode = WAL; + +CREATE TABLE jobs ( + id INTEGER NOT NULL, + queue TEXT NOT NULL, + job TEXT NOT NULL, + job_status TEXT NOT NULL DEFAULT 'queued', + execute_after INTEGER NOT NULL DEFAULT 0, + remaining_attempts INTEGER NOT NULL DEFAULT 1, + consumer_fetched_at INTEGER NOT NULL DEFAULT 0, + finished_at INTEGER NOT NULL DEFAULT 0, + deduping_key TEXT NOT NULL DEFAULT '', + errors TEXT NOT NULL DEFAULT "[]", + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX todo ON jobs (queue, job_status, execute_after) +WHERE + job_status = 'queued' + OR job_status = 'fetched'; + +CREATE UNIQUE INDEX dedupe ON jobs (deduping_key, job_status) +WHERE + deduping_key != '' + AND job_status = 'queued'; +` diff --git a/liteq.go b/liteq.go new file mode 100644 index 0000000..b9c5a9f --- /dev/null +++ b/liteq.go @@ -0,0 +1,41 @@ +package liteq + +import ( + "context" + "database/sql" + + "github.com/khepin/liteq/internal" +) + +// Creates the db file with the tables and indexes +func Setup(db *sql.DB) error { + _, err := 
db.Exec(internal.Schema) + return err +} + +func New(db *sql.DB) *JobQueue { + queries := internal.New(db) + return &JobQueue{queries} +} + +type JobQueue struct { + queries *internal.Queries +} + +type QueueJobParams = internal.QueueJobParams +type DedupingKey = internal.DedupingKey +type IgnoreDuplicate = internal.IgnoreDuplicate +type ReplaceDuplicate = internal.ReplaceDuplicate + +func (jq *JobQueue) QueueJob(ctx context.Context, params QueueJobParams) error { + return jq.queries.QueueJob(ctx, params) +} + +type ConsumeParams = internal.ConsumeParams + +func (jq *JobQueue) Consume(ctx context.Context, params ConsumeParams) error { + return jq.queries.Consume(ctx, params) +} + +type ErrorList = internal.ErrorList +type Job = internal.Job diff --git a/sqlc.yaml b/sqlc.yaml new file mode 100644 index 0000000..771a94d --- /dev/null +++ b/sqlc.yaml @@ -0,0 +1,17 @@ +version: "2" +sql: + - engine: "sqlite" + queries: "db/queries.sql" + schema: "db/schema.sql" + gen: + go: + package: "internal" + out: "internal" + emit_result_struct_pointers: true + overrides: + - column: jobs.consumer_id + go_type: "string" + - column: jobs.errors + go_type: + import: "" + type: "ErrorList"