Compare commits

..

12 Commits

Author SHA1 Message Date
Matt Jadud
f53639af2f Queueing, prepping to fetch
A lot more to go, but this is the core.

Need to think about how to test the queue handlers.
2025-11-30 21:29:30 -05:00
Matt Jadud
06cdc68be7 Integrated, working.
This integrates the liteq, and it prevents duplicates in a way that
matches my use-case.

I might try and push things back out to a separate module, but for now,
this will do.
2025-11-30 18:01:35 -05:00
Matt Jadud
d212a354fe Fixing import. 2025-11-30 17:20:18 -05:00
Matt Jadud
23923b7be4 Adding properly
The .git folder messed up the initial commit.
2025-11-30 17:18:41 -05:00
Matt Jadud
80b55d1a3b Removing reference. 2025-11-30 17:18:13 -05:00
Matt Jadud
437393a1e7 What 2025-11-30 17:15:30 -05:00
Matt Jadud
6a4f812027 Imported 2025-11-30 17:10:48 -05:00
Matt Jadud
4a05b11843 Adding liteq as a library
This is easier.
2025-11-30 17:05:33 -05:00
Matt Jadud
460f2734ef Domain64 and DB backing tested 2025-11-30 14:06:55 -05:00
Matt Jadud
f72c6b020f Interim while bringing sqlc in 2025-11-30 08:20:39 -05:00
Matt Jadud
0fbf88101f Core structure 2025-11-29 17:05:21 -05:00
Matt Jadud
b80e2421f1 Fixing include 2025-11-29 16:08:30 -05:00
37 changed files with 2564 additions and 6 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.db*
*.sqlite*

10
Makefile Normal file
View File

@@ -0,0 +1,10 @@
test: generate
cd internal/domain64 ; make test
generate:
cd internal/domain64 ; make generate
cd internal/liteq ; sqlc generate
cd internal/liteq ; make schema-const
run: generate
go run cmd/api/*.go

View File

@@ -17,3 +17,22 @@ Two reasons.
* SQlite or Postgres * SQlite or Postgres
The use-case is (essentially) single-user. The use-case is (essentially) single-user.
## API
### /fetch/<b64:URL>
## prereqs
* go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
## packages used
* https://github.com/jpillora/go-tld
* https://pkg.go.dev/modernc.org/sqlite
* https://github.com/sqlc-dev/sqlc
* https://github.com/spf13/afero
* https://github.com/tidwall/gjson
//go:generate stringer -type=UpdateFrequency

78
cmd/api/init.go Normal file
View File

@@ -0,0 +1,78 @@
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"time"
"git.jadud.com/jadudm/grosbeak/internal/domain64"
"git.jadud.com/jadudm/grosbeak/internal/engine"
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
"git.jadud.com/jadudm/grosbeak/internal/types"
_ "modernc.org/sqlite"
)
// setupDB opens (creating if needed) the application's SQLite database
// and returns the handle, exiting the process if the open fails.
func setupDB() *sql.DB {
	// FIXME: This path needs to come from the env.
	db, err := sql.Open("sqlite", "grosbeak.sqlite")
	// BUG FIX: the error must be checked before using db; previously
	// SetMaxOpenConns was called first, which would nil-deref on failure.
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	// SQLite supports a single writer; capping the pool at one
	// connection avoids SQLITE_BUSY contention between goroutines.
	db.SetMaxOpenConns(1)
	return db
}
// runQ runs a perpetual consume loop for one named queue. Consume is
// expected to block while healthy; whenever it returns with an error
// we log it and back off briefly before re-entering the loop.
func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) {
	for {
		log.Printf("runQ %s\n", queueName)
		params := liteq.ConsumeParams{
			Queue:             queueName,
			PoolSize:          3,
			VisibilityTimeout: 20,
			Worker:            worker,
		}
		if consumeErr := queue.Consume(context.Background(), params); consumeErr != nil {
			log.Printf("runQ/%s: %s", queueName, consumeErr.Error())
			time.Sleep(2 * time.Second)
		}
		time.Sleep(1 * time.Second)
	}
}
// setupLiteQ initializes the liteq tables on db, constructs the job
// queue, and starts one background consumer goroutine per queue.
// The returned queue is used by producers to enqueue jobs.
func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue {
	liteq.Setup(db)
	queue := liteq.New(db)
	// The queue processes as long as this context is not cancelled.
	log.Println("setting up worker queues...")
	// Queue name -> worker; each entry gets its own runQ goroutine.
	workers := map[string]types.QueueWorker{
		"fetch": engine.Fetch(d64m),
	}
	for name, w := range workers {
		go runQ(queue, name, w)
	}
	return queue
}
// setupDomain64Map builds the Domain64 map and binds it to db,
// terminating the process if construction fails.
func setupDomain64Map(db *sql.DB) *domain64.Domain64Map {
	m, err := domain64.NewDomain64Map()
	if err == nil {
		m.Setup(db)
		return m
	}
	log.Printf("newdomain64map err: %s", err.Error())
	os.Exit(1)
	return nil // unreachable; os.Exit does not return
}

View File

@@ -1,9 +1,40 @@
package main package main
import ( import (
"fmt" "log"
"sync"
"time"
_ "modernc.org/sqlite"
"git.jadud.com/jadudm/grosbeak/internal/engine"
base "git.jadud.com/jadudm/grosbeak/internal/types"
) )
func main() { func main() {
fmt.Printf("True: %s", domain64.ReturnsTrue()) // Don't let `main()` exit
wg := &sync.WaitGroup{}
wg.Add(1)
db := setupDB()
d64m := setupDomain64Map(db)
queue := setupLiteQ(db, d64m)
// Enqueue URLs
urls := []struct {
url string
uf base.UpdateFrequency
}{
{"https://jadud.com/", base.UPDATE_DAILY},
{"https://berea.us/", base.UPDATE_WEEKLY},
}
for _, u := range urls {
engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf})
time.Sleep(1 * time.Second)
}
// Don't exit.
log.Println("Waiting for godot...")
wg.Wait()
} }

30
go.mod
View File

@@ -1,3 +1,29 @@
module git.jadud.com/grosbeak module git.jadud.com/jadudm/grosbeak
go 1.23.2 go 1.24.0
toolchain go1.24.10
require (
github.com/alitto/pond v1.9.2
github.com/jpillora/go-tld v1.2.1
github.com/matryer/is v1.4.1
github.com/tidwall/gjson v1.18.0
modernc.org/sqlite v1.40.1
)
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect
modernc.org/libc v1.67.1 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
)

75
go.sum Normal file
View File

@@ -0,0 +1,75 @@
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/jpillora/go-tld v1.2.1 h1:kDKOkmXLlskqjcvNs7w5XHLep7c8WM7Xd4HQjxllVMk=
github.com/jpillora/go-tld v1.2.1/go.mod h1:plzIl7xr5UWKGy7R+giuv+L/nOjrPjsoWxy/ST9OBUk=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc=
modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM=
modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA=
modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE=
modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.67.1 h1:bFaqOaa5/zbWYJo8aW0tXPX21hXsngG2M7mckCnFSVk=
modernc.org/libc v1.67.1/go.mod h1:QvvnnJ5P7aitu0ReNpVIEyesuhmDLQ8kaEoyMjIFZJA=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY=
modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

View File

@@ -0,0 +1,9 @@
clean:
cd sqlc ; rm -f test.sqlite
cd sqlc ; rm -f *.go
generate: clean
cd sqlc ; sqlc generate
test: generate
go test *.go

114
internal/domain64/README.md Normal file
View File

@@ -0,0 +1,114 @@
# domain64
`domain64` is a BIGINT (or 64-bit) type that can be used to encode all domains we are likely to encounter. It represents well as JSonnet/JSON, and can be used in partitioning database tables easily.
## what is it
To encode all of the TLDs, domains, and subdomains we will encounter, we'll use a `domain64` encoding. It maps the entire URL space into a single, 64-bit number (or, `BIGSERIAL` in Postgres).
```mermaid
packet-beta
0-7: "FF | TLD"
8-31: "FFFFFF | Domain"
32-39: "FF | Subdomain"
40-63: "FFFFFF | Path"
```
```
FF:FFFFFF:FF:FFFFFF
```
or
```
tld:domain:subdomain:path
```
or
```
com:jadud:www:teaching:berea
```
can be indexed/partitioned uniquely.
This lets us track
* 255 (#FF) TLDs
* 16,777,216 (#FFFFFF) domains under each TLD
* 255 (#FF) subdomains under each domain
* 16,777,216 (#FFFFFF) paths on a given domain
## what that means
There are only around 10 TLDs that make up the majority of all sites on the internet. The search engine maxes out at tracking 256 unique TLDs (#00-#FF).
Each TLD can hold up to 16M unique sites. For comparison, there are roughly 302M `.com` domains, 36M `.cn`, and 20M `.org`. Again, this is for a "personal" search engine, and it is not intended to scale to handling all of the internet. Handling ~ 5% of `.com` (or 75% of `.org`) is *just fine*.
Under a domain, it is possible to uniquely partition off 255 subdomains (where `00` is "no subdomain").
Paths can be indexed uniquely, up to 16M per subdomain.
## example
```
01:000001:00:000000 com.jadud
01:000001:01:000000 com.jadud.research
01:000001:02:000000 com.jadud.teaching
01:000001:02:000001 com.jadud.teaching/olin
01:000001:02:000002 com.jadud.teaching/berea
```
| tld | domain | sub | path | hex | dec |
| --- | --- | --- | --- | --- | --- |
| com | jadud | _ | _ | #x0100000100000000 | 72057598332895232 |
| com | jadud | research | _ | #x0100000101000000 | 72057598332895488 |
| com | jadud | teaching | _ | #x0100000102000000 | 72057598366449664 |
| com | jadud | teaching | olin | #x0100000102000001 | 72057598366449665 |
| com | jadud | teaching | berea | #x0100000102000002 | 72057598366449666 |
## for partitioning
On a table that contains a `domain64` value, we can partition based on numeric ranges very efficiently.
```sql
CREATE TABLE comjadud PARTITION OF com
FOR VALUES FROM (0x0100000100000000) TO (0x01000001FFFFFFFF);
```
Or
```sql
CREATE TABLE comjadudresearch PARTITION OF com
FOR VALUES FROM (0x0100000101000000) TO (0x0100000101FFFFFF);
```
## As Jsonnet/JSON
Jsonnet will naturally sort by the hex key values.
```
{
"01": {
"name": "com",
"children": {
"00000001": {
"name": "jadud",
"children": {
"01": "research",
"02": "teaching",
}
}
}
},
"02": {
"name": "org",
"children": {
...
}
}
}
```

View File

@@ -0,0 +1,183 @@
package domain64
import (
"context"
"database/sql"
_ "embed"
"fmt"
"log"
"slices"
"sync"
sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
"github.com/jpillora/go-tld"
)
//go:embed sqlc/schema.sql
var ddl string
// Domain64Map assigns and persists the numeric tld/domain/subdomain/path
// ids that make up a Domain64, backed by SQLite via sqlc-generated queries.
type Domain64Map struct {
	// NOTE(review): mu and m are not used by any method in this file —
	// confirm whether an in-memory cache was planned before removing.
	mu sync.Mutex
	m map[string]int
	DB *sql.DB
	Queries *sqlc.Queries
}
// NewDomain64Map returns an empty, unbound Domain64Map; call Setup to
// attach a database before use. The error is currently always nil but
// is kept in the signature for future construction failures.
func NewDomain64Map() (*Domain64Map, error) {
	return &Domain64Map{DB: nil, Queries: nil}, nil
}
// Setup creates the domain64 tables from the embedded schema (ddl) and
// binds the map to db. It logs and then panics if the DDL fails, since
// nothing else can work without the tables.
func (d64m *Domain64Map) Setup(db *sql.DB) {
	log.Printf("creating domain64 tables\n")
	if _, err := db.ExecContext(context.Background(), ddl); err != nil {
		log.Printf("setup err: %s", err.Error())
		panic(err)
	}
	d64m.DB = db
	d64m.Queries = sqlc.New(db)
}
// URLToRFQDN renders url as a reversed FQDN — tld.domain.subdomain —
// followed by the URL path (e.g. "com.jadud/" for https://jadud.com/).
// Empty components are skipped, but a dot still precedes domain and
// subdomain whenever they are present.
func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
	rfqdn := ""
	if url.TLD != "" {
		rfqdn = url.TLD
	}
	if url.Domain != "" {
		rfqdn = rfqdn + "." + url.Domain
	}
	if url.Subdomain != "" {
		rfqdn = rfqdn + "." + url.Subdomain
	}
	return rfqdn + url.Path
}
// _get_or_insert_tld looks up the numeric id for url's TLD; when the
// lookup fails it assigns the next dense id (count + 1) and inserts a
// new row. The resulting id is recorded in d64.TLD.
//
// NOTE(review): any lookup error — not only sql.ErrNoRows — is treated
// as "not found" and triggers an insert; confirm that is intended.
func _get_or_insert_tld(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
	ctx := context.Background()
	tld_id, err := queries.GetTLDId(ctx, url.TLD)
	if err != nil {
		cnt, err := queries.CountTLDs(ctx)
		if err != nil {
			return err
		}
		// Ids are dense: the next id is the number of existing TLDs + 1.
		d64.TLD = cnt + 1
		err = queries.InsertTLD(ctx, sqlc.InsertTLDParams{TldID: d64.TLD, Tld: url.TLD})
		if err != nil {
			return err
		}
	} else {
		d64.TLD = tld_id
	}
	return nil
}
// _get_or_insert_domain resolves the numeric id of url's domain within
// the TLD already recorded in d64.TLD, inserting a new row with the
// next per-TLD id when the lookup fails. Stores the id in d64.Domain.
//
// NOTE(review): as with the other helpers, any lookup error is treated
// as "not found"; confirm that is intended.
func _get_or_insert_domain(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
	ctx := context.Background()
	domain_id, err := queries.GetDomainId(ctx, sqlc.GetDomainIdParams{TldID: d64.TLD, Domain: url.Domain})
	if err != nil {
		cnt, err := queries.CountDomains(ctx, d64.TLD)
		if err != nil {
			return err
		}
		// Ids are dense per TLD: next id = count of domains under it + 1.
		d64.Domain = cnt + 1
		err = queries.InsertDomain(ctx, sqlc.InsertDomainParams{TldID: d64.TLD, DomainID: d64.Domain, Domain: url.Domain})
		if err != nil {
			return err
		}
	} else {
		d64.Domain = domain_id
	}
	return nil
}
// _get_or_insert_subdomain resolves the numeric id of url's subdomain
// under (d64.TLD, d64.Domain). A bare domain (empty subdomain) is the
// reserved id 0 and is never inserted; otherwise a missing subdomain
// gets the next dense id and a new row. Stores the id in d64.Subdomain.
func _get_or_insert_subdomain(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
	ctx := context.Background()
	subdomain_id, err := queries.GetSubdomainId(ctx, sqlc.GetSubdomainIdParams{
		TldID: int64(d64.TLD), DomainID: int64(d64.Domain), Subdomain: url.Subdomain,
	})
	if err != nil {
		if url.Subdomain == "" {
			// "No subdomain" is encoded as 0 rather than a table row.
			d64.Subdomain = 0
		} else {
			cnt, err := queries.CountSubdomains(ctx, sqlc.CountSubdomainsParams{TldID: d64.TLD, DomainID: d64.Domain})
			if err != nil {
				return err
			}
			d64.Subdomain = cnt + 1
			err = queries.InsertSubdomain(ctx, sqlc.InsertSubdomainParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, Subdomain: url.Subdomain})
			if err != nil {
				return err
			}
		}
	} else {
		d64.Subdomain = subdomain_id
	}
	return nil
}
// _get_or_insert_path resolves the numeric id of url's path under
// (d64.TLD, d64.Domain, d64.Subdomain). The root path "/" is the
// reserved id 0 and is never inserted; otherwise a missing path gets
// the next dense id and a new row. Stores the id in d64.Path.
//
// FIX: removed a leftover `log.Println(url, url.Path)` debug line that
// logged every URL on the hot path.
func _get_or_insert_path(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
	ctx := context.Background()
	path_id, err := queries.GetPathId(ctx, sqlc.GetPathIdParams{
		TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, Path: url.Path,
	})
	if err != nil {
		if url.Path == "/" {
			// The bare root path is encoded as 0 rather than a table row.
			d64.Path = 0
		} else {
			cnt, err := queries.CountPaths(ctx, sqlc.CountPathsParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain})
			if err != nil {
				return err
			}
			d64.Path = cnt + 1
			err = queries.InsertPath(ctx, sqlc.InsertPathParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, PathID: d64.Path, Path: url.Path})
			if err != nil {
				return err
			}
		}
	} else {
		d64.Path = path_id
	}
	return nil
}
// FIXME: This feels like a very convoluted way to maintain the domain names.
// However, I also need to maintain uniqueness. Can I do this in one table?

// URLToDomain64 maps a parsed URL to its Domain64, assigning (and
// persisting) new component ids as needed. Only https URLs are
// accepted; other schemes return an error and a nil Domain64.
// Each _get_or_insert_* helper mutates both the DB and d64, so their
// call order (tld -> domain -> subdomain -> path) matters: later
// lookups key off the ids the earlier ones filled in.
func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) {
	allowed_schemes := []string{"https"}
	if !slices.Contains(allowed_schemes, url.Scheme) {
		return nil, fmt.Errorf("URL scheme must be in %q; given %s", allowed_schemes, url.Scheme)
	}
	d64 := &Domain64{}
	// These manipulate both the DB and the Domain64 struct
	err := _get_or_insert_tld(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_domain(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_subdomain(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	err = _get_or_insert_path(d64m.Queries, d64, url)
	if err != nil {
		return nil, err
	}
	return d64, nil
}
func NullInt64(i int64) sql.NullInt64 {
return sql.NullInt64{Int64: i, Valid: true}
}
func NullString(s string) sql.NullString {
return sql.NullString{String: s, Valid: true}
}

View File

@@ -0,0 +1,140 @@
package domain64
import (
"context"
"database/sql"
"testing"
"github.com/jpillora/go-tld"
_ "modernc.org/sqlite"
)
// setup opens an in-memory SQLite database and applies the embedded
// schema (ddl), panicking on any failure. Each test gets a fresh,
// isolated database.
func setup() *sql.DB {
	ctx := context.Background()
	// db, err := sql.Open("sqlite", "sqlc/test.sqlite")
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		// TODO
		panic(err)
	}
	// create tables
	if _, err := db.ExecContext(ctx, ddl); err != nil {
		panic(err)
	}
	return db
}
// TestNewDomain64Map verifies that a freshly constructed map, once
// Setup against a live DB, holds a non-nil database handle.
func TestNewDomain64Map(t *testing.T) {
	db := setup()
	M, err := NewDomain64Map()
	// FIX: the constructor error was previously checked only after
	// M.Setup(db) had already been called; fail fast before using M.
	if err != nil {
		t.Fatal(err)
	}
	M.Setup(db)
	if M.DB == nil {
		t.Error("DB should not be nil")
	}
}
// TestURLToRFQDN checks the reversed-FQDN rendering for a simple URL.
// URLToRFQDN touches no database state, so no Setup is required.
func TestURLToRFQDN(t *testing.T) {
	M, _ := NewDomain64Map()
	// M.Setup(db)
	simple, _ := tld.Parse("https://jadud.com/")
	rfqdn := M.URLToRFQDN(simple)
	// Note: the trailing "/" is the URL path, carried through verbatim.
	if rfqdn != "com.jadud/" {
		t.Errorf("Expected `com.jadud/`, got %s", rfqdn)
	}
}
// TestURLToDomain64 checks that the first URL seen gets the first
// dense ids (TLD 1, domain 1) in a fresh database.
func TestURLToDomain64(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map()
	M.Setup(db)
	simple, _ := tld.Parse("https://jadud.com/")
	// NOTE(review): the error from URLToDomain64 is discarded; a lookup
	// failure would surface here as a nil-pointer panic on d64.
	d64, _ := M.URLToDomain64(simple)
	if d64.TLD != 1 {
		t.Errorf("expected TLD == 1, got %d", d64.TLD)
	}
	if d64.Domain != 1 {
		t.Errorf("expected domain == 1, got %d", d64.Domain)
	}
}
// TestURLToDomain64_02 checks that two distinct domains under the same
// TLD share the TLD id but receive consecutive domain ids.
func TestURLToDomain64_02(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map()
	M.Setup(db)
	simple1, _ := tld.Parse("https://jadud.com/")
	simple2, _ := tld.Parse("https://another.com/")
	// NOTE(review): errors are discarded here as in the other tests.
	d64_1, _ := M.URLToDomain64(simple1)
	d64_2, _ := M.URLToDomain64(simple2)
	if d64_1.TLD != 1 {
		t.Errorf("expected TLD == 1, got %d", d64_1.TLD)
	}
	if d64_1.Domain != 1 {
		t.Errorf("expected domain == 1, got %d", d64_1.Domain)
	}
	if d64_2.TLD != 1 {
		t.Errorf("expected TLD == 1, got %d", d64_2.TLD)
	}
	if d64_2.Domain != 2 {
		t.Errorf("expected domain == 2, got %d", d64_2.Domain)
	}
}
// TestURLToDomain64_03 runs a table of URLs through one shared database
// and checks every component id plus the packed int64. Ids are assigned
// densely in first-seen order, so the table's ordering is significant.
func TestURLToDomain64_03(t *testing.T) {
	db := setup()
	M, _ := NewDomain64Map()
	M.Setup(db)
	var tests = []struct {
		url       string
		tld       int64
		domain    int64
		subdomain int64
		path      int64
		d64       int64
	}{
		{"https://jadud.com/", 1, 1, 0, 0, 0x0100000100000000},
		{"https://research.jadud.com/", 1, 1, 1, 0, 0x0100000101000000},
		{"https://teaching.jadud.com/", 1, 1, 2, 0, 0x0100000102000000},
		{"https://teaching.jadud.com/classes", 1, 1, 2, 1, 0x0100000102000001},
		{"https://teaching.jadud.com/other-classes", 1, 1, 2, 2, 0x0100000102000002},
		{"https://research.jadud.com/papers", 1, 1, 1, 1, 0x0100000101000001},
		{"https://research.jadud.com/experiments", 1, 1, 1, 2, 0x0100000101000002},
		{"https://teaching.another.com/classes", 1, 2, 1, 1, 0x0100000201000001},
		{"https://teaching.jadud.org/classes", 2, 1, 1, 1, 0x0200000101000001},
		// The ordering here matters; if we see a "bare" domain after first seeing the
		// subdomain, I expect the numbering to come out right. That is, subdomain <- 0 and
		// path <- 0. That is because of "" and "/" checking on subdomain and path, respectively.
		{"https://jadud.org/", 2, 1, 0, 0, 0x0200000100000000},
	}
	for _, tt := range tests {
		// NOTE(review): parse and conversion errors are discarded.
		parsed, _ := tld.Parse(tt.url)
		d64, _ := M.URLToDomain64(parsed)
		if d64.TLD != tt.tld {
			t.Errorf("%s TLD expected %d given %d", tt.url, tt.tld, d64.TLD)
		}
		if d64.Domain != tt.domain {
			t.Errorf("%s Domain expected %d given %d", tt.url, tt.domain, d64.Domain)
		}
		if d64.Subdomain != tt.subdomain {
			t.Errorf("%s Subdomain expected %d given %d", tt.url, tt.subdomain, d64.Subdomain)
		}
		if d64.Path != tt.path {
			t.Errorf("%s Path expected %d given %d", tt.url, tt.path, d64.Path)
		}
		if d64.ToInt64() != tt.d64 {
			t.Errorf("%s int64 value expected %d given %d", tt.url, tt.d64, d64.ToInt64())
		}
	}
}

View File

@@ -1,5 +1,50 @@
package domain64 package domain64
// Domain64 encodes a URL's components into a single 64-bit value.
//
// Bit layout, most-significant byte first:
//
//	8 bits TLD | 24 bits Domain | 8 bits Subdomain | 24 bits Path
type Domain64 struct {
	// The TLD is FF
	TLD int64
	// The Domain is FFFFFF
	Domain int64
	// Subdomains are FF
	Subdomain int64
	// Paths are, again, FFFFFF
	Path int64
}

const SHIFT_TLD = (64 - 8)
const SHIFT_DOMAIN = (64 - (8 + 24))
const SHIFT_SUBDOMAIN = (64 - (8 + 24 + 8))
const SHIFT_PATH = (64 - (8 + 24 + 8 + 24))

// BUG FIX: MASK_TLD previously read 0xFF0000000000000 (15 hex digits),
// selecting bits 52-59 instead of the top byte, so IntToDomain64
// mis-decoded any TLD id greater than 15.
const MASK_TLD = 0xFF00000000000000
const MASK_DOMAIN = 0x00FFFFFF00000000
const MASK_SUBDOMAIN = 0x00000000FF000000
const MASK_PATH = 0x0000000000FFFFFF

// ToInt64 packs the four component ids into one int64.
func (d64 Domain64) ToInt64() int64 {
	var result int64 = 0
	result = result | (int64(d64.TLD) << SHIFT_TLD)
	result = result | (int64(d64.Domain) << SHIFT_DOMAIN)
	result = result | (int64(d64.Subdomain) << SHIFT_SUBDOMAIN)
	result = result | (int64(d64.Path) << SHIFT_PATH)
	return result
}

// IntToDomain64 unpacks i into its four component ids. Decoding is
// done in uint64 so that a TLD byte with the high bit set (id >= 128,
// which makes the packed int64 negative) does not sign-extend.
func IntToDomain64(i int64) Domain64 {
	u := uint64(i)
	return Domain64{
		TLD:       int64(u >> SHIFT_TLD),
		Domain:    int64((u >> SHIFT_DOMAIN) & 0xFFFFFF),
		Subdomain: int64((u >> SHIFT_SUBDOMAIN) & 0xFF),
		Path:      int64(u & MASK_PATH),
	}
}

View File

@@ -0,0 +1,45 @@
package domain64
import (
"testing"
)
// https://gobyexample.com/testing-and-benchmarking
// For testing the conversion between a Domain64 struct
// and the int64 representation of that domain.
// Shared fixtures pairing a Domain64 with its expected packed int64.
// 72057598349672449 == 0x0100000101000001 (TLD 1, Domain 1, Sub 1, Path 1);
// 144115192387600384 == 0x0200000101000000 (TLD 2, Domain 1, Sub 1, Path 0).
var tests = []struct {
	d64 Domain64
	asInt int64
}{
	{
		Domain64{
			TLD: 1,
			Domain: 1,
			Subdomain: 1,
			Path: 1,
		}, 72057598349672449},
	{
		Domain64{
			TLD: 2,
			Domain: 1,
			Subdomain: 1,
			Path: 0,
		}, 144115192387600384},
}
// TestDomain64ToInt checks struct -> int64 packing against the fixtures.
func TestDomain64ToInt(t *testing.T) {
	for _, tt := range tests {
		if tt.d64.ToInt64() != tt.asInt {
			t.Errorf("%q != %d Domain64 did not convert", tt.d64, tt.asInt)
		}
	}
}
// TestIntToDomain64 checks int64 -> struct unpacking against the fixtures.
// NOTE(review): both fixtures use TLD ids <= 15; a case with a larger
// TLD id would exercise the full top byte of the encoding.
func TestIntToDomain64(t *testing.T) {
	for _, tt := range tests {
		if IntToDomain64(tt.asInt) != tt.d64 {
			t.Errorf("%d != %q int64 did not convert", tt.asInt, tt.d64)
		}
	}
}

2
internal/domain64/sqlc/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
test.sqlite
*.go

View File

@@ -0,0 +1,107 @@
-- name: InsertIntoDomain64 :one
INSERT INTO domain64
(rfqdn, d64)
VALUES
(?, ?)
RETURNING d64
;
-- If you forget a semicolon, the previous
-- query gets flagged as a duplicate.
-- https://github.com/sqlc-dev/sqlc/issues/3851
-- name: GetTLDId :one
SELECT tld_id FROM tlds
WHERE tld = ?
;
-- name: CountTLDs :one
SELECT COUNT(*) FROM tlds;
-- name: InsertTLD :exec
INSERT INTO tlds
(tld_id, tld)
VALUES
(?, ?)
;
-- name: GetDomainId :one
SELECT domain_id FROM domains
WHERE
tld_id = ?
AND
domain = ?
;
-- name: CountDomains :one
SELECT COUNT(DISTINCT domain_id)
FROM domains
WHERE
tld_id = ?
;
-- name: InsertDomain :exec
INSERT INTO domains
(tld_id, domain_id, domain)
VALUES
(?, ?, ?)
;
-- name: GetSubdomainId :one
SELECT subdomain_id FROM subdomains
WHERE
tld_id = ?
AND
domain_id = ?
AND
subdomain = ?
;
-- name: CountSubdomains :one
SELECT COUNT(DISTINCT subdomain_id)
FROM subdomains
WHERE
tld_id = ?
AND
domain_id = ?
;
-- name: InsertSubdomain :exec
INSERT INTO subdomains
(tld_id, domain_id, subdomain_id, subdomain)
VALUES
(?, ?, ?, ?)
;
-- name: GetPathId :one
SELECT path_id FROM paths
WHERE
tld_id = ?
AND
domain_id = ?
AND
subdomain_id = ?
AND
path = ?
;
-- name: CountPaths :one
SELECT COUNT(DISTINCT path_id)
FROM paths
WHERE
tld_id = ?
AND
domain_id = ?
AND
subdomain_id = ?
;
-- name: InsertPath :exec
INSERT INTO paths
(tld_id, domain_id, subdomain_id, path_id, path)
VALUES
(?, ?, ?, ?, ?)
;
-- -- name: CountTLD :one
-- SELECT COUNT(DISTINCT tld_id) FROM domain64 WHERE tld_id = ?;

View File

@@ -0,0 +1,39 @@
CREATE TABLE IF NOT EXISTS tlds (
tld_id INTEGER PRIMARY KEY NOT NULL,
tld TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS domains (
id INTEGER PRIMARY KEY,
tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
domain_id INTEGER NOT NULL,
domain TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_domains_uniq ON domains(tld_id, domain_id);
CREATE TABLE IF NOT EXISTS subdomains (
id INTEGER PRIMARY KEY,
tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
domain_id INTEGER NOT NULL REFERENCES domains(domain_id),
subdomain_id INTEGER NOT NULL,
subdomain TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_subdomains_uniq ON subdomains(tld_id, domain_id, subdomain_id);
CREATE TABLE IF NOT EXISTS paths (
id INTEGER PRIMARY KEY,
tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
domain_id INTEGER NOT NULL REFERENCES domains(domain_id),
subdomain_id INTEGER NOT NULL REFERENCES subdomains(subdomain_id),
path_id INTEGER NOT NULL,
path TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_paths_uniq ON paths(tld_id, domain_id, subdomain_id, path_id);
CREATE TABLE IF NOT EXISTS domain64 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rfqdn TEXT,
d64 BIGINT UNIQUE NOT NULL
);

View File

@@ -0,0 +1,9 @@
version: "2"
sql:
- engine: "sqlite"
queries: "query.sql"
schema: "schema.sql"
gen:
go:
package: "test_db"
out: "."

39
internal/engine/entre.go Normal file
View File

@@ -0,0 +1,39 @@
package engine
import (
"context"
"fmt"
"log"
"time"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
base "git.jadud.com/jadudm/grosbeak/internal/types"
)
// Entre enqueues ej on the "fetch" queue. The deduping key combines the
// URL with the current day, week, or month (per ej.UpdateFrequency), so
// the same URL is queued at most once per update period.
func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error {
	now := time.Now()
	tag := ""
	switch ej.UpdateFrequency {
	case base.UPDATE_WEEKLY:
		// Week number is approximated as year-day / 7.
		tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, now.Year(), now.YearDay()/7)
	case base.UPDATE_MONTHLY:
		tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, now.Year(), now.Month())
	default:
		// UPDATE_DAILY and any unrecognized frequency fall back to daily.
		tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, now.Year(), now.YearDay())
	}
	err := queue.QueueJob(context.Background(), liteq.QueueJobParams{
		Queue: "fetch",
		// This only works for things in the `queued` state
		DedupingKey: liteq.IgnoreDuplicate(tag),
		Job:         ej.AsJson(),
	})
	if err != nil {
		log.Printf("entre err %s: %s\n", ej.URL, err.Error())
	}
	return err
}

34
internal/engine/fetch.go Normal file
View File

@@ -0,0 +1,34 @@
package engine
import (
"context"
"log"
"git.jadud.com/jadudm/grosbeak/internal/domain64"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
"git.jadud.com/jadudm/grosbeak/internal/types"
"github.com/jpillora/go-tld"
"github.com/tidwall/gjson"
)
// Fetch returns a queue worker that extracts the "url" field from a
// job's JSON payload and resolves it to a Domain64 id as the precursor
// to fetching the page. Returning nil marks the job completed.
func Fetch(d64m *domain64.Domain64Map) types.QueueWorker {
	_f := func(ctx context.Context, job *liteq.Job) error {
		url := gjson.Get(job.Job, "url").String()
		turl, err := tld.Parse(url)
		if err != nil {
			// If we can't parse it, shut the job down as completed.
			return nil
		}
		d64, err := d64m.URLToDomain64(turl)
		if err != nil {
			// BUG FIX: previously this only logged and fell through to
			// d64.ToInt64(), panicking the worker on a nil d64. Complete
			// the job instead, matching the parse-failure handling above.
			log.Printf("urltodomain64 err %s: %s\n", url, err.Error())
			return nil
		}
		log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64())
		return nil
	}
	return _f
}

3
internal/liteq/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.DS_Store
/bench/bench.db*
/TODO

21
internal/liteq/LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Sebastien Armand
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

29
internal/liteq/Makefile Normal file
View File

@@ -0,0 +1,29 @@
default:
go run *.go || (go get ./... && go run *.go)
watch:
(watchexec -e sql -- make schema-const) & \
(watchexec -e sql -- sqlc generate) & \
(watchexec -w sqlc.yaml -- sqlc generate) & \
(watchexec -e go -c -r -- go test ./... -count 1) & \
wait
# SQL Watcher done
# Puts the schema in a constant in go so it can be used to create the table directly
schema-const:
echo "// Code generated by Makefile. DO NOT EDIT." > internal/schema.go
echo "package internal\n" >> internal/schema.go
echo "const Schema = \`" >> internal/schema.go
cat db/schema.sql >> internal/schema.go
echo "\`" >> internal/schema.go
.PHONY: bench
bench:
make cleanup
make schema-const
sqlc generate
cd bench && go run main.go
cleanup:
rm -f bench/bench.db*

172
internal/liteq/README.md Normal file
View File

@@ -0,0 +1,172 @@
# liteq
Library to have a persistent job queue in Go backed by a SQLite DB.
## Motivation
I needed a way to have a persistent queue for a small app so it would survive restarts and allow me to schedule jobs in the future.
Since I already had SQLite as a dependency, I didn't want to add another dependency to make the app work. Especially not an external one like RabbitMQ, Redis, SQS or others.
liteq can run tens of thousands of jobs per second if needed. It can also be made to use more than a single DB file to grow concurrency further, should you need it.
## Usage
### Install
```sh
go get github.com/jadudm/liteq
```
### Setup and DB creation
```go
import (
"github.com/jadudm/liteq"
_ "github.com/mattn/go-sqlite3"
)
func main() {
// Open the sqlite3 DB in the file "liteq.db"
liteqDb, err := sql.Open("sqlite3", "liteq.db")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
// Create the DB table if it doesn't exist
liteq.Setup(liteqDb)
// create a job queue
jqueue := liteq.New(liteqDb)
}
```
### Queuing a job
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
This will send the job with the given payload on a queue called `notify.email`.
### Consuming
To consume jobs from the queue, you call the consume method:
```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
Queue: "notify.email",
PoolSize: 3,
VisibilityTimeout: 20,
Worker: func (ctx context.Context, job *liteq.Job) error {
return sendEmail(job)
},
})
```
- `context.Background()` You can pass in a cancellable context and the queue will stop processing when the context is canceled
- `Queue` this is the name of the queue we want this consumer to consume from
- `PoolSize` this is the number of concurrent consumers we want for this queue
- `VisibilityTimeout` is the time that a job will remain reserved to a consumer. After that time has elapsed, if the job hasn't been marked either failed
or successful, it will be put back in the queue for others to consume. The error will be added to the job's error list and the number of remaining attempts will be
decreased.
- `Worker` A callback to process the job. When an error is returned, the job is either returned to the queue for processing with a decreased number of remaining attempts or marked as failed if no more attempts remain.
### Multiple attempts
When queueing a job, it is possible to decide how many times this job can be attempted in case of failures:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
RemainingAttempts: 3,
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
### Delayed jobs
When queueing a job, you can decide to execute it at a later point in time:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "notify.email",
ExecuteAfter: time.Now().Add(6*time.Minute).Unix(),
Job: `{"email_address": "bob@example.com", "content": "..."}`,
})
```
In this case, the job won't run until the given time.
### Deduplication
Sometimes it can be useful to prevent the queueing of multiple messages that would essentially be performing the same task, to avoid unnecessary work.
This is possible in `liteq` via the `DedupingKey` job parameter. There are 2 types of deduping keys:
- `IgnoreDuplicate` will ignore the new job that was sent and keep the one that was already on the queue
- `ReplaceDuplicate` will instead remove the job currently on the queue and use the new one instead
Assuming we have the following consumer:
```go
jqueue.Consume(context.Background(), liteq.ConsumeParams{
Queue: "print",
VisibilityTimeout: 20,
Worker: func (ctx context.Context, job *liteq.Job) error {
fmt.Println(job.Payload)
return nil
},
})
```
And we send the following jobs:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `first`,
DedupingKey: liteq.IgnoreDuplicate("print.job")
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `second`,
DedupingKey: liteq.IgnoreDuplicate("print.job")
})
```
Then the result would be a single output line:
```
first
```
If instead we use `liteq.ReplaceDuplicate`
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `third`,
DedupingKey: liteq.ReplaceDuplicate("print.job")
})
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "print",
Job: `fourth`,
DedupingKey: liteq.ReplaceDuplicate("print.job")
})
```
We will output `fourth`
If we think for example of the scenario of sending email or text notifications about an order to a customer, we could construct a deduping key like:
```go
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "email.notify",
Job: `{"order_id": 123, "customer_id": "abc"}`,
DedupingKey: liteq.ReplaceDuplicate("email.notify:%s:%s", customer.ID, order.ID)
})
```
That way if the `Order Prepared` status update email hasn't been sent yet by the time we're ready to send the `Order Shipped` email, we can skip the `Order Prepared` one and only send the most recent update to the customer.

View File

@@ -0,0 +1,71 @@
package main
import (
"context"
"database/sql"
"fmt"
"math/rand"
"time"
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
_ "modernc.org/sqlite"
)
// main is a rough throughput benchmark: one goroutine enqueues howMany
// jobs spread across 30 queues while 30 consumers (10 workers each)
// drain them. Progress is printed every 1000 completed jobs and the
// process exits once all jobs have been consumed.
func main() {
	howMany := 100_000
	go func() {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		// Previously the Setup and QueueJob error returns were dropped;
		// a silently failing producer would hang the benchmark forever.
		if err := liteq.Setup(db); err != nil {
			panic(err)
		}
		jq := liteq.New(db)
		for i := 0; i < howMany; i++ {
			// i%30 spreads jobs across the 30 queues consumed below.
			if err := jq.QueueJob(context.Background(), liteq.QueueJobParams{
				Queue: fmt.Sprintf("default-%d", i%30),
				Job:   "SendEmail",
			}); err != nil {
				panic(err)
			}
		}
	}()
	c := make(chan struct{})
	for i := 0; i < 30; i++ {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		if err := liteq.Setup(db); err != nil {
			panic(err)
		}
		jq := liteq.New(db)
		go func(i int) {
			err := jq.Consume(context.Background(), liteq.ConsumeParams{
				Queue:    fmt.Sprintf("default-%d", i),
				PoolSize: 10,
				Worker: func(ctx context.Context, job *liteq.Job) error {
					// Simulate variable-latency work (0-49ms).
					n := rand.Intn(50)
					time.Sleep(time.Duration(n) * time.Millisecond)
					c <- struct{}{}
					return nil
				},
			})
			fmt.Println(err)
		}(i)
	}
	rec := 0
	for range c {
		rec++
		if rec%1000 == 0 {
			fmt.Println(rec)
		}
		if rec == howMany {
			return
		}
	}
}

View File

@@ -0,0 +1,137 @@
-- sqlc input file: the `-- name:` annotations drive code generation. All
-- timestamps are unix epoch seconds; 0 means "unset".

-- Insert a job. When a non-empty deduping_key already exists, keep the
-- existing row and silently drop the new job.
-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key)
WHERE
deduping_key != '' DO NOTHING;

-- Insert a job. When a *queued* row with the same non-empty deduping_key
-- exists, overwrite its payload, schedule, and attempts in place.
-- name: doQueueJobReplaceDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO
UPDATE
SET
job = EXCLUDED.job,
execute_after = EXCLUDED.execute_after,
updated_at = unixepoch(),
remaining_attempts = EXCLUDED.remaining_attempts;

-- Mark a job done and clear its consumer reservation.
-- name: CompleteJob :exec
UPDATE
jobs
SET
job_status = 'completed',
finished_at = unixepoch(),
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = 0
WHERE
id = ?;

-- Record a failed attempt: burn one attempt and either re-queue the job
-- or mark it permanently failed when no attempts remain. The caller
-- supplies the full replacement `errors` JSON array.
-- name: FailJob :exec
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
finished_at = 0,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = ?
WHERE
id = ?;

-- Reserve up to LIMIT due jobs on a queue for a consumer, oldest
-- schedule first, flipping them to 'fetched' and returning the rows.
-- name: MarkJobsForConsumer :many
UPDATE
jobs
SET
consumer_fetched_at = unixepoch(),
updated_at = unixepoch(),
job_status = 'fetched'
WHERE
jobs.job_status = 'queued'
AND jobs.remaining_attempts > 0
AND jobs.id IN (
SELECT
id
FROM
jobs js
WHERE
js.queue = ?
AND js.job_status = 'queued'
AND js.execute_after <= ?
AND js.remaining_attempts > 0
ORDER BY
execute_after ASC
LIMIT
?
) RETURNING *;

-- Return fetched-but-unfinished jobs to the queue (or fail them) once
-- their visibility timeout has lapsed, charging one attempt and
-- appending a note to the errors array.
-- name: ResetJobs :execrows
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
job_status = 'fetched'
AND queue = ?
AND consumer_fetched_at < ?;

-- Look up a single job by id.
-- name: FindJob :one
SELECT
*
FROM
jobs
WHERE
id = ?;

View File

@@ -0,0 +1,31 @@
-- Use write-ahead logging so readers do not block the single writer.
PRAGMA journal_mode = WAL;

-- One row per job. Timestamps are unix epoch seconds; 0 means "unset".
CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER NOT NULL,
    queue TEXT NOT NULL,
    job TEXT NOT NULL,
    job_status TEXT NOT NULL DEFAULT 'queued',
    execute_after INTEGER NOT NULL DEFAULT 0,
    remaining_attempts INTEGER NOT NULL DEFAULT 1,
    consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
    finished_at INTEGER NOT NULL DEFAULT 0,
    deduping_key TEXT NOT NULL DEFAULT '',
    -- JSON array of error strings. Single quotes: double-quoted "[]" only
    -- worked via SQLite's non-standard string fallback and breaks when the
    -- library is built with SQLITE_DQS=0.
    errors TEXT NOT NULL DEFAULT '[]',
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY (id)
);

-- Covers the MarkJobsForConsumer/ResetJobs lookups on live jobs only.
CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
    job_status = 'queued'
    OR job_status = 'fetched';

-- Backs the IgnoreDuplicate conflict target (any status).
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
    deduping_key != '';

-- Backs the ReplaceDuplicate conflict target (queued rows only).
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
    deduping_key != ''
    AND job_status = 'queued';

View File

@@ -0,0 +1,31 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
package internal
import (
"context"
"database/sql"
)
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
return &Queries{
db: tx,
}
}

View File

@@ -0,0 +1,181 @@
package internal
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"database/sql/driver"
"github.com/alitto/pond"
)
// QueueJobParams describes a job to enqueue. Zero values get defaults in
// QueueJob: RemainingAttempts 0 becomes 1, a nil DedupingKey disables
// deduplication. ExecuteAfter is a unix timestamp; 0 means "run now".
type QueueJobParams struct {
	Queue             string
	Job               string
	ExecuteAfter      int64
	RemainingAttempts int64
	DedupingKey       DedupingKey
}

// DedupingKey controls what happens when a job carrying the same key is
// already on the queue: keep the existing job, or replace it.
type DedupingKey interface {
	String() string
	ReplaceDuplicate() bool
}

// IgnoreDuplicate keeps the job already on the queue and drops the new one.
type IgnoreDuplicate string

func (i IgnoreDuplicate) String() string {
	return string(i)
}

func (i IgnoreDuplicate) ReplaceDuplicate() bool {
	return false
}

// ReplaceDuplicate removes the queued job with the same key and uses the
// new job in its place.
type ReplaceDuplicate string

func (r ReplaceDuplicate) String() string {
	return string(r)
}

func (r ReplaceDuplicate) ReplaceDuplicate() bool {
	return true
}
// QueueJob inserts a job on params.Queue, applying defaults (one attempt,
// no deduplication) and routing to the ignore-duplicate or
// replace-duplicate insert depending on the deduping key.
func (q *Queries) QueueJob(ctx context.Context, params QueueJobParams) error {
	attempts := params.RemainingAttempts
	if attempts == 0 {
		attempts = 1
	}
	key := params.DedupingKey
	if key == nil {
		key = IgnoreDuplicate("")
	}
	insert := doQueueJobIgnoreDupeParams{
		Queue:             params.Queue,
		Job:               params.Job,
		ExecuteAfter:      params.ExecuteAfter,
		RemainingAttempts: attempts,
		DedupingKey:       key.String(),
	}
	// Only a non-empty key that asks for replacement takes the replace path;
	// an empty key or an IgnoreDuplicate key uses the plain insert.
	if key.String() != "" && key.ReplaceDuplicate() {
		return q.doQueueJobReplaceDupe(ctx, doQueueJobReplaceDupeParams(insert))
	}
	return q.doQueueJobIgnoreDupe(ctx, insert)
}
// GrabJobsParams configures one fetch of queued jobs.
type GrabJobsParams struct {
	Queue        string // queue to pull from
	ExecuteAfter int64  // only jobs scheduled at or before this unix time; 0 means "now"
	Count        int64  // maximum number of jobs to fetch; 0 means 1
}
// GrabJobs atomically marks up to params.Count due jobs on params.Queue
// as fetched and returns them. Defaults: schedule cutoff "now", batch
// size one.
func (q *Queries) GrabJobs(ctx context.Context, params GrabJobsParams) ([]*Job, error) {
	cutoff := params.ExecuteAfter
	if cutoff <= 0 {
		cutoff = time.Now().Unix()
	}
	count := params.Count
	if count <= 0 {
		count = 1
	}
	return q.MarkJobsForConsumer(ctx, MarkJobsForConsumerParams{
		Queue:        params.Queue,
		ExecuteAfter: cutoff,
		Limit:        count,
	})
}
// ConsumeParams configures a Consume loop.
type ConsumeParams struct {
	Queue             string                            // queue name to consume from
	PoolSize          int                               // number of concurrent workers
	Worker            func(context.Context, *Job) error // callback run for each job
	VisibilityTimeout int64                             // seconds a fetched job stays reserved; 0 disables resets
	OnEmptySleep      time.Duration                     // poll interval when the queue is empty; 0 means 1s
}
// Consume polls params.Queue and runs params.Worker on each job using a
// pool of params.PoolSize goroutines. It returns nil once ctx is
// canceled, or an error if resetting or fetching jobs fails. When
// VisibilityTimeout > 0, jobs fetched longer ago than the timeout are
// returned to the queue (burning one attempt) before each fetch.
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
	// Guard against a zero/negative pool size; pond requires at least one
	// worker, and a zero fetch count would never pull any jobs.
	poolSize := params.PoolSize
	if poolSize < 1 {
		poolSize = 1
	}
	workers := pond.New(poolSize, poolSize)
	// Previously the pool was never stopped, leaking its goroutines every
	// time Consume returned; drain in-flight jobs before returning.
	defer workers.StopAndWait()
	sleep := params.OnEmptySleep
	if sleep == 0 {
		sleep = 1 * time.Second
	}
	for {
		// If the context gets canceled for example, stop consuming
		if ctx.Err() != nil {
			log.Println("context cancelled in Consume()")
			return nil
		}
		if params.VisibilityTimeout > 0 {
			_, err := q.ResetJobs(ctx, ResetJobsParams{
				Queue:             params.Queue,
				ConsumerFetchedAt: time.Now().Unix() - params.VisibilityTimeout,
			})
			if err != nil {
				log.Println("error resetting jobs", err.Error())
				return fmt.Errorf("error resetting jobs: %w", err)
			}
		}
		jobs, err := q.GrabJobs(ctx, GrabJobsParams{
			Queue: params.Queue,
			Count: int64(poolSize),
		})
		if err != nil {
			return fmt.Errorf("error grabbing jobs: %w", err)
		}
		if len(jobs) == 0 {
			time.Sleep(sleep)
			continue
		}
		for _, job := range jobs {
			job := job
			workers.Submit(func() {
				if err := params.Worker(ctx, job); err != nil {
					// Fix typo "failled"; also surface FailJob errors instead
					// of silently dropping them.
					log.Println("worker failed", err.Error())
					if ferr := q.FailJob(ctx, FailJobParams{
						ID:     job.ID,
						Errors: ErrorList(append(job.Errors, err.Error())),
					}); ferr != nil {
						log.Println("error failing job", ferr.Error())
					}
					return
				}
				if cerr := q.CompleteJob(ctx, job.ID); cerr != nil {
					log.Println("error completing job", cerr.Error())
				}
			})
		}
	}
}
// ErrorList holds a job's accumulated error messages; it is persisted in
// the errors column as a JSON array of strings.
type ErrorList []string

// Value implements driver.Valuer, serializing the list as JSON. An empty
// or nil list is stored as the literal "[]".
func (e ErrorList) Value() (driver.Value, error) {
	if len(e) > 0 {
		return json.Marshal(e)
	}
	return "[]", nil
}

// Scan implements sql.Scanner, accepting the JSON text as either a
// string or a byte slice; anything else is rejected.
func (e *ErrorList) Scan(src interface{}) error {
	var raw []byte
	switch v := src.(type) {
	case string:
		raw = []byte(v)
	case []byte:
		raw = v
	default:
		return fmt.Errorf("unsupported type: %T", src)
	}
	return json.Unmarshal(raw, e)
}

View File

@@ -0,0 +1,20 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
package internal
type Job struct {
ID int64
Queue string
Job string
JobStatus string
ExecuteAfter int64
RemainingAttempts int64
ConsumerFetchedAt int64
FinishedAt int64
DedupingKey string
Errors ErrorList
CreatedAt int64
UpdatedAt int64
}

View File

@@ -0,0 +1,283 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: queries.sql
package internal
import (
"context"
)
const completeJob = `-- name: CompleteJob :exec
UPDATE
jobs
SET
job_status = 'completed',
finished_at = unixepoch(),
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = 0
WHERE
id = ?
`
func (q *Queries) CompleteJob(ctx context.Context, id int64) error {
_, err := q.db.ExecContext(ctx, completeJob, id)
return err
}
const failJob = `-- name: FailJob :exec
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
finished_at = 0,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = ?
WHERE
id = ?
`
type FailJobParams struct {
Errors ErrorList
ID int64
}
func (q *Queries) FailJob(ctx context.Context, arg FailJobParams) error {
_, err := q.db.ExecContext(ctx, failJob, arg.Errors, arg.ID)
return err
}
const findJob = `-- name: FindJob :one
SELECT
id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
FROM
jobs
WHERE
id = ?
`
func (q *Queries) FindJob(ctx context.Context, id int64) (*Job, error) {
row := q.db.QueryRowContext(ctx, findJob, id)
var i Job
err := row.Scan(
&i.ID,
&i.Queue,
&i.Job,
&i.JobStatus,
&i.ExecuteAfter,
&i.RemainingAttempts,
&i.ConsumerFetchedAt,
&i.FinishedAt,
&i.DedupingKey,
&i.Errors,
&i.CreatedAt,
&i.UpdatedAt,
)
return &i, err
}
const markJobsForConsumer = `-- name: MarkJobsForConsumer :many
UPDATE
jobs
SET
consumer_fetched_at = unixepoch(),
updated_at = unixepoch(),
job_status = 'fetched'
WHERE
jobs.job_status = 'queued'
AND jobs.remaining_attempts > 0
AND jobs.id IN (
SELECT
id
FROM
jobs js
WHERE
js.queue = ?
AND js.job_status = 'queued'
AND js.execute_after <= ?
AND js.remaining_attempts > 0
ORDER BY
execute_after ASC
LIMIT
?
) RETURNING id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
`
type MarkJobsForConsumerParams struct {
Queue string
ExecuteAfter int64
Limit int64
}
func (q *Queries) MarkJobsForConsumer(ctx context.Context, arg MarkJobsForConsumerParams) ([]*Job, error) {
rows, err := q.db.QueryContext(ctx, markJobsForConsumer, arg.Queue, arg.ExecuteAfter, arg.Limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*Job
for rows.Next() {
var i Job
if err := rows.Scan(
&i.ID,
&i.Queue,
&i.Job,
&i.JobStatus,
&i.ExecuteAfter,
&i.RemainingAttempts,
&i.ConsumerFetchedAt,
&i.FinishedAt,
&i.DedupingKey,
&i.Errors,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const resetJobs = `-- name: ResetJobs :execrows
UPDATE
jobs
SET
job_status = CASE
WHEN remaining_attempts <= 1 THEN 'failed'
ELSE 'queued'
END,
updated_at = unixepoch(),
consumer_fetched_at = 0,
remaining_attempts = MAX(remaining_attempts - 1, 0),
errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
job_status = 'fetched'
AND queue = ?
AND consumer_fetched_at < ?
`
type ResetJobsParams struct {
Queue string
ConsumerFetchedAt int64
}
func (q *Queries) ResetJobs(ctx context.Context, arg ResetJobsParams) (int64, error) {
result, err := q.db.ExecContext(ctx, resetJobs, arg.Queue, arg.ConsumerFetchedAt)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
const doQueueJobIgnoreDupe = `-- name: doQueueJobIgnoreDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key)
WHERE
deduping_key != '' DO NOTHING
`
type doQueueJobIgnoreDupeParams struct {
Queue string
Job string
ExecuteAfter int64
RemainingAttempts int64
DedupingKey string
}
func (q *Queries) doQueueJobIgnoreDupe(ctx context.Context, arg doQueueJobIgnoreDupeParams) error {
_, err := q.db.ExecContext(ctx, doQueueJobIgnoreDupe,
arg.Queue,
arg.Job,
arg.ExecuteAfter,
arg.RemainingAttempts,
arg.DedupingKey,
)
return err
}
const doQueueJobReplaceDupe = `-- name: doQueueJobReplaceDupe :exec
INSERT INTO
jobs (
queue,
job,
execute_after,
job_status,
created_at,
updated_at,
remaining_attempts,
deduping_key
)
VALUES
(
?,
?,
?,
'queued',
unixepoch(),
unixepoch(),
?,
?
) ON CONFLICT (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued' DO
UPDATE
SET
job = EXCLUDED.job,
execute_after = EXCLUDED.execute_after,
updated_at = unixepoch(),
remaining_attempts = EXCLUDED.remaining_attempts
`
type doQueueJobReplaceDupeParams struct {
Queue string
Job string
ExecuteAfter int64
RemainingAttempts int64
DedupingKey string
}
func (q *Queries) doQueueJobReplaceDupe(ctx context.Context, arg doQueueJobReplaceDupeParams) error {
_, err := q.db.ExecContext(ctx, doQueueJobReplaceDupe,
arg.Queue,
arg.Job,
arg.ExecuteAfter,
arg.RemainingAttempts,
arg.DedupingKey,
)
return err
}

View File

@@ -0,0 +1,36 @@
// Code generated by Makefile. DO NOT EDIT.
package internal
const Schema = `
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER NOT NULL,
queue TEXT NOT NULL,
job TEXT NOT NULL,
job_status TEXT NOT NULL DEFAULT 'queued',
execute_after INTEGER NOT NULL DEFAULT 0,
remaining_attempts INTEGER NOT NULL DEFAULT 1,
consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
finished_at INTEGER NOT NULL DEFAULT 0,
deduping_key TEXT NOT NULL DEFAULT '',
errors TEXT NOT NULL DEFAULT "[]",
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
job_status = 'queued'
OR job_status = 'fetched';
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
deduping_key != '';
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
deduping_key != ''
AND job_status = 'queued';
`

View File

@@ -0,0 +1,438 @@
package internal_test
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"strings"
"testing"
"time"
"git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
"github.com/matryer/is"
_ "modernc.org/sqlite"
)
func TestMain(m *testing.M) {
e := m.Run()
removeFiles("jobs.db", "jobs.db-journal", "jobs.db-wal", "jobs.db-shm")
os.Exit(e)
}
func removeFiles(files ...string) error {
for _, file := range files {
err := os.Remove(file)
if err != nil && !os.IsNotExist(err) {
return err
}
}
return nil
}
func getDb(file string) (*sql.DB, error) {
if file != ":memory:" {
err := removeFiles(file, file+"-journal", file+"-wal", file+"-shm")
if err != nil && !os.IsNotExist(err) {
return nil, err
}
}
db, err := sql.Open("sqlite", file)
if err != nil {
return nil, err
}
schema, err := os.ReadFile("../db/schema.sql")
if err != nil {
return nil, err
}
_, err = db.Exec(string(schema))
if err != nil {
return nil, err
}
return db, nil
}
func getJqueue(file string) (*internal.Queries, error) {
sqlcdb, err := getDb(file)
if err != nil {
return nil, err
}
return internal.New(sqlcdb), nil
}
func Test_QueueJob(t *testing.T) {
is := is.New(t)
sqlcdb, err := getDb("jobs.db")
is.NoErr(err) // error opening sqlite3 database
jqueue := internal.New(sqlcdb)
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: jobPaylod,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].Job, jobPaylod)
}
func Test_FetchTwice(t *testing.T) {
is := is.New(t)
sqlcdb, err := getDb("jobs.db")
is.NoErr(err) // error opening sqlite3 database
jqueue := internal.New(sqlcdb)
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: jobPaylod,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].Job, jobPaylod)
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected 0 job
}
// delay jobs, fetch jobs, check that we get no jobs, then check that we get the job after the delay
func Test_DelayedJob(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
ExecuteAfter: time.Now().Unix() + 1,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected 0 job
time.Sleep(1 * time.Second)
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
}
func Test_PrefetchJobs(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue(":memory:")
is.NoErr(err) // error getting job queue
for i := 0; i < 10; i++ {
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
})
is.NoErr(err) // error queuing job
}
// grab the first 2
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 2,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 2) // expected 2 jobs
// the next 6
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 6,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 6) // expected 6 jobs
// try for 5 but only 2 are left
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 5,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 2) // expected 2 jobs
}
func Test_Consume(t *testing.T) {
is := is.New(t)
// This somehow doesn't work well with in memory db
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
fillq1 := make(chan struct{}, 1)
go func() {
for i := 0; i < 100; i++ {
jobPayload := fmt.Sprintf("q1-%d", i)
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "q1",
Job: jobPayload,
})
is.NoErr(err) // error queuing job
}
fillq1 <- struct{}{}
close(fillq1)
}()
fillq2 := make(chan struct{}, 1)
go func() {
for i := 0; i < 100; i++ {
jobPayload := fmt.Sprintf("q2-%d", i)
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "q2",
Job: jobPayload,
})
is.NoErr(err) // error queuing job
}
fillq2 <- struct{}{}
close(fillq2)
}()
q1 := make(chan string, 100)
q1done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
jqueue.Consume(ctx, internal.ConsumeParams{
Queue: "q1",
PoolSize: 1,
Worker: func(ctx context.Context, job *internal.Job) error {
q1 <- job.Job
if job.Job == "q1-99" {
q1done <- struct{}{}
close(q1done)
close(q1)
}
return nil
},
})
}()
q2 := make(chan string, 100)
q2done := make(chan struct{}, 1)
ctx2, cancel2 := context.WithCancel(context.Background())
go func() {
jqueue.Consume(ctx2, internal.ConsumeParams{
Queue: "q2",
PoolSize: 1,
Worker: func(ctx context.Context, job *internal.Job) error {
q2 <- job.Job
if job.Job == "q2-99" {
q2done <- struct{}{}
close(q2done)
close(q2)
}
return nil
},
})
}()
<-fillq1
<-q1done
cancel()
<-fillq2
<-q2done
cancel2()
i := 0
for pl := range q1 {
is.True(strings.HasPrefix(pl, "q1")) // expected q1-*
i++
}
is.Equal(i, 100) // expected 100 jobs
j := 0
for pl := range q2 {
is.True(strings.HasPrefix(pl, "q2")) // expected q2-*
j++
}
is.Equal(j, 100) // expected 100 jobs
}
func Test_MultipleAttempts(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
RemainingAttempts: 3,
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
thejob := jobs[0]
is.Equal(thejob.RemainingAttempts, int64(3)) // expected 3 attempts
// Fail and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].RemainingAttempts, int64(2)) // expected 2 attempts
is.Equal(jobs[0].Errors, internal.ErrorList{"error1"})
// Fail again and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1", "error2"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected 1 job
is.Equal(jobs[0].RemainingAttempts, int64(1)) // expected 1 attempts
is.Equal(jobs[0].Errors, internal.ErrorList{"error1", "error2"})
// Fail again and verify
err = jqueue.FailJob(context.Background(), internal.FailJobParams{
ID: thejob.ID,
Errors: internal.ErrorList{"error1", "error2", "error3"},
})
is.NoErr(err) // error failing job
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 0) // expected no job since no more attempts
}
// A worker that sleeps past the 1s visibility timeout should have its job
// reset by the consume loop; with the default single attempt the job then
// lands in 'failed' with the timeout note appended to its errors.
func Test_VisibilityTimeout(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue: "",
		Job:   `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
	})
	is.NoErr(err) // error queuing job
	ctx, cancel := context.WithCancel(context.Background())
	id := int64(0)
	go func() {
		jqueue.Consume(ctx, internal.ConsumeParams{
			Queue:             "",
			PoolSize:          1,
			VisibilityTimeout: 1,
			OnEmptySleep:      100 * time.Millisecond,
			Worker: func(ctx context.Context, job *internal.Job) error {
				id = job.ID
				// Sleep well past the visibility timeout so the job is reset.
				time.Sleep(3 * time.Second)
				return nil
			},
		})
	}()
	time.Sleep(2 * time.Second)
	cancel()
	job, err := jqueue.FindJob(context.Background(), id)
	is.NoErr(err)                     // error fetching job for consumer
	is.Equal(job.JobStatus, "failed") // expected failed: visibility timeout expired and no attempts remain
	is.Equal(job.Errors, internal.ErrorList{"visibility timeout expired"})
	// Sleep to ensure enough time for the job to finish and avoid panics
	time.Sleep(2 * time.Second)
}
func Test_DedupeIgnore(t *testing.T) {
is := is.New(t)
jqueue, err := getJqueue("jobs.db")
is.NoErr(err) // error getting job queue
log.Println("A")
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:1`,
DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
})
is.NoErr(err) // error queuing job
log.Println("B")
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
Queue: "",
Job: `job:2`,
DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
})
is.NoErr(err) // error queuing job
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
Queue: "",
Count: 10,
})
is.NoErr(err) // error fetching job for consumer
is.Equal(len(jobs), 1) // expected only 1 job due to dedupe
is.Equal(jobs[0].Job, `job:1`) // expected job:1
}
// Two jobs queued with the same ReplaceDuplicate key: the second should
// overwrite the first, leaving a single job with the newer payload.
func Test_DedupeReplace(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:1`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:2`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job
	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 10,
	})
	is.NoErr(err)                  // error fetching job for consumer
	is.Equal(len(jobs), 1)         // expected only 1 job due to dedupe
	is.Equal(jobs[0].Job, `job:2`) // expected job:2, the replacement
}

41
internal/liteq/liteq.go Normal file
View File

@@ -0,0 +1,41 @@
package liteq
import (
"context"
"database/sql"
internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
)
// Setup creates the jobs table and its indexes in db if they do not
// already exist, and switches the database to WAL journal mode.
// Call it once per database before using New.
func Setup(db *sql.DB) error {
	_, err := db.Exec(internal.Schema)
	return err
}
// New wraps db in a JobQueue ready to queue and consume jobs. Run Setup
// first (or otherwise ensure the schema exists).
func New(db *sql.DB) *JobQueue {
	queries := internal.New(db)
	return &JobQueue{queries}
}

// JobQueue is the public handle for the queue; it delegates to the
// sqlc-backed queries in the internal package.
type JobQueue struct {
	queries *internal.Queries
}

// Re-exported types so callers never need to import internal.
type QueueJobParams = internal.QueueJobParams
type DedupingKey = internal.DedupingKey
type IgnoreDuplicate = internal.IgnoreDuplicate
type ReplaceDuplicate = internal.ReplaceDuplicate

// QueueJob enqueues the job described by params.
func (jq *JobQueue) QueueJob(ctx context.Context, params QueueJobParams) error {
	return jq.queries.QueueJob(ctx, params)
}

type ConsumeParams = internal.ConsumeParams

// Consume blocks, running params.Worker on jobs from params.Queue until
// ctx is canceled.
func (jq *JobQueue) Consume(ctx context.Context, params ConsumeParams) error {
	return jq.queries.Consume(ctx, params)
}

type ErrorList = internal.ErrorList
type Job = internal.Job

17
internal/liteq/sqlc.yaml Normal file
View File

@@ -0,0 +1,17 @@
version: "2"
sql:
- engine: "sqlite"
queries: "db/queries.sql"
schema: "db/schema.sql"
gen:
go:
package: "internal"
out: "internal"
emit_result_struct_pointers: true
overrides:
- column: jobs.consumer_id
go_type: "string"
- column: jobs.errors
go_type:
import: ""
type: "ErrorList"

9
internal/types/base.go Normal file
View File

@@ -0,0 +1,9 @@
package types
import (
"context"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
)
// QueueWorker is the callback signature a liteq consumer uses to process
// a single job.
type QueueWorker func(ctx context.Context, job *liteq.Job) error

View File

@@ -0,0 +1,9 @@
package types
// UpdateFrequency says how often a source should be re-fetched.
type UpdateFrequency string

const (
	UPDATE_DAILY   UpdateFrequency = "DAILY"
	UPDATE_WEEKLY  UpdateFrequency = "WEEKLY"
	UPDATE_MONTHLY UpdateFrequency = "MONTHLY"
)

22
internal/types/jobs.go Normal file
View File

@@ -0,0 +1,22 @@
package types
import "encoding/json"
// Job is any queue payload that can serialize itself to JSON.
type Job interface {
	AsJson() string
}

// Used by all of the jobs.
// NOTE(review): underscore names (_as_json) are unidiomatic Go; rename to
// asJSON when convenient. The Marshal error is deliberately dropped —
// presumably the job structs only contain marshalable fields; confirm
// before adding new job types with channels/funcs.
func _as_json(j Job) string {
	as_json, _ := json.Marshal(j)
	return string(as_json)
}

// EntreJob asks the fetch pipeline to (re)fetch a URL on a schedule.
type EntreJob struct {
	URL             string          `json:"url"`
	UpdateFrequency UpdateFrequency `json:"update_frequency"`
}

// AsJson implements Job.
func (ej *EntreJob) AsJson() string {
	return _as_json(ej)
}