initial commit

This commit is contained in:
Seb
2024-02-15 21:43:08 -08:00
commit 975217d1f7
16 changed files with 1493 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.DS_Store
/bench/bench.db*
/TODO

29
Makefile Normal file
View File

@@ -0,0 +1,29 @@
default:
go run *.go || (go get ./... && go run *.go)
watch:
(watchexec -e sql -- make schema-const) & \
(watchexec -e sql -- sqlc generate) & \
(watchexec -w sqlc.yaml -- sqlc generate) & \
(watchexec -e go -c -r -- go test ./... -count 1) & \
wait
# SQL Watcher done
# Puts the schema in a constant in go so it can be used to create the table directly
schema-const:
echo "// Code generated by Makefile. DO NOT EDIT." > internal/schema.go
echo "package internal\n" >> internal/schema.go
echo "const Schema = \`" >> internal/schema.go
cat db/schema.sql >> internal/schema.go
echo "\`" >> internal/schema.go
.PHONY: bench
bench:
make cleanup
make schema-const
sqlc generate
cd bench && go run main.go
cleanup:
rm -f bench/bench.db*

172
README.md Normal file
View File

@@ -0,0 +1,172 @@
# liteq
Library to have a persistent job queue in Go backed by a SQLite DB.
## Motivation
I needed a way to have a persistent queue for a small app so it would survive restarts and allow me to schedule jobs in the future.
Since I already had SQLite as a dependency, I didn't want to add another dependency to make the app work. Especially not an external one like RabbitMQ, Redis, SQS or others.
liteq allows to run tens of thousands of jobs per second if needed. It can also be made to use more than a single DB file to keep growing the concurrency should you need it.
## Usage
### Install
```sh
go get github.com/khepin/liteq
```
### Setup and DB creation
```go
import (
"github.com/khepin/liteq"
_ "github.com/mattn/go-sqlite3"
)
func main() {
// Open the sqlite3 DB in the file "liteq.db"
liteqDb, err := sql.Open("sqlite3", "liteq.db")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
// Create the DB table if it doesn't exist
liteq.Setup(liteqDb)
// create a job queue
jqueue := liteq.New(liteqDb)
}
```
### Queuing a job
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
This will send the job with the given payload on a queue called `notify.email`.
### Consuming
To consume jobs from the queue, you call the consume method:
```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
Queue: "notify.email",
PoolSize: 3,
VisibilityTimeout: 20,
Worker: func (ctx context.Context, job *liteq.Job) error {
return sendEmail(job)
},
})
```
- `context.Background()` You can pass in a cancellable context and the queue will stop processing when the context is canceled
- `Queue` this is the name of the queue we want this consumer to consume from
- `PoolSize` this is the number of concurrent consumer we want for this queue
- `VisibilityTimeout` is the time that a job will remain reserved to a consumer. After that time has elapsed, if the job hasn't been marked either failed
or successful, it will be put back in the queue for others to consume. The error will be added to the job's error list and the number of remaining attempts will be
decreased.
- `Worker` A callback to process the job. When an error is returned, the job is either returned to the queue for processing with a decreased number of remaining attempts or marked as failed if no more attempts remain.
### Multiple attempts
When queueing a job, it is possible to decide how many times this job can be attempted in case of failures:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
RemainingAttempts: 3,
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
### Delayed jobs
When queueing a job, you can decide to execute it at a later point in time:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
ExecuteAfter: time.Now().Add(6*time.Minute).Unix(),
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
In this case, the job won't run until the given time.
### Deduplication
Sometimes it can be useful to prevent the queueing of multiple messages that would essentially be performing the same task to avoid un-necessary work.
This is possible in `liteq` via the `DedupingKey` job parameter. There are 2 types of deduping keys:
- `IgnoreDuplicate` will ignore the new job that was sent and keep the one that was already on the queue
- `ReplaceDuplicate` will instead remove the job currently on the queue and use the new one instead
Assuming we have the following consumer:
```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
Queue: "print",
VisibilityTimeout: 20,
Worker: func (ctx context.Context, job *liteq.Job) error {
fmt.Println(job.Payload)
return nil
},
})
```
And we send the following jobs:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `first`,
DedupingKey: liteq.IgnoreDuplicate("print.job")
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `second`,
DedupingKey: liteq.IgnoreDuplicate("print.job")
})
```
Then the result would be a single output line:
```
first
```
If instead we use `liteq.ReplaceDuplicate`
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `third`,
DedupingKey: liteq.ReplaceDuplicate("print.job")
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `fourth`,
DedupingKey: liteq.ReplaceDuplicate("print.job")
})
```
We will output `fourth`
If we think for example of the scenario of sending email or text notifications about an order to a customer, we could construct a deduping key like:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "email.notify",
Job: `{"order_id": 123, "customer_id": "abc"}`,
DedupingKey: liteq.ReplaceDuplicate("email.notify:%s:%s", customer.ID, order.ID)
})
```
That way if the `Order Prepared` status update email hasn't been sent yet by the time we're ready to send the `Order Shipped` email, we can skip the `Order Prepared` one and only send the most recent update to the customer.

71
bench/main.go Normal file
View File

@@ -0,0 +1,71 @@
package main
import (
"context"
"database/sql"
"fmt"
"math/rand"
"time"
"github.com/khepin/liteq"
_ "github.com/mattn/go-sqlite3"
)
func main() {
howMany := 100_000
go func() {
db, err := sql.Open("sqlite3", "bench.db")
if err != nil {
panic(err)
}
liteq.Setup(db)
jq := liteq.New(db)
for i := 0; i < howMany; i++ {
jq.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: fmt.Sprintf("default-%d", i%30),
Job: "SendEmail",
})
}
}()
c := make(chan struct{})
for i := 0; i < 30; i++ {
db, err := sql.Open("sqlite3", "bench.db")
if err != nil {
panic(err)
}
liteq.Setup(db)
jq := liteq.New(db)
go func(i int) {
err := jq.Consume(context.Background(), liteq.ConsumeParams{
Queue: fmt.Sprintf("default-%d", i),
PoolSize: 10,
Worker: func(ctx context.Context, job *liteq.Job) error {
// random sleep
n := rand.Intn(50)
time.Sleep(time.Duration(n) * time.Millisecond)
c <- struct{}{}
return nil
},
})
fmt.Println(err)
}(i)
}
rec := 0
for range c {
rec++
if rec%1000 == 0 {
fmt.Println(rec)
}
if rec == howMany {
return
}
}
}

138
db/queries.sql Normal file
View File

@@ -0,0 +1,138 @@
-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO NOTHING;
-- name: doQueueJobReplaceDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO
UPDATE
SET
job = EXCLUDED.job,
execute_after = EXCLUDED.execute_after,
updated_at = unixepoch(),
remaining_attempts = EXCLUDED.remaining_attempts;
-- name: CompleteJob :exec
UPDATE
jobs
SET
job_status = 'completed',
finished_at = unixepoch(),
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = 0
WHERE
id = ?;
-- name: FailJob :exec
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
finished_at = 0,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = ?
WHERE
id = ?;
-- name: MarkJobsForConsumer :many
UPDATE
jobs
SET
consumer_fetched_at = unixepoch(),
updated_at = unixepoch(),
job_status = 'fetched'
WHERE
jobs.job_status = 'queued'
AND jobs.remaining_attempts > 0
AND jobs.id IN (
SELECT
id
FROM
jobs js
WHERE
js.queue = ?
AND js.job_status = 'queued'
AND js.execute_after <= ?
AND js.remaining_attempts > 0
ORDER BY
execute_after ASC
LIMIT
?
) RETURNING *;
-- name: ResetJobs :execrows
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
job_status = 'fetched'
AND queue = ?
AND consumer_fetched_at < ?;
-- name: FindJob :one
SELECT
*
FROM
jobs
WHERE
id = ?;

27
db/schema.sql Normal file
View File

@@ -0,0 +1,27 @@
PRAGMA journal_mode = WAL;
CREATE TABLE jobs (
id INTEGER NOT NULL,
queue TEXT NOT NULL,
job TEXT NOT NULL,
job_status TEXT NOT NULL DEFAULT 'queued',
execute_after INTEGER NOT NULL DEFAULT 0,
remaining_attempts INTEGER NOT NULL DEFAULT 1,
consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
finished_at INTEGER NOT NULL DEFAULT 0,
deduping_key TEXT NOT NULL DEFAULT '',
errors TEXT NOT NULL DEFAULT "[]",
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX todo ON jobs (queue, job_status, execute_after)
WHERE
job_status = 'queued'
OR job_status = 'fetched';
CREATE UNIQUE INDEX dedupe ON jobs (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued';

9
go.mod Normal file
View File

@@ -0,0 +1,9 @@
module github.com/khepin/liteq
go 1.22.0
require (
github.com/alitto/pond v1.8.3
github.com/matryer/is v1.4.1
github.com/mattn/go-sqlite3 v1.14.22
)

6
go.sum Normal file
View File

@@ -0,0 +1,6 @@
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

31
internal/db.go Normal file
View File

@@ -0,0 +1,31 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
package internal
import (
"context"
"database/sql"
)
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
return &Queries{
db: tx,
}
}

434
internal/jobs_test.go Normal file
View File

@@ -0,0 +1,434 @@
package internal_test
import (
"context"
"database/sql"
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/khepin/liteq/internal"
"github.com/matryer/is"
_ "github.com/mattn/go-sqlite3"
)
func TestMain(m *testing.M) {
e := m.Run()
removeFiles("jobs.db", "jobs.db-journal", "jobs.db-wal", "jobs.db-shm")
os.Exit(e)
}
func removeFiles(files ...string) error {
for _, file := range files {
err := os.Remove(file)
if err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
func getDb(file string) (*sql.DB, error) {
if file != ":memory:" {
err := removeFiles(file, file+"-journal", file+"-wal", file+"-shm")
if err != nil && !os.IsNotExist(err) {
return nil, err
}
}
db, err := sql.Open("sqlite3", file)
if err != nil {
return nil, err
}
schema, err := os.ReadFile("../db/schema.sql")
if err != nil {
return nil, err
}
_, err = db.Exec(string(schema))
if err != nil {
return nil, err
}
return db, nil
}
func getJqueue(file string) (*internal.Queries, error) {
sqlcdb, err := getDb(file)
if err != nil {
return nil, err
}
return internal.New(sqlcdb), nil
}
func Test_QueueJob(t *testing.T) {
is := is.New(t)
sqlcdb, err := getDb("jobs.db")
is.NoErr(err) // error opening sqlite3 database
jqueue := internal.New(sqlcdb)
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: jobPaylod,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].Job, jobPaylod)
}
func Test_FetchTwice(t *testing.T) {
is := is.New(t)
sqlcdb, err := getDb("jobs.db")
is.NoErr(err) // error opening sqlite3 database
jqueue := internal.New(sqlcdb)
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: jobPaylod,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].Job, jobPaylod)
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected 0 job
}
// delay jobs, fetch jobs, check that we get no jobs, then check that we get the job after the delay
func Test_DelayedJob(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
ExecuteAfter: time.Now().Unix() + 1,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected 0 job
time.Sleep(1 * time.Second)
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
}
func Test_PrefetchJobs(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue(":memory:")
is.NoErr(err) // error getting job queue
for i := 0; i < 10; i++ {
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
})
is.NoErr(err) // error queuing job
}
// grab the first 2
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 2,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 2) // expected 2 jobs
// the next 6
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 6,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 6) // expected 6 jobs
// try for 5 but only 2 are left
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 5,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 2) // expected 2 jobs
}
func Test_Consume(t *testing.T) {
is := is.New(t)
// This somehow doesn't work well with in memory db
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
fillq1 := make(chan struct{}, 1)
go func() {
for i := 0; i < 100; i++ {
jobPayload := fmt.Sprintf("q1-%d", i)
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "q1",
Job: jobPayload,
})
is.NoErr(err) // error queuing job
}
fillq1 <- struct{}{}
close(fillq1)
}()
fillq2 := make(chan struct{}, 1)
go func() {
for i := 0; i < 100; i++ {
jobPayload := fmt.Sprintf("q2-%d", i)
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "q2",
Job: jobPayload,
})
is.NoErr(err) // error queuing job
}
fillq2 <- struct{}{}
close(fillq2)
}()
q1 := make(chan string, 100)
q1done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
jqueue.Consume(ctx, internal.ConsumeParams{
Queue: "q1",
PoolSize: 10,
Worker: func(ctx context.Context, job *internal.Job) error {
q1 <- job.Job
if job.Job == "q1-99" {
q1done <- struct{}{}
close(q1done)
close(q1)
}
return nil
},
})
}()
q2 := make(chan string, 100)
q2done := make(chan struct{}, 1)
ctx2, cancel2 := context.WithCancel(context.Background())
go func() {
jqueue.Consume(ctx2, internal.ConsumeParams{
Queue: "q2",
PoolSize: 7,
Worker: func(ctx context.Context, job *internal.Job) error {
q2 <- job.Job
if job.Job == "q2-99" {
q2done <- struct{}{}
close(q2done)
close(q2)
}
return nil
},
})
}()
<-fillq1
<-q1done
cancel()
<-fillq2
<-q2done
cancel2()
i := 0
for pl := range q1 {
is.True(strings.HasPrefix(pl, "q1")) // expected q1-*
i++
}
is.Equal(i, 100) // expected 100 jobs
j := 0
for pl := range q2 {
is.True(strings.HasPrefix(pl, "q2")) // expected q2-*
j++
}
is.Equal(j, 100) // expected 100 jobs
}
func Test_MultipleAttempts(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
RemainingAttempts: 3,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
thejob := jobs[0]
is.Equal(thejob.RemainingAttempts, int64(3)) // expected 3 attempts
// Fail and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].RemainingAttempts, int64(2)) // expected 2 attempts
is.Equal(jobs[0].Errors, internal.ErrorList{"error1"})
// Fail again and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1", "error2"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].RemainingAttempts, int64(1)) // expected 1 attempts
is.Equal(jobs[0].Errors, internal.ErrorList{"error1", "error2"})
// Fail again and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1", "error2", "error3"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected no job since no more attempts
}
func Test_VisibilityTimeout(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
})
is.NoErr(err) // error queuing job
ctx, cancel := context.WithCancel(context.Background())
id := int64(0)
go func() {
jqueue.Consume(ctx, internal.ConsumeParams{
Queue: "",
PoolSize: 6,
VisibilityTimeout: 1,
OnEmptySleep: 100 * time.Millisecond,
Worker: func(ctx context.Context, job *internal.Job) error {
id = job.ID
time.Sleep(3 * time.Second)
return nil
},
})
}()
time.Sleep(2 * time.Second)
cancel()
job, err := jqueue.FindJob(context.Background(), id)
is.NoErr(err) // error fetching job for consumer
is.Equal(job.JobStatus, "failed") // expected fetched
is.Equal(job.Errors, internal.ErrorList{"visibility timeout expired"})
// Sleep to ensure enough time for the job to finish and avoid panics
time.Sleep(2 * time.Second)
}
func Test_DedupeIgnore(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:1`,
DedupingKey: internal.IgnoreDuplicate("dedupe"),
})
is.NoErr(err) // error queuing job
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:2`,
DedupingKey: internal.IgnoreDuplicate("dedupe"),
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 10,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected only 1 job due to dedupe
is.Equal(jobs[0].Job, `job:1`) // expected job:1
}
func Test_DedupeReplace(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:1`,
DedupingKey: internal.ReplaceDuplicate("dedupe"),
})
is.NoErr(err) // error queuing job
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:2`,
DedupingKey: internal.ReplaceDuplicate("dedupe"),
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 10,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected only 1 job due to dedupe
is.Equal(jobs[0].Job, `job:2`) // expected job:1
}

177
internal/methods.go Normal file
View File

@@ -0,0 +1,177 @@
package internal
import (
"context"
"encoding/json"
"fmt"
"time"
"database/sql/driver"
"github.com/alitto/pond"
)
type QueueJobParams struct {
Queue string
Job string
ExecuteAfter int64
RemainingAttempts int64
DedupingKey DedupingKey
}
type DedupingKey interface {
String() string
ReplaceDuplicate() bool
}
type IgnoreDuplicate string
func (i IgnoreDuplicate) String() string {
return string(i)
}
func (i IgnoreDuplicate) ReplaceDuplicate() bool {
return false
}
type ReplaceDuplicate string
func (r ReplaceDuplicate) String() string {
return string(r)
}
func (r ReplaceDuplicate) ReplaceDuplicate() bool {
return true
}
func (q *Queries) QueueJob(ctx context.Context, params QueueJobParams) error {
if params.RemainingAttempts == 0 {
params.RemainingAttempts = 1
}
if params.DedupingKey == nil {
params.DedupingKey = IgnoreDuplicate("")
}
doParams := doQueueJobIgnoreDupeParams{
Queue: params.Queue,
Job: params.Job,
ExecuteAfter: params.ExecuteAfter,
RemainingAttempts: params.RemainingAttempts,
DedupingKey: params.DedupingKey.String(),
}
if params.DedupingKey.String() == "" {
return q.doQueueJobIgnoreDupe(ctx, doParams)
}
if params.DedupingKey.ReplaceDuplicate() {
return q.doQueueJobReplaceDupe(ctx, doQueueJobReplaceDupeParams(doParams))
}
return q.doQueueJobIgnoreDupe(ctx, doParams)
}
type GrabJobsParams struct {
Queue string
ExecuteAfter int64
Count int64
}
func (q *Queries) GrabJobs(ctx context.Context, params GrabJobsParams) ([]*Job, error) {
executeAfter := time.Now().Unix()
if params.ExecuteAfter > 0 {
executeAfter = params.ExecuteAfter
}
limit := int64(1)
if params.Count > 0 {
limit = params.Count
}
return q.MarkJobsForConsumer(ctx, MarkJobsForConsumerParams{
Queue: params.Queue,
ExecuteAfter: executeAfter,
Limit: limit,
})
}
type ConsumeParams struct {
Queue string
PoolSize int
Worker func(context.Context, *Job) error
VisibilityTimeout int64
OnEmptySleep time.Duration
}
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
workers := pond.New(params.PoolSize, params.PoolSize)
sleep := params.OnEmptySleep
if sleep == 0 {
sleep = 1 * time.Second
}
for {
// If the context gets canceled for example, stop consuming
if ctx.Err() != nil {
return nil
}
if params.VisibilityTimeout > 0 {
_, err := q.ResetJobs(ctx, ResetJobsParams{
Queue: params.Queue,
ConsumerFetchedAt: time.Now().Unix() - params.VisibilityTimeout,
})
if err != nil {
return fmt.Errorf("error resetting jobs: %w", err)
}
}
jobs, err := q.GrabJobs(ctx, GrabJobsParams{
Queue: params.Queue,
Count: int64(params.PoolSize),
})
if err != nil {
return fmt.Errorf("error grabbing jobs: %w", err)
}
if len(jobs) == 0 {
time.Sleep(sleep)
continue
}
for _, job := range jobs {
job := job
workers.Submit(func() {
err := params.Worker(ctx, job)
if err != nil {
q.FailJob(ctx, FailJobParams{
ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())),
})
return
}
q.CompleteJob(ctx, job.ID)
})
}
}
}
type ErrorList []string
func (e ErrorList) Value() (driver.Value, error) {
if len(e) == 0 {
return "[]", nil
}
return json.Marshal(e)
}
func (e *ErrorList) Scan(src interface{}) error {
switch src := src.(type) {
case string:
return json.Unmarshal([]byte(src), e)
case []byte:
return json.Unmarshal(src, e)
default:
return fmt.Errorf("unsupported type: %T", src)
}
}

22
internal/models.go Normal file
View File

@@ -0,0 +1,22 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
package internal
import ()
type Job struct {
ID int64
Queue string
Job string
JobStatus string
ExecuteAfter int64
RemainingAttempts int64
ConsumerFetchedAt int64
FinishedAt int64
DedupingKey string
Errors ErrorList
CreatedAt int64
UpdatedAt int64
}

284
internal/queries.sql.go Normal file
View File

@@ -0,0 +1,284 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
// source: queries.sql
package internal
import (
"context"
)
const completeJob = `-- name: CompleteJob :exec
UPDATE
jobs
SET
job_status = 'completed',
finished_at = unixepoch(),
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = 0
WHERE
id = ?
`
func (q *Queries) CompleteJob(ctx context.Context, id int64) error {
_, err := q.db.ExecContext(ctx, completeJob, id)
return err
}
const failJob = `-- name: FailJob :exec
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
finished_at = 0,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = ?
WHERE
id = ?
`
type FailJobParams struct {
Errors ErrorList
ID int64
}
func (q *Queries) FailJob(ctx context.Context, arg FailJobParams) error {
_, err := q.db.ExecContext(ctx, failJob, arg.Errors, arg.ID)
return err
}
const findJob = `-- name: FindJob :one
SELECT
id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
FROM
jobs
WHERE
id = ?
`
func (q *Queries) FindJob(ctx context.Context, id int64) (*Job, error) {
row := q.db.QueryRowContext(ctx, findJob, id)
var i Job
err := row.Scan(
&i.ID,
&i.Queue,
&i.Job,
&i.JobStatus,
&i.ExecuteAfter,
&i.RemainingAttempts,
&i.ConsumerFetchedAt,
&i.FinishedAt,
&i.DedupingKey,
&i.Errors,
&i.CreatedAt,
&i.UpdatedAt,
)
return &i, err
}
const markJobsForConsumer = `-- name: MarkJobsForConsumer :many
UPDATE
jobs
SET
consumer_fetched_at = unixepoch(),
updated_at = unixepoch(),
job_status = 'fetched'
WHERE
jobs.job_status = 'queued'
AND jobs.remaining_attempts > 0
AND jobs.id IN (
SELECT
id
FROM
jobs js
WHERE
js.queue = ?
AND js.job_status = 'queued'
AND js.execute_after <= ?
AND js.remaining_attempts > 0
ORDER BY
execute_after ASC
LIMIT
?
) RETURNING id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
`
type MarkJobsForConsumerParams struct {
Queue string
ExecuteAfter int64
Limit int64
}
func (q *Queries) MarkJobsForConsumer(ctx context.Context, arg MarkJobsForConsumerParams) ([]*Job, error) {
rows, err := q.db.QueryContext(ctx, markJobsForConsumer, arg.Queue, arg.ExecuteAfter, arg.Limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*Job
for rows.Next() {
var i Job
if err := rows.Scan(
&i.ID,
&i.Queue,
&i.Job,
&i.JobStatus,
&i.ExecuteAfter,
&i.RemainingAttempts,
&i.ConsumerFetchedAt,
&i.FinishedAt,
&i.DedupingKey,
&i.Errors,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const resetJobs = `-- name: ResetJobs :execrows
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
job_status = 'fetched'
AND queue = ?
AND consumer_fetched_at < ?
`
type ResetJobsParams struct {
Queue string
ConsumerFetchedAt int64
}
func (q *Queries) ResetJobs(ctx context.Context, arg ResetJobsParams) (int64, error) {
result, err := q.db.ExecContext(ctx, resetJobs, arg.Queue, arg.ConsumerFetchedAt)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
const doQueueJobIgnoreDupe = `-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO NOTHING
`
type doQueueJobIgnoreDupeParams struct {
Queue string
Job string
ExecuteAfter int64
RemainingAttempts int64
DedupingKey string
}
func (q *Queries) doQueueJobIgnoreDupe(ctx context.Context, arg doQueueJobIgnoreDupeParams) error {
_, err := q.db.ExecContext(ctx, doQueueJobIgnoreDupe,
arg.Queue,
arg.Job,
arg.ExecuteAfter,
arg.RemainingAttempts,
arg.DedupingKey,
)
return err
}
const doQueueJobReplaceDupe = `-- name: doQueueJobReplaceDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO
UPDATE
SET
job = EXCLUDED.job,
execute_after = EXCLUDED.execute_after,
updated_at = unixepoch(),
remaining_attempts = EXCLUDED.remaining_attempts
`
type doQueueJobReplaceDupeParams struct {
Queue string
Job string
ExecuteAfter int64
RemainingAttempts int64
DedupingKey string
}
func (q *Queries) doQueueJobReplaceDupe(ctx context.Context, arg doQueueJobReplaceDupeParams) error {
_, err := q.db.ExecContext(ctx, doQueueJobReplaceDupe,
arg.Queue,
arg.Job,
arg.ExecuteAfter,
arg.RemainingAttempts,
arg.DedupingKey,
)
return err
}

32
internal/schema.go Normal file
View File

@@ -0,0 +1,32 @@
// Code generated by Makefile. DO NOT EDIT.
package internal
const Schema = `
PRAGMA journal_mode = WAL;
CREATE TABLE jobs (
id INTEGER NOT NULL,
queue TEXT NOT NULL,
job TEXT NOT NULL,
job_status TEXT NOT NULL DEFAULT 'queued',
execute_after INTEGER NOT NULL DEFAULT 0,
remaining_attempts INTEGER NOT NULL DEFAULT 1,
consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
finished_at INTEGER NOT NULL DEFAULT 0,
deduping_key TEXT NOT NULL DEFAULT '',
errors TEXT NOT NULL DEFAULT "[]",
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX todo ON jobs (queue, job_status, execute_after)
WHERE
job_status = 'queued'
OR job_status = 'fetched';
CREATE UNIQUE INDEX dedupe ON jobs (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued';
`

41
liteq.go Normal file
View File

@@ -0,0 +1,41 @@
package liteq
import (
"context"
"database/sql"
"github.com/khepin/liteq/internal"
)
// Creates the db file with the tables and indexes
func Setup(db *sql.DB) error {
_, err := db.Exec(internal.Schema)
return err
}
func New(db *sql.DB) *JobQueue {
queries := internal.New(db)
return &JobQueue{queries}
}
type JobQueue struct {
queries *internal.Queries
}
type QueueJobParams = internal.QueueJobParams
type DedupingKey = internal.DedupingKey
type IgnoreDuplicate = internal.IgnoreDuplicate
type ReplaceDuplicate = internal.ReplaceDuplicate
func (jq *JobQueue) QueueJob(ctx context.Context, params QueueJobParams) error {
return jq.queries.QueueJob(ctx, params)
}
type ConsumeParams = internal.ConsumeParams
func (jq *JobQueue) Consume(ctx context.Context, params ConsumeParams) error {
return jq.queries.Consume(ctx, params)
}
type ErrorList = internal.ErrorList
type Job = internal.Job

17
sqlc.yaml Normal file
View File

@@ -0,0 +1,17 @@
version: "2"
sql:
- engine: "sqlite"
queries: "db/queries.sql"
schema: "db/schema.sql"
gen:
go:
package: "internal"
out: "internal"
emit_result_struct_pointers: true
overrides:
- column: jobs.consumer_id
go_type: "string"
- column: jobs.errors
go_type:
import: ""
type: "ErrorList"