Compare commits
5 Commits
437393a1e7 ... main

| Author | SHA1 | Date |
|---|---|---|
| | f53639af2f | |
| | 06cdc68be7 | |
| | d212a354fe | |
| | 23923b7be4 | |
| | 80b55d1a3b | |
2  .gitignore  (vendored, Normal file)
@@ -0,0 +1,2 @@
*.db*
*.sqlite*
4  Makefile
@@ -4,3 +4,7 @@ test: generate
generate:
	cd internal/domain64 ; make generate
	cd internal/liteq ; sqlc generate
	cd internal/liteq ; make schema-const

run: generate
	go run cmd/api/*.go

@@ -32,3 +32,7 @@ The use-case is (essentially) single-user.
* https://github.com/jpillora/go-tld
* https://pkg.go.dev/modernc.org/sqlite
* https://github.com/sqlc-dev/sqlc
* https://github.com/spf13/afero
* https://github.com/tidwall/gjson

//go:generate stringer -type=UpdateFrequency
78  cmd/api/init.go  (Normal file)
@@ -0,0 +1,78 @@
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"
	"time"

	"git.jadud.com/jadudm/grosbeak/internal/domain64"
	"git.jadud.com/jadudm/grosbeak/internal/engine"
	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
	"git.jadud.com/jadudm/grosbeak/internal/types"
	_ "modernc.org/sqlite"
)

func setupDB() *sql.DB {

	// FIXME: This path needs to come from the env.
	db, err := sql.Open("sqlite", "grosbeak.sqlite")
	db.SetMaxOpenConns(1)

	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	return db
}

func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) {
	for {
		log.Printf("runQ %s\n", queueName)

		err := queue.Consume(context.Background(), liteq.ConsumeParams{
			Queue:             queueName,
			PoolSize:          3,
			VisibilityTimeout: 20,
			Worker:            worker,
		})

		if err != nil {
			log.Printf("runQ/%s: %s", queueName, err.Error())
			time.Sleep(2 * time.Second)
		}

		time.Sleep(1 * time.Second)
	}
}

func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue {
	liteq.Setup(db)
	queue := liteq.New(db)
	// The queue processes as long as this context is not cancelled.

	log.Println("setting up worker queues...")
	queues := []struct {
		queueName string
		worker    types.QueueWorker
	}{
		{"fetch", engine.Fetch(d64m)},
	}
	for _, q := range queues {
		go runQ(queue, q.queueName, q.worker)
	}
	return queue
}

func setupDomain64Map(db *sql.DB) *domain64.Domain64Map {
	d64m, err := domain64.NewDomain64Map()
	if err != nil {
		log.Printf("newdomain64map err: %s", err.Error())
		os.Exit(1)
	}
	d64m.Setup(db)
	return d64m
}
@@ -1,84 +1,40 @@
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"

	_ "modernc.org/sqlite"

	liteq "git.jadud.com/grosbeak/internal/liteq"
	"git.jadud.com/jadudm/grosbeak/internal/engine"
	base "git.jadud.com/jadudm/grosbeak/internal/types"
)

func Fetch(ctx context.Context, job *liteq.Job) error {
	n := rand.Intn(50)

	time.Sleep(time.Duration(n) * time.Millisecond)
	log.Println("Fetching", job.Job)
	return nil
}

func runWorkers(queue *liteq.JobQueue) {
	go queue.Consume(context.Background(), liteq.ConsumeParams{
		Queue:             "fetch",
		PoolSize:          3,
		VisibilityTimeout: 200000,
		Worker:            Fetch,
	})
}

func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
	ctx := context.Background()
	for {
		url := <-chUrl
		log.Println("Entre", url)
		// Don't duplicate jobs on the same day of the year.
		n := time.Now()
		queue.QueueJob(ctx, liteq.QueueJobParams{
			Queue: "fetch",
			// This only works for things in the `queued` state
			DedupingKey: liteq.IgnoreDuplicate(fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())),
			Job:         url,
		})
	}
}

func main() {
	// Don't let `main()` exit
	wg := &sync.WaitGroup{}
	wg.Add(1)

	// FIXME: This path needs to come from the env.
	liteqDB, err := sql.Open("sqlite", "liteq.db")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	db := setupDB()
	d64m := setupDomain64Map(db)
	queue := setupLiteQ(db, d64m)

	// Enqueue URLs
	urls := []struct {
		url string
		uf  base.UpdateFrequency
	}{
		{"https://jadud.com/", base.UPDATE_DAILY},
		{"https://berea.us/", base.UPDATE_WEEKLY},
	}
	liteq.Setup(liteqDB)
	queue := liteq.New(liteqDB)
	// The queue processes as long as this context is not cancelled.

	log.Println("Setting up workers...")
	go runWorkers(queue)

	log.Println("Building network...")
	// Create the network for the search engine.
	chUrl := make(chan string)

	go Entre(queue, chUrl)
	for {
		chUrl <- "https://jadud.com/"
		time.Sleep(2 * time.Second)
		chUrl <- "https://berea.us/"
		time.Sleep(2 * time.Second)
	for _, u := range urls {
		engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf})
		time.Sleep(1 * time.Second)
	}

	// Don't exit.
	log.Println("Waiting...")
	log.Println("Waiting for godot...")
	wg.Wait()
}
5  go.mod
@@ -1,4 +1,4 @@
module git.jadud.com/grosbeak
module git.jadud.com/jadudm/grosbeak

go 1.24.0

@@ -8,6 +8,7 @@ require (
	github.com/alitto/pond v1.9.2
	github.com/jpillora/go-tld v1.2.1
	github.com/matryer/is v1.4.1
	github.com/tidwall/gjson v1.18.0
	modernc.org/sqlite v1.40.1
)

@@ -17,6 +18,8 @@ require (
	github.com/mattn/go-isatty v0.0.20 // indirect
	github.com/ncruces/go-strftime v1.0.0 // indirect
	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
	github.com/tidwall/match v1.2.0 // indirect
	github.com/tidwall/pretty v1.2.1 // indirect
	golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
	golang.org/x/net v0.47.0 // indirect
	golang.org/x/sys v0.38.0 // indirect
8  go.sum
@@ -18,6 +18,14 @@ github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
@@ -3,30 +3,43 @@ package domain64
import (
	"context"
	"database/sql"
	_ "embed"
	"fmt"
	"log"
	"slices"
	"sync"

	sqlc "git.jadud.com/grosbeak/internal/domain64/sqlc"
	sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
	"github.com/jpillora/go-tld"
)

//go:embed sqlc/schema.sql
var ddl string

type Domain64Map struct {
	mu      sync.Mutex
	m       map[string]int
	DB      *sql.DB
	Flushed bool
	Queries *sqlc.Queries
}

func NewDomain64Map(db *sql.DB) (*Domain64Map, error) {
func NewDomain64Map() (*Domain64Map, error) {
	d64m := &Domain64Map{}
	d64m.DB = db
	d64m.Flushed = true
	// Init the tables if a DB pointer is passed in.
	d64m.DB = nil
	d64m.Queries = nil
	return d64m, nil
}

func (d64m *Domain64Map) Setup(db *sql.DB) {
	log.Printf("creating domain64 tables\n")
	if _, err := db.ExecContext(context.Background(), ddl); err != nil {
		log.Printf("setup err: %s", err.Error())
		panic(err)
	}
	d64m.DB = db
	d64m.Queries = sqlc.New(db)
}

func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
	s := ""
	if url.TLD != "" {
@@ -140,21 +153,21 @@ func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) {
	}

	d64 := &Domain64{}
	queries := sqlc.New(d64m.DB)

	// These manipulate both the DB and the Domain64 struct
	err := _get_or_insert_tld(queries, d64, url)
	err := _get_or_insert_tld(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_domain(queries, d64, url)
	err = _get_or_insert_domain(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_subdomain(queries, d64, url)
	err = _get_or_insert_subdomain(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_path(queries, d64, url)
	err = _get_or_insert_path(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
@@ -3,16 +3,12 @@ package domain64
import (
	"context"
	"database/sql"
	_ "embed"
	"testing"

	"github.com/jpillora/go-tld"
	_ "modernc.org/sqlite"
)

//go:embed sqlc/schema.sql
var ddl string

func setup() *sql.DB {
	ctx := context.Background()

@@ -32,7 +28,8 @@ func setup() *sql.DB {

func TestNewDomain64Map(t *testing.T) {
	db := setup()
	M, err := NewDomain64Map(db)
	M, err := NewDomain64Map()
	M.Setup(db)
	if err != nil {
		// TODO
		t.Error(err)
@@ -43,7 +40,8 @@ func TestNewDomain64Map(t *testing.T) {
}

func TestURLToRFQDN(t *testing.T) {
	M, _ := NewDomain64Map(nil)
	M, _ := NewDomain64Map()
	// M.Setup(db)
	simple, _ := tld.Parse("https://jadud.com/")
	rfqdn := M.URLToRFQDN(simple)
	if rfqdn != "com.jadud/" {
@@ -53,7 +51,8 @@ func TestURLToRFQDN(t *testing.T) {

func TestURLToDomain64(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map(db)
	M, _ := NewDomain64Map()
	M.Setup(db)
	simple, _ := tld.Parse("https://jadud.com/")
	d64, _ := M.URLToDomain64(simple)
	if d64.TLD != 1 {
@@ -67,7 +66,8 @@ func TestURLToDomain64(t *testing.T) {

func TestURLToDomain64_02(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map(db)
	M, _ := NewDomain64Map()
	M.Setup(db)
	simple1, _ := tld.Parse("https://jadud.com/")
	simple2, _ := tld.Parse("https://another.com/")
	d64_1, _ := M.URLToDomain64(simple1)
@@ -93,7 +93,8 @@ func TestURLToDomain64_02(t *testing.T) {

func TestURLToDomain64_03(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map(db)
	M, _ := NewDomain64Map()
	M.Setup(db)
	var tests = []struct {
		url string
		tld int64
39  internal/engine/entre.go  (Normal file)
@@ -0,0 +1,39 @@
package engine

import (
	"context"
	"fmt"
	"log"
	"time"

	"git.jadud.com/jadudm/grosbeak/internal/liteq"
	base "git.jadud.com/jadudm/grosbeak/internal/types"
)

func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error {
	n := time.Now()

	var ignore_tag string

	switch ej.UpdateFrequency {
	case base.UPDATE_DAILY:
		ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
	case base.UPDATE_WEEKLY:
		ignore_tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, n.Year(), n.YearDay()/7)
	case base.UPDATE_MONTHLY:
		ignore_tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, n.Year(), n.Month())
	default:
		ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
	}

	err := queue.QueueJob(context.Background(), liteq.QueueJobParams{
		Queue: "fetch",
		// This only works for things in the `queued` state
		DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
		Job:         ej.AsJson(),
	})
	if err != nil {
		log.Printf("entre err %s: %s\n", ej.URL, err.Error())
	}
	return err
}
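The deduping key built above means repeat submissions of the same URL inside one update period collapse into a single queued job. A minimal sketch of that behaviour (not part of the diff; it assumes the module paths shown in this change and an already constructed `*liteq.JobQueue`):

```go
package main

import (
	"log"

	"git.jadud.com/jadudm/grosbeak/internal/engine"
	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
	base "git.jadud.com/jadudm/grosbeak/internal/types"
)

// enqueueDaily submits the same URL twice. Entre builds the same
// "url|yYYYY-ydDDD" key for both calls on a given day, so liteq's
// IgnoreDuplicate keeps only the first queued job.
func enqueueDaily(jq *liteq.JobQueue, url string) {
	job := &base.EntreJob{URL: url, UpdateFrequency: base.UPDATE_DAILY}
	if err := engine.Entre(jq, job); err != nil {
		log.Println("first enqueue:", err)
	}
	if err := engine.Entre(jq, job); err != nil {
		log.Println("second enqueue:", err)
	}
}
```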
34  internal/engine/fetch.go  (Normal file)
@@ -0,0 +1,34 @@
package engine

import (
	"context"
	"log"

	"git.jadud.com/jadudm/grosbeak/internal/domain64"
	"git.jadud.com/jadudm/grosbeak/internal/liteq"
	"git.jadud.com/jadudm/grosbeak/internal/types"
	"github.com/jpillora/go-tld"
	"github.com/tidwall/gjson"
)

func Fetch(d64m *domain64.Domain64Map) types.QueueWorker {
	_f := func(ctx context.Context, job *liteq.Job) error {
		url := gjson.Get(job.Job, "url").String()

		turl, err := tld.Parse(url)
		if err != nil {
			// If we can't parse it, shut the job down as completed.
			return nil
		}

		d64, err := d64m.URLToDomain64(turl)
		if err != nil {
			log.Printf("urltodomain64 err %s: %s\n", url, err.Error())
		}

		log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64())
		return nil
	}

	return _f
}
Submodule internal/liteq deleted from 5ba4ae43a3
3  internal/liteq/.gitignore  (vendored, Normal file)
@@ -0,0 +1,3 @@
.DS_Store
/bench/bench.db*
/TODO
21  internal/liteq/LICENSE  (Normal file)
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 Sebastien Armand

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
29  internal/liteq/Makefile  (Normal file)
@@ -0,0 +1,29 @@
default:
	go run *.go || (go get ./... && go run *.go)

watch:
	(watchexec -e sql -- make schema-const) & \
	(watchexec -e sql -- sqlc generate) & \
	(watchexec -w sqlc.yaml -- sqlc generate) & \
	(watchexec -e go -c -r -- go test ./... -count 1) & \
	wait
	# SQL Watcher done

# Puts the schema in a constant in go so it can be used to create the table directly
schema-const:
	echo "// Code generated by Makefile. DO NOT EDIT." > internal/schema.go
	echo "package internal\n" >> internal/schema.go
	echo "const Schema = \`" >> internal/schema.go
	cat db/schema.sql >> internal/schema.go
	echo "\`" >> internal/schema.go


.PHONY: bench
bench:
	make cleanup
	make schema-const
	sqlc generate
	cd bench && go run main.go

cleanup:
	rm -f bench/bench.db*
172  internal/liteq/README.md  (Normal file)
@@ -0,0 +1,172 @@
# liteq

Library to have a persistent job queue in Go backed by a SQLite DB.

## Motivation

I needed a way to have a persistent queue for a small app so it would survive restarts and allow me to schedule jobs in the future.
Since I already had SQLite as a dependency, I didn't want to add another dependency to make the app work. Especially not an external one like RabbitMQ, Redis, SQS or others.

liteq allows to run tens of thousands of jobs per second if needed. It can also be made to use more than a single DB file to keep growing the concurrency should you need it.

## Usage

### Install

```sh
go get github.com/jadudm/liteq
```

### Setup and DB creation

```go
import (
	"github.com/jadudm/liteq"
	_ "github.com/mattn/go-sqlite3"
)

func main() {
	// Open the sqlite3 DB in the file "liteq.db"
	liteqDb, err := sql.Open("sqlite3", "liteq.db")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	// Create the DB table if it doesn't exist
	liteq.Setup(liteqDb)
	// create a job queue
	jqueue := liteq.New(liteqDb)
}
```

### Queuing a job

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue: "notify.email",
	Job:   `{"email_address": "bob@example.com", "content": "..."}`,
})
```

This will send the job with the given payload on a queue called `notify.email`.

### Consuming

To consume jobs from the queue, you call the consume method:

```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
	Queue:             "notify.email",
	PoolSize:          3,
	VisibilityTimeout: 20,
	Worker: func(ctx context.Context, job *liteq.Job) error {
		return sendEmail(job)
	},
})
```

- `context.Background()` You can pass in a cancellable context and the queue will stop processing when the context is canceled
- `Queue` this is the name of the queue we want this consumer to consume from
- `PoolSize` this is the number of concurrent consumer we want for this queue
- `VisibilityTimeout` is the time that a job will remain reserved to a consumer. After that time has elapsed, if the job hasn't been marked either failed
  or successful, it will be put back in the queue for others to consume. The error will be added to the job's error list and the number of remaining attempts will be
  decreased.
- `Worker` A callback to process the job. When an error is returned, the job is either returned to the queue for processing with a decreased number of remaining attempts or marked as failed if no more attempts remain.

### Multiple attempts

When queueing a job, it is possible to decide how many times this job can be attempted in case of failures:

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:             "notify.email",
	RemainingAttempts: 3,
	Job:               `{"email_address": "bob@example.com", "content": "..."}`,
})
```

### Delayed jobs

When queueing a job, you can decide to execute it at a later point in time:

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:        "notify.email",
	ExecuteAfter: time.Now().Add(6*time.Minute).Unix(),
	Job:          `{"email_address": "bob@example.com", "content": "..."}`,
})
```

In this case, the job won't run until the given time.

### Deduplication

Sometimes it can be useful to prevent the queueing of multiple messages that would essentially be performing the same task to avoid un-necessary work.

This is possible in `liteq` via the `DedupingKey` job parameter. There are 2 types of deduping keys:

- `IgnoreDuplicate` will ignore the new job that was sent and keep the one that was already on the queue
- `ReplaceDuplicate` will instead remove the job currently on the queue and use the new one instead

Assuming we have the following consumer:

```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
	Queue:             "print",
	VisibilityTimeout: 20,
	Worker: func(ctx context.Context, job *liteq.Job) error {
		fmt.Println(job.Payload)
		return nil
	},
})
```

And we send the following jobs:

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "print",
	Job:         `first`,
	DedupingKey: liteq.IgnoreDuplicate("print.job"),
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "print",
	Job:         `second`,
	DedupingKey: liteq.IgnoreDuplicate("print.job"),
})
```

Then the result would be a single output line:

```
first
```

If instead we use `liteq.ReplaceDuplicate`

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "print",
	Job:         `third`,
	DedupingKey: liteq.ReplaceDuplicate("print.job"),
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "print",
	Job:         `fourth`,
	DedupingKey: liteq.ReplaceDuplicate("print.job"),
})
```

We will output `fourth`

If we think for example of the scenario of sending email or text notifications about an order to a customer, we could construct a deduping key like:

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "email.notify",
	Job:         `{"order_id": 123, "customer_id": "abc"}`,
	DedupingKey: liteq.ReplaceDuplicate("email.notify:%s:%s", customer.ID, order.ID),
})
```

That way if the `Order Prepared` status update email hasn't been sent yet by the time we're ready to send the `Order Shipped` email, we can skip the `Order Prepared` one and only send the most recent update to the customer.
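One note on the final README example above: in this vendored copy `ReplaceDuplicate` is declared as a plain string type (see internal/methods.go later in this diff), so a formatted key would presumably be built with `fmt.Sprintf` first. A hedged sketch, with hypothetical `customer` and `order` values:

```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
	Queue:       "email.notify",
	Job:         `{"order_id": 123, "customer_id": "abc"}`,
	// The key itself is just a string; Sprintf builds it before queueing.
	DedupingKey: liteq.ReplaceDuplicate(fmt.Sprintf("email.notify:%s:%s", customer.ID, order.ID)),
})
```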
71  internal/liteq/bench/main.go  (Normal file)
@@ -0,0 +1,71 @@
package main

import (
	"context"
	"database/sql"
	"fmt"
	"math/rand"
	"time"

	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
	_ "modernc.org/sqlite"
)

func main() {
	howMany := 100_000

	go func() {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		liteq.Setup(db)

		jq := liteq.New(db)

		for i := 0; i < howMany; i++ {
			jq.QueueJob(context.Background(), liteq.QueueJobParams{
				Queue: fmt.Sprintf("default-%d", i%30),
				Job:   "SendEmail",
			})
		}
	}()

	c := make(chan struct{})
	for i := 0; i < 30; i++ {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		liteq.Setup(db)

		jq := liteq.New(db)

		go func(i int) {
			err := jq.Consume(context.Background(), liteq.ConsumeParams{
				Queue:    fmt.Sprintf("default-%d", i),
				PoolSize: 10,
				Worker: func(ctx context.Context, job *liteq.Job) error {
					// random sleep
					n := rand.Intn(50)

					time.Sleep(time.Duration(n) * time.Millisecond)
					c <- struct{}{}
					return nil
				},
			})
			fmt.Println(err)
		}(i)
	}

	rec := 0
	for range c {
		rec++
		if rec%1000 == 0 {
			fmt.Println(rec)
		}
		if rec == howMany {
			return
		}
	}
}
137  internal/liteq/db/queries.sql  (Normal file)
@@ -0,0 +1,137 @@
-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
  jobs (
    queue,
    job,
    execute_after,
    job_status,
    created_at,
    updated_at,
    remaining_attempts,
    deduping_key
  )
VALUES
  (
    ?,
    ?,
    ?,
    'queued',
    unixepoch(),
    unixepoch(),
    ?,
    ?
  ) ON CONFLICT (deduping_key)
WHERE
  deduping_key != '' DO NOTHING;

-- name: doQueueJobReplaceDupe :exec
INSERT INTO
  jobs (
    queue,
    job,
    execute_after,
    job_status,
    created_at,
    updated_at,
    remaining_attempts,
    deduping_key
  )
VALUES
  (
    ?,
    ?,
    ?,
    'queued',
    unixepoch(),
    unixepoch(),
    ?,
    ?
  ) ON CONFLICT (deduping_key, job_status)
WHERE
  deduping_key != ''
  AND job_status = 'queued' DO
UPDATE
SET
  job = EXCLUDED.job,
  execute_after = EXCLUDED.execute_after,
  updated_at = unixepoch(),
  remaining_attempts = EXCLUDED.remaining_attempts;

-- name: CompleteJob :exec
UPDATE
  jobs
SET
  job_status = 'completed',
  finished_at = unixepoch(),
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = 0
WHERE
  id = ?;

-- name: FailJob :exec
UPDATE
  jobs
SET
  job_status = CASE
    WHEN remaining_attempts <= 1 THEN 'failed'
    ELSE 'queued'
  END,
  finished_at = 0,
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = MAX(remaining_attempts - 1, 0),
  errors = ?
WHERE
  id = ?;

-- name: MarkJobsForConsumer :many
UPDATE
  jobs
SET
  consumer_fetched_at = unixepoch(),
  updated_at = unixepoch(),
  job_status = 'fetched'
WHERE
  jobs.job_status = 'queued'
  AND jobs.remaining_attempts > 0
  AND jobs.id IN (
    SELECT
      id
    FROM
      jobs js
    WHERE
      js.queue = ?
      AND js.job_status = 'queued'
      AND js.execute_after <= ?
      AND js.remaining_attempts > 0
    ORDER BY
      execute_after ASC
    LIMIT
      ?
  ) RETURNING *;

-- name: ResetJobs :execrows
UPDATE
  jobs
SET
  job_status = CASE
    WHEN remaining_attempts <= 1 THEN 'failed'
    ELSE 'queued'
  END,
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = MAX(remaining_attempts - 1, 0),
  errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
  job_status = 'fetched'
  AND queue = ?
  AND consumer_fetched_at < ?;

-- name: FindJob :one
SELECT
  *
FROM
  jobs
WHERE
  id = ?;
31  internal/liteq/db/schema.sql  (Normal file)
@@ -0,0 +1,31 @@
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS jobs (
  id INTEGER NOT NULL,
  queue TEXT NOT NULL,
  job TEXT NOT NULL,
  job_status TEXT NOT NULL DEFAULT 'queued',
  execute_after INTEGER NOT NULL DEFAULT 0,
  remaining_attempts INTEGER NOT NULL DEFAULT 1,
  consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
  finished_at INTEGER NOT NULL DEFAULT 0,
  deduping_key TEXT NOT NULL DEFAULT '',
  errors TEXT NOT NULL DEFAULT "[]",
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (id)
);

CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
  job_status = 'queued'
  OR job_status = 'fetched';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
  deduping_key != '';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
  deduping_key != ''
  AND job_status = 'queued';
31  internal/liteq/internal/db.go  (Normal file)
@@ -0,0 +1,31 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.30.0

package internal

import (
	"context"
	"database/sql"
)

type DBTX interface {
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

func New(db DBTX) *Queries {
	return &Queries{db: db}
}

type Queries struct {
	db DBTX
}

func (q *Queries) WithTx(tx *sql.Tx) *Queries {
	return &Queries{
		db: tx,
	}
}
181  internal/liteq/internal/methods.go  (Normal file)
@@ -0,0 +1,181 @@
package internal

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"database/sql/driver"

	"github.com/alitto/pond"
)

type QueueJobParams struct {
	Queue             string
	Job               string
	ExecuteAfter      int64
	RemainingAttempts int64
	DedupingKey       DedupingKey
}

type DedupingKey interface {
	String() string
	ReplaceDuplicate() bool
}

type IgnoreDuplicate string

func (i IgnoreDuplicate) String() string {
	return string(i)
}
func (i IgnoreDuplicate) ReplaceDuplicate() bool {
	return false
}

type ReplaceDuplicate string

func (r ReplaceDuplicate) String() string {
	return string(r)
}
func (r ReplaceDuplicate) ReplaceDuplicate() bool {
	return true
}

func (q *Queries) QueueJob(ctx context.Context, params QueueJobParams) error {
	if params.RemainingAttempts == 0 {
		params.RemainingAttempts = 1
	}

	if params.DedupingKey == nil {
		params.DedupingKey = IgnoreDuplicate("")
	}

	doParams := doQueueJobIgnoreDupeParams{
		Queue:             params.Queue,
		Job:               params.Job,
		ExecuteAfter:      params.ExecuteAfter,
		RemainingAttempts: params.RemainingAttempts,
		DedupingKey:       params.DedupingKey.String(),
	}

	if params.DedupingKey.String() == "" {
		return q.doQueueJobIgnoreDupe(ctx, doParams)
	}

	if params.DedupingKey.ReplaceDuplicate() {
		return q.doQueueJobReplaceDupe(ctx, doQueueJobReplaceDupeParams(doParams))
	}

	return q.doQueueJobIgnoreDupe(ctx, doParams)
}

type GrabJobsParams struct {
	Queue        string
	ExecuteAfter int64
	Count        int64
}

func (q *Queries) GrabJobs(ctx context.Context, params GrabJobsParams) ([]*Job, error) {
	executeAfter := time.Now().Unix()
	if params.ExecuteAfter > 0 {
		executeAfter = params.ExecuteAfter
	}
	limit := int64(1)
	if params.Count > 0 {
		limit = params.Count
	}

	return q.MarkJobsForConsumer(ctx, MarkJobsForConsumerParams{
		Queue:        params.Queue,
		ExecuteAfter: executeAfter,
		Limit:        limit,
	})
}

type ConsumeParams struct {
	Queue             string
	PoolSize          int
	Worker            func(context.Context, *Job) error
	VisibilityTimeout int64
	OnEmptySleep      time.Duration
}

func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
	workers := pond.New(params.PoolSize, params.PoolSize)
	sleep := params.OnEmptySleep
	if sleep == 0 {
		sleep = 1 * time.Second
	}
	for {
		// If the context gets canceled for example, stop consuming
		if ctx.Err() != nil {
			log.Println("context cancelled in Consume()")
			return nil
		}

		if params.VisibilityTimeout > 0 {
			_, err := q.ResetJobs(ctx, ResetJobsParams{
				Queue:             params.Queue,
				ConsumerFetchedAt: time.Now().Unix() - params.VisibilityTimeout,
			})

			if err != nil {
				log.Println("error resetting jobs", err.Error())
				return fmt.Errorf("error resetting jobs: %w", err)
			}
		}

		jobs, err := q.GrabJobs(ctx, GrabJobsParams{
			Queue: params.Queue,
			Count: int64(params.PoolSize),
		})

		if err != nil {
			return fmt.Errorf("error grabbing jobs: %w", err)
		}

		if len(jobs) == 0 {
			time.Sleep(sleep)
			continue
		}

		for _, job := range jobs {
			job := job
			workers.Submit(func() {
				err := params.Worker(ctx, job)
				if err != nil {
					log.Println("worker failled", err.Error())
					q.FailJob(ctx, FailJobParams{
						ID:     job.ID,
						Errors: ErrorList(append(job.Errors, err.Error())),
					})
					return
				}

				q.CompleteJob(ctx, job.ID)
			})
		}
	}
}

type ErrorList []string

func (e ErrorList) Value() (driver.Value, error) {
	if len(e) == 0 {
		return "[]", nil
	}
	return json.Marshal(e)
}

func (e *ErrorList) Scan(src interface{}) error {
	switch src := src.(type) {
	case string:
		return json.Unmarshal([]byte(src), e)
	case []byte:
		return json.Unmarshal(src, e)
	default:
		return fmt.Errorf("unsupported type: %T", src)
	}
}
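The Consume loop above polls the queue, re-queues jobs whose visibility timeout expired, then hands batches to a pond worker pool. A small sketch of driving it through the public liteq wrapper defined later in this diff (not part of the change; the DB file name is hypothetical):

```go
package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
	_ "modernc.org/sqlite"
)

func main() {
	db, err := sql.Open("sqlite", "example.db") // hypothetical DB file
	if err != nil {
		log.Fatal(err)
	}
	liteq.Setup(db)
	jq := liteq.New(db)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// OnEmptySleep sets the poll interval when the queue is empty;
	// VisibilityTimeout (seconds) re-queues jobs a worker held too long.
	jq.Consume(ctx, liteq.ConsumeParams{
		Queue:             "fetch",
		PoolSize:          2,
		VisibilityTimeout: 30,
		OnEmptySleep:      250 * time.Millisecond,
		Worker: func(ctx context.Context, job *liteq.Job) error {
			log.Println("got job", job.ID)
			return nil
		},
	})
}
```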
20  internal/liteq/internal/models.go  (Normal file)
@@ -0,0 +1,20 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.30.0

package internal

type Job struct {
	ID                int64
	Queue             string
	Job               string
	JobStatus         string
	ExecuteAfter      int64
	RemainingAttempts int64
	ConsumerFetchedAt int64
	FinishedAt        int64
	DedupingKey       string
	Errors            ErrorList
	CreatedAt         int64
	UpdatedAt         int64
}
283  internal/liteq/internal/queries.sql.go  (Normal file)
@@ -0,0 +1,283 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.30.0
// source: queries.sql

package internal

import (
	"context"
)

const completeJob = `-- name: CompleteJob :exec
UPDATE
  jobs
SET
  job_status = 'completed',
  finished_at = unixepoch(),
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = 0
WHERE
  id = ?
`

func (q *Queries) CompleteJob(ctx context.Context, id int64) error {
	_, err := q.db.ExecContext(ctx, completeJob, id)
	return err
}

const failJob = `-- name: FailJob :exec
UPDATE
  jobs
SET
  job_status = CASE
    WHEN remaining_attempts <= 1 THEN 'failed'
    ELSE 'queued'
  END,
  finished_at = 0,
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = MAX(remaining_attempts - 1, 0),
  errors = ?
WHERE
  id = ?
`

type FailJobParams struct {
	Errors ErrorList
	ID     int64
}

func (q *Queries) FailJob(ctx context.Context, arg FailJobParams) error {
	_, err := q.db.ExecContext(ctx, failJob, arg.Errors, arg.ID)
	return err
}

const findJob = `-- name: FindJob :one
SELECT
  id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
FROM
  jobs
WHERE
  id = ?
`

func (q *Queries) FindJob(ctx context.Context, id int64) (*Job, error) {
	row := q.db.QueryRowContext(ctx, findJob, id)
	var i Job
	err := row.Scan(
		&i.ID,
		&i.Queue,
		&i.Job,
		&i.JobStatus,
		&i.ExecuteAfter,
		&i.RemainingAttempts,
		&i.ConsumerFetchedAt,
		&i.FinishedAt,
		&i.DedupingKey,
		&i.Errors,
		&i.CreatedAt,
		&i.UpdatedAt,
	)
	return &i, err
}

const markJobsForConsumer = `-- name: MarkJobsForConsumer :many
UPDATE
  jobs
SET
  consumer_fetched_at = unixepoch(),
  updated_at = unixepoch(),
  job_status = 'fetched'
WHERE
  jobs.job_status = 'queued'
  AND jobs.remaining_attempts > 0
  AND jobs.id IN (
    SELECT
      id
    FROM
      jobs js
    WHERE
      js.queue = ?
      AND js.job_status = 'queued'
      AND js.execute_after <= ?
      AND js.remaining_attempts > 0
    ORDER BY
      execute_after ASC
    LIMIT
      ?
  ) RETURNING id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
`

type MarkJobsForConsumerParams struct {
	Queue        string
	ExecuteAfter int64
	Limit        int64
}

func (q *Queries) MarkJobsForConsumer(ctx context.Context, arg MarkJobsForConsumerParams) ([]*Job, error) {
	rows, err := q.db.QueryContext(ctx, markJobsForConsumer, arg.Queue, arg.ExecuteAfter, arg.Limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*Job
	for rows.Next() {
		var i Job
		if err := rows.Scan(
			&i.ID,
			&i.Queue,
			&i.Job,
			&i.JobStatus,
			&i.ExecuteAfter,
			&i.RemainingAttempts,
			&i.ConsumerFetchedAt,
			&i.FinishedAt,
			&i.DedupingKey,
			&i.Errors,
			&i.CreatedAt,
			&i.UpdatedAt,
		); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	if err := rows.Close(); err != nil {
		return nil, err
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}

const resetJobs = `-- name: ResetJobs :execrows
UPDATE
  jobs
SET
  job_status = CASE
    WHEN remaining_attempts <= 1 THEN 'failed'
    ELSE 'queued'
  END,
  updated_at = unixepoch(),
  consumer_fetched_at = 0,
  remaining_attempts = MAX(remaining_attempts - 1, 0),
  errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
  job_status = 'fetched'
  AND queue = ?
  AND consumer_fetched_at < ?
`

type ResetJobsParams struct {
	Queue             string
	ConsumerFetchedAt int64
}

func (q *Queries) ResetJobs(ctx context.Context, arg ResetJobsParams) (int64, error) {
	result, err := q.db.ExecContext(ctx, resetJobs, arg.Queue, arg.ConsumerFetchedAt)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected()
}

const doQueueJobIgnoreDupe = `-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
  jobs (
    queue,
    job,
    execute_after,
    job_status,
    created_at,
    updated_at,
    remaining_attempts,
    deduping_key
  )
VALUES
  (
    ?,
    ?,
    ?,
    'queued',
    unixepoch(),
    unixepoch(),
    ?,
    ?
  ) ON CONFLICT (deduping_key)
WHERE
  deduping_key != '' DO NOTHING
`

type doQueueJobIgnoreDupeParams struct {
	Queue             string
	Job               string
	ExecuteAfter      int64
	RemainingAttempts int64
	DedupingKey       string
}

func (q *Queries) doQueueJobIgnoreDupe(ctx context.Context, arg doQueueJobIgnoreDupeParams) error {
	_, err := q.db.ExecContext(ctx, doQueueJobIgnoreDupe,
		arg.Queue,
		arg.Job,
		arg.ExecuteAfter,
		arg.RemainingAttempts,
		arg.DedupingKey,
	)
	return err
}

const doQueueJobReplaceDupe = `-- name: doQueueJobReplaceDupe :exec
INSERT INTO
  jobs (
    queue,
    job,
    execute_after,
    job_status,
    created_at,
    updated_at,
    remaining_attempts,
    deduping_key
  )
VALUES
  (
    ?,
    ?,
    ?,
    'queued',
    unixepoch(),
    unixepoch(),
    ?,
    ?
  ) ON CONFLICT (deduping_key, job_status)
WHERE
  deduping_key != ''
  AND job_status = 'queued' DO
UPDATE
SET
  job = EXCLUDED.job,
  execute_after = EXCLUDED.execute_after,
  updated_at = unixepoch(),
  remaining_attempts = EXCLUDED.remaining_attempts
`

type doQueueJobReplaceDupeParams struct {
	Queue             string
	Job               string
	ExecuteAfter      int64
	RemainingAttempts int64
	DedupingKey       string
}

func (q *Queries) doQueueJobReplaceDupe(ctx context.Context, arg doQueueJobReplaceDupeParams) error {
	_, err := q.db.ExecContext(ctx, doQueueJobReplaceDupe,
		arg.Queue,
		arg.Job,
		arg.ExecuteAfter,
		arg.RemainingAttempts,
		arg.DedupingKey,
	)
	return err
}
36  internal/liteq/internal/schema.go  (Normal file)
@@ -0,0 +1,36 @@
// Code generated by Makefile. DO NOT EDIT.
package internal

const Schema = `
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS jobs (
  id INTEGER NOT NULL,
  queue TEXT NOT NULL,
  job TEXT NOT NULL,
  job_status TEXT NOT NULL DEFAULT 'queued',
  execute_after INTEGER NOT NULL DEFAULT 0,
  remaining_attempts INTEGER NOT NULL DEFAULT 1,
  consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
  finished_at INTEGER NOT NULL DEFAULT 0,
  deduping_key TEXT NOT NULL DEFAULT '',
  errors TEXT NOT NULL DEFAULT "[]",
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (id)
);

CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
  job_status = 'queued'
  OR job_status = 'fetched';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
  deduping_key != '';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
  deduping_key != ''
  AND job_status = 'queued';
`
438  internal/liteq/internal_test/jobs_test.go  (Normal file)
@@ -0,0 +1,438 @@
package internal_test

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"
	"strings"
	"testing"
	"time"

	"git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
	"github.com/matryer/is"
	_ "modernc.org/sqlite"
)

func TestMain(m *testing.M) {
	e := m.Run()

	removeFiles("jobs.db", "jobs.db-journal", "jobs.db-wal", "jobs.db-shm")
	os.Exit(e)
}

func removeFiles(files ...string) error {
	for _, file := range files {
		err := os.Remove(file)
		if err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return nil
}

func getDb(file string) (*sql.DB, error) {
	if file != ":memory:" {
		err := removeFiles(file, file+"-journal", file+"-wal", file+"-shm")
		if err != nil && !os.IsNotExist(err) {
			return nil, err
		}
	}

	db, err := sql.Open("sqlite", file)
	if err != nil {
		return nil, err
	}
	schema, err := os.ReadFile("../db/schema.sql")
	if err != nil {
		return nil, err
	}
	_, err = db.Exec(string(schema))
	if err != nil {
		return nil, err
	}
	return db, nil
}

func getJqueue(file string) (*internal.Queries, error) {
	sqlcdb, err := getDb(file)
	if err != nil {
		return nil, err
	}
	return internal.New(sqlcdb), nil
}

func Test_QueueJob(t *testing.T) {
	is := is.New(t)
	sqlcdb, err := getDb("jobs.db")
	is.NoErr(err) // error opening sqlite3 database

	jqueue := internal.New(sqlcdb)
	jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue: "",
		Job:   jobPaylod,
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 1) // expected 1 job
	is.Equal(jobs[0].Job, jobPaylod)
}

func Test_FetchTwice(t *testing.T) {
	is := is.New(t)
	sqlcdb, err := getDb("jobs.db")
	is.NoErr(err) // error opening sqlite3 database

	jqueue := internal.New(sqlcdb)
	jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue: "",
		Job:   jobPaylod,
	})
	is.NoErr(err) // error queuing job
	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 1) // expected 1 job
	is.Equal(jobs[0].Job, jobPaylod)

	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 0) // expected 0 job
}

// delay jobs, fetch jobs, check that we get no jobs, then check that we get the job after the delay
func Test_DelayedJob(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:        "",
		Job:          `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
		ExecuteAfter: time.Now().Unix() + 1,
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 0) // expected 0 job

	time.Sleep(1 * time.Second)
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 1) // expected 1 job
}

func Test_PrefetchJobs(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue(":memory:")
	is.NoErr(err) // error getting job queue
	for i := 0; i < 10; i++ {

		err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
			Queue: "",
			Job:   `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
		})
		is.NoErr(err) // error queuing job
	}

	// grab the first 2
	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 2,
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 2) // expected 2 jobs

	// the next 6
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 6,
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 6) // expected 6 jobs

	// try for 5 but only 2 are left
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 5,
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 2) // expected 2 jobs
}

func Test_Consume(t *testing.T) {
	is := is.New(t)
	// This somehow doesn't work well with in memory db
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	fillq1 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 100; i++ {
			jobPayload := fmt.Sprintf("q1-%d", i)
			err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
				Queue: "q1",
				Job:   jobPayload,
			})

			is.NoErr(err) // error queuing job
		}
		fillq1 <- struct{}{}
		close(fillq1)
	}()

	fillq2 := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 100; i++ {
			jobPayload := fmt.Sprintf("q2-%d", i)
			err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
				Queue: "q2",
				Job:   jobPayload,
			})

			is.NoErr(err) // error queuing job
		}
		fillq2 <- struct{}{}
		close(fillq2)
	}()

	q1 := make(chan string, 100)
	q1done := make(chan struct{}, 1)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		jqueue.Consume(ctx, internal.ConsumeParams{
			Queue:    "q1",
			PoolSize: 1,
			Worker: func(ctx context.Context, job *internal.Job) error {
				q1 <- job.Job
				if job.Job == "q1-99" {
					q1done <- struct{}{}
					close(q1done)
					close(q1)
				}
				return nil
			},
		})
	}()

	q2 := make(chan string, 100)
	q2done := make(chan struct{}, 1)
	ctx2, cancel2 := context.WithCancel(context.Background())
	go func() {
		jqueue.Consume(ctx2, internal.ConsumeParams{
			Queue:    "q2",
			PoolSize: 1,
			Worker: func(ctx context.Context, job *internal.Job) error {
				q2 <- job.Job
				if job.Job == "q2-99" {
					q2done <- struct{}{}
					close(q2done)
					close(q2)
				}
				return nil
			},
		})
	}()

	<-fillq1
	<-q1done
	cancel()
	<-fillq2
	<-q2done
	cancel2()

	i := 0
	for pl := range q1 {
		is.True(strings.HasPrefix(pl, "q1")) // expected q1-*
		i++
	}
	is.Equal(i, 100) // expected 100 jobs

	j := 0
	for pl := range q2 {
		is.True(strings.HasPrefix(pl, "q2")) // expected q2-*
		j++
	}
	is.Equal(j, 100) // expected 100 jobs
}

func Test_MultipleAttempts(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:             "",
		Job:               `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
		RemainingAttempts: 3,
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 1) // expected 1 job

	thejob := jobs[0]
	is.Equal(thejob.RemainingAttempts, int64(3)) // expected 3 attempts

	// Fail and verify
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)                                 // error fetching job for consumer
	is.Equal(len(jobs), 1)                        // expected 1 job
	is.Equal(jobs[0].RemainingAttempts, int64(2)) // expected 2 attempts
	is.Equal(jobs[0].Errors, internal.ErrorList{"error1"})

	// Fail again and verify
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1", "error2"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)                                 // error fetching job for consumer
	is.Equal(len(jobs), 1)                        // expected 1 job
	is.Equal(jobs[0].RemainingAttempts, int64(1)) // expected 1 attempts
	is.Equal(jobs[0].Errors, internal.ErrorList{"error1", "error2"})

	// Fail again and verify
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1", "error2", "error3"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 0) // expected no job since no more attempts
}

func Test_VisibilityTimeout(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue: "",
		Job:   `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
	})
	is.NoErr(err) // error queuing job

	ctx, cancel := context.WithCancel(context.Background())
	id := int64(0)
	go func() {
		jqueue.Consume(ctx, internal.ConsumeParams{
			Queue:             "",
			PoolSize:          1,
			VisibilityTimeout: 1,
			OnEmptySleep:      100 * time.Millisecond,
			Worker: func(ctx context.Context, job *internal.Job) error {
				id = job.ID
				time.Sleep(3 * time.Second)
				return nil
			},
		})
	}()

	time.Sleep(2 * time.Second)
	cancel()
	job, err := jqueue.FindJob(context.Background(), id)

	is.NoErr(err)                     // error fetching job for consumer
	is.Equal(job.JobStatus, "failed") // expected fetched
	is.Equal(job.Errors, internal.ErrorList{"visibility timeout expired"})
	// Sleep to ensure enough time for the job to finish and avoid panics
	time.Sleep(2 * time.Second)
}

func Test_DedupeIgnore(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	log.Println("A")
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:1`,
		DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
	})
	is.NoErr(err) // error queuing job

	log.Println("B")
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:2`,
		DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 10,
	})
	is.NoErr(err)                  // error fetching job for consumer
	is.Equal(len(jobs), 1)         // expected only 1 job due to dedupe
	is.Equal(jobs[0].Job, `job:1`) // expected job:1
}

func Test_DedupeReplace(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:1`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job

	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:2`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 10,
	})
	is.NoErr(err)                  // error fetching job for consumer
	is.Equal(len(jobs), 1)         // expected only 1 job due to dedupe
	is.Equal(jobs[0].Job, `job:2`) // expected job:1
}
41  internal/liteq/liteq.go  (Normal file)
@@ -0,0 +1,41 @@
package liteq

import (
	"context"
	"database/sql"

	internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
)

// Creates the db file with the tables and indexes
func Setup(db *sql.DB) error {
	_, err := db.Exec(internal.Schema)
	return err
}

func New(db *sql.DB) *JobQueue {
	queries := internal.New(db)
	return &JobQueue{queries}
}

type JobQueue struct {
	queries *internal.Queries
}

type QueueJobParams = internal.QueueJobParams
type DedupingKey = internal.DedupingKey
type IgnoreDuplicate = internal.IgnoreDuplicate
type ReplaceDuplicate = internal.ReplaceDuplicate

func (jq *JobQueue) QueueJob(ctx context.Context, params QueueJobParams) error {
	return jq.queries.QueueJob(ctx, params)
}

type ConsumeParams = internal.ConsumeParams

func (jq *JobQueue) Consume(ctx context.Context, params ConsumeParams) error {
	return jq.queries.Consume(ctx, params)
}

type ErrorList = internal.ErrorList
type Job = internal.Job
17  internal/liteq/sqlc.yaml  (Normal file)
@@ -0,0 +1,17 @@
version: "2"
sql:
  - engine: "sqlite"
    queries: "db/queries.sql"
    schema: "db/schema.sql"
    gen:
      go:
        package: "internal"
        out: "internal"
        emit_result_struct_pointers: true
        overrides:
          - column: jobs.consumer_id
            go_type: "string"
          - column: jobs.errors
            go_type:
              import: ""
              type: "ErrorList"
9  internal/types/base.go  (Normal file)
@@ -0,0 +1,9 @@
package types

import (
	"context"

	"git.jadud.com/jadudm/grosbeak/internal/liteq"
)

type QueueWorker func(ctx context.Context, job *liteq.Job) error
9  internal/types/constants.go  (Normal file)
@@ -0,0 +1,9 @@
package types

type UpdateFrequency string

const (
	UPDATE_DAILY   UpdateFrequency = "DAILY"
	UPDATE_WEEKLY  UpdateFrequency = "WEEKLY"
	UPDATE_MONTHLY UpdateFrequency = "MONTHLY"
)
22  internal/types/jobs.go  (Normal file)
@@ -0,0 +1,22 @@
package types

import "encoding/json"

type Job interface {
	AsJson() string
}

// Used by all of the jobs.
func _as_json(j Job) string {
	as_json, _ := json.Marshal(j)
	return string(as_json)
}

type EntreJob struct {
	URL             string          `json:"url"`
	UpdateFrequency UpdateFrequency `json:"update_frequency"`
}

func (ej *EntreJob) AsJson() string {
	return _as_json(ej)
}
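EntreJob.AsJson is the string engine.Entre stores as the liteq job payload, and engine.Fetch reads it back with gjson. A small round-trip sketch (not part of the diff; it assumes the module paths in this change):

```go
package main

import (
	"fmt"

	base "git.jadud.com/jadudm/grosbeak/internal/types"
	"github.com/tidwall/gjson"
)

func main() {
	ej := &base.EntreJob{URL: "https://example.com/", UpdateFrequency: base.UPDATE_WEEKLY}

	payload := ej.AsJson()
	// {"url":"https://example.com/","update_frequency":"WEEKLY"}
	fmt.Println(payload)

	// This mirrors the lookup in internal/engine/fetch.go.
	fmt.Println(gjson.Get(payload, "url").String()) // https://example.com/
}
```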