Compare commits
12 Commits
7beeacab1a
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f53639af2f
|
||
|
|
06cdc68be7
|
||
|
|
d212a354fe
|
||
|
|
23923b7be4
|
||
|
|
80b55d1a3b
|
||
|
|
437393a1e7
|
||
|
|
6a4f812027
|
||
|
|
4a05b11843
|
||
|
|
460f2734ef
|
||
|
|
f72c6b020f
|
||
|
|
0fbf88101f
|
||
|
|
b80e2421f1
|
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*.db*
|
||||
*.sqlite*
|
||||
10
Makefile
Normal file
10
Makefile
Normal file
@@ -0,0 +1,10 @@
|
||||
test: generate
|
||||
cd internal/domain64 ; make test
|
||||
|
||||
generate:
|
||||
cd internal/domain64 ; make generate
|
||||
cd internal/liteq ; sqlc generate
|
||||
cd internal/liteq ; make schema-const
|
||||
|
||||
run: generate
|
||||
go run cmd/api/*.go
|
||||
19
README.md
19
README.md
@@ -17,3 +17,22 @@ Two reasons.
|
||||
* SQlite or Postgres
|
||||
|
||||
The use-case is (essentially) single-user.
|
||||
|
||||
## API
|
||||
|
||||
### /fetch/<b64:URL>
|
||||
|
||||
## prereqs
|
||||
|
||||
* go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
|
||||
|
||||
|
||||
## packages used
|
||||
|
||||
* https://github.com/jpillora/go-tld
|
||||
* https://pkg.go.dev/modernc.org/sqlite
|
||||
* https://github.com/sqlc-dev/sqlc
|
||||
* https://github.com/spf13/afero
|
||||
* https://github.com/tidwall/gjson
|
||||
|
||||
//go:generate stringer -type=UpdateFrequency
|
||||
|
||||
78
cmd/api/init.go
Normal file
78
cmd/api/init.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/domain64"
|
||||
"git.jadud.com/jadudm/grosbeak/internal/engine"
|
||||
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||
"git.jadud.com/jadudm/grosbeak/internal/types"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func setupDB() *sql.DB {
|
||||
|
||||
// FIXME: This path needs to come from the env.
|
||||
db, err := sql.Open("sqlite", "grosbeak.sqlite")
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) {
|
||||
for {
|
||||
log.Printf("runQ %s\n", queueName)
|
||||
|
||||
err := queue.Consume(context.Background(), liteq.ConsumeParams{
|
||||
Queue: queueName,
|
||||
PoolSize: 3,
|
||||
VisibilityTimeout: 20,
|
||||
Worker: worker,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("runQ/%s: %s", queueName, err.Error())
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue {
|
||||
liteq.Setup(db)
|
||||
queue := liteq.New(db)
|
||||
// The queue processes as long as this context is not cancelled.
|
||||
|
||||
log.Println("setting up worker queues...")
|
||||
queues := []struct {
|
||||
queueName string
|
||||
worker types.QueueWorker
|
||||
}{
|
||||
{"fetch", engine.Fetch(d64m)},
|
||||
}
|
||||
for _, q := range queues {
|
||||
go runQ(queue, q.queueName, q.worker)
|
||||
}
|
||||
return queue
|
||||
}
|
||||
|
||||
func setupDomain64Map(db *sql.DB) *domain64.Domain64Map {
|
||||
d64m, err := domain64.NewDomain64Map()
|
||||
if err != nil {
|
||||
log.Printf("newdomain64map err: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
d64m.Setup(db)
|
||||
return d64m
|
||||
}
|
||||
@@ -1,9 +1,40 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/engine"
|
||||
base "git.jadud.com/jadudm/grosbeak/internal/types"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Printf("True: %s", domain64.ReturnsTrue())
|
||||
// Don't let `main()` exit
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
db := setupDB()
|
||||
d64m := setupDomain64Map(db)
|
||||
queue := setupLiteQ(db, d64m)
|
||||
|
||||
// Enqueue URLs
|
||||
urls := []struct {
|
||||
url string
|
||||
uf base.UpdateFrequency
|
||||
}{
|
||||
{"https://jadud.com/", base.UPDATE_DAILY},
|
||||
{"https://berea.us/", base.UPDATE_WEEKLY},
|
||||
}
|
||||
|
||||
for _, u := range urls {
|
||||
engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf})
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
// Don't exit.
|
||||
log.Println("Waiting for godot...")
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
30
go.mod
30
go.mod
@@ -1,3 +1,29 @@
|
||||
module git.jadud.com/grosbeak
|
||||
module git.jadud.com/jadudm/grosbeak
|
||||
|
||||
go 1.23.2
|
||||
go 1.24.0
|
||||
|
||||
toolchain go1.24.10
|
||||
|
||||
require (
|
||||
github.com/alitto/pond v1.9.2
|
||||
github.com/jpillora/go-tld v1.2.1
|
||||
github.com/matryer/is v1.4.1
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
modernc.org/sqlite v1.40.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/tidwall/match v1.2.0 // indirect
|
||||
github.com/tidwall/pretty v1.2.1 // indirect
|
||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
modernc.org/libc v1.67.1 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
)
|
||||
|
||||
75
go.sum
Normal file
75
go.sum
Normal file
@@ -0,0 +1,75 @@
|
||||
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
|
||||
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/jpillora/go-tld v1.2.1 h1:kDKOkmXLlskqjcvNs7w5XHLep7c8WM7Xd4HQjxllVMk=
|
||||
github.com/jpillora/go-tld v1.2.1/go.mod h1:plzIl7xr5UWKGy7R+giuv+L/nOjrPjsoWxy/ST9OBUk=
|
||||
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
|
||||
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
|
||||
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
||||
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
|
||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
|
||||
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
||||
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
|
||||
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
|
||||
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
|
||||
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
|
||||
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||
modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc=
|
||||
modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM=
|
||||
modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA=
|
||||
modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
|
||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
||||
modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE=
|
||||
modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
|
||||
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
|
||||
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
|
||||
modernc.org/libc v1.67.1 h1:bFaqOaa5/zbWYJo8aW0tXPX21hXsngG2M7mckCnFSVk=
|
||||
modernc.org/libc v1.67.1/go.mod h1:QvvnnJ5P7aitu0ReNpVIEyesuhmDLQ8kaEoyMjIFZJA=
|
||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||
modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY=
|
||||
modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE=
|
||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
9
internal/domain64/Makefile
Normal file
9
internal/domain64/Makefile
Normal file
@@ -0,0 +1,9 @@
|
||||
clean:
|
||||
cd sqlc ; rm -f test.sqlite
|
||||
cd sqlc ; rm -f *.go
|
||||
|
||||
generate: clean
|
||||
cd sqlc ; sqlc generate
|
||||
|
||||
test: generate
|
||||
go test *.go
|
||||
114
internal/domain64/README.md
Normal file
114
internal/domain64/README.md
Normal file
@@ -0,0 +1,114 @@
|
||||
# domain64
|
||||
|
||||
`domain64` is a BIGINT (or 64-bit) type that can be used to encode all domains we are likely to encounter. It represents well as JSonnet/JSON, and can be used in partitioning database tables easily.
|
||||
|
||||
## what is it
|
||||
|
||||
To encode all of the TLDs, domains, and subdomains we will encounter, we'll use a `domain64` encoding. It maps the entire URL space into a single, 64-bit number (or, `BIGSERIAL` in Postgres).
|
||||
|
||||
|
||||
```mermaid
|
||||
packet-beta
|
||||
0-7: "FF | TLD"
|
||||
8-31: "FFFFFF | Domain"
|
||||
32-39: "FF | Subdomain"
|
||||
40-63: "FFFFFF | Path"
|
||||
```
|
||||
|
||||
```
|
||||
FF:FFFFFF:FF:FFFFFF
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```
|
||||
tld:domain:subdomain:path
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```
|
||||
com:jadud:www:teaching:berea
|
||||
```
|
||||
|
||||
can be indexed/partitioned uniquely.
|
||||
|
||||
This lets us track
|
||||
|
||||
* 255 (#FF) TLDs
|
||||
* 16,777,216 (#FFFFFF) domains under each TLD
|
||||
* 255 (#FF) subdomains under each domain
|
||||
* 16,777,216 (#FFFFFF) paths on a given domain
|
||||
|
||||
## what that means
|
||||
|
||||
There are only around 10 TLDs that make up the majority of all sites on the internet. The search engine maxes out at tracking 256 unique TLDs (#00-#FF).
|
||||
|
||||
Each TLD can hold up to 16M unique sites. There are 302M `.com` domains, meaning , 36M `.cn`, and 20M `.org`. Again, this is for a "personal" search engine, and it is not intended to scale to handling all of the internet. Handling ~ 5% of `.com` (or 75% of `.org`) is *just fine*.
|
||||
|
||||
Under a domain, it is possible to uniquely partition off 255 subdomains (where `00` is "no subdomain").
|
||||
|
||||
Paths can be indexed uniquely, up to 16M per subdomain.
|
||||
|
||||
## example
|
||||
|
||||
```
|
||||
01:000001:00:000000 com.jadud
|
||||
01:000001:01:000000 gov.jadud.research
|
||||
01:000001:02:000000 gov.jadud.teaching
|
||||
01:000001:02:000001 gov.jadud.teaching/olin
|
||||
01:000001:02:000002 gov.jadud.teaching/berea
|
||||
```
|
||||
|
||||
|
||||
| tld | domain | sub | path | hex | dec |
|
||||
| --- | --- | --- | --- | --- | --- |
|
||||
| com | jadud | _ | _ | #x0100000100000000 | 72057598332895232 |
|
||||
| com | jadud | research | _ | #x0100000101000000 | 72057598332895488 |
|
||||
| com | jadud | teaching | _ | #x0100000102000000 | 72057598366449664 |
|
||||
| com | jadud | teaching | olin | #x0100000102000001 | 72057598366449665 |
|
||||
| com | jadud | teaching | berea | #x0100000102000002 | 72057598366449666 |
|
||||
|
||||
## for partitioning
|
||||
|
||||
On a table that contains a `domain64` value, we can partition based on numeric ranges very efficiently.
|
||||
|
||||
|
||||
```sql
|
||||
CREATE TABLE comjadud PARTITION OF com
|
||||
FOR VALUES FROM (0x0100000100000000) TO (0x01000001FFFFFFFF);
|
||||
```
|
||||
|
||||
Or
|
||||
|
||||
```sql
|
||||
CREATE TABLE comjadudresearch PARTITION OF com
|
||||
FOR VALUES FROM (0x0100000101000000) TO (0xx0100000101FFFFFF);
|
||||
```
|
||||
|
||||
## As Jsonnet/JSON
|
||||
|
||||
Jsonnet will naturally sort by the hex key values.
|
||||
|
||||
```
|
||||
{
|
||||
"01": {
|
||||
"name": "com",
|
||||
"children": {
|
||||
"00000001": {
|
||||
"name": "jadud",
|
||||
"children": {
|
||||
"01": "research",
|
||||
"02": "teaching",
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"02": {
|
||||
"name": "org",
|
||||
"children": {
|
||||
...
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
183
internal/domain64/database.go
Normal file
183
internal/domain64/database.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package domain64
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"log"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
|
||||
"github.com/jpillora/go-tld"
|
||||
)
|
||||
|
||||
//go:embed sqlc/schema.sql
|
||||
var ddl string
|
||||
|
||||
type Domain64Map struct {
|
||||
mu sync.Mutex
|
||||
m map[string]int
|
||||
DB *sql.DB
|
||||
Queries *sqlc.Queries
|
||||
}
|
||||
|
||||
func NewDomain64Map() (*Domain64Map, error) {
|
||||
d64m := &Domain64Map{}
|
||||
d64m.DB = nil
|
||||
d64m.Queries = nil
|
||||
return d64m, nil
|
||||
}
|
||||
|
||||
func (d64m *Domain64Map) Setup(db *sql.DB) {
|
||||
log.Printf("creating domain64 tables\n")
|
||||
if _, err := db.ExecContext(context.Background(), ddl); err != nil {
|
||||
log.Printf("setup err: %s", err.Error())
|
||||
panic(err)
|
||||
}
|
||||
d64m.DB = db
|
||||
d64m.Queries = sqlc.New(db)
|
||||
}
|
||||
|
||||
func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
|
||||
s := ""
|
||||
if url.TLD != "" {
|
||||
s += url.TLD
|
||||
}
|
||||
if url.Domain != "" {
|
||||
s += "." + url.Domain
|
||||
}
|
||||
if url.Subdomain != "" {
|
||||
s += "." + url.Subdomain
|
||||
}
|
||||
s += url.Path
|
||||
return s
|
||||
}
|
||||
|
||||
func _get_or_insert_tld(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
|
||||
ctx := context.Background()
|
||||
tld_id, err := queries.GetTLDId(ctx, url.TLD)
|
||||
if err != nil {
|
||||
cnt, err := queries.CountTLDs(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d64.TLD = cnt + 1
|
||||
err = queries.InsertTLD(ctx, sqlc.InsertTLDParams{TldID: d64.TLD, Tld: url.TLD})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
d64.TLD = tld_id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func _get_or_insert_domain(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
|
||||
ctx := context.Background()
|
||||
domain_id, err := queries.GetDomainId(ctx, sqlc.GetDomainIdParams{TldID: d64.TLD, Domain: url.Domain})
|
||||
if err != nil {
|
||||
cnt, err := queries.CountDomains(ctx, d64.TLD)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d64.Domain = cnt + 1
|
||||
err = queries.InsertDomain(ctx, sqlc.InsertDomainParams{TldID: d64.TLD, DomainID: d64.Domain, Domain: url.Domain})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
d64.Domain = domain_id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func _get_or_insert_subdomain(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
|
||||
ctx := context.Background()
|
||||
subdomain_id, err := queries.GetSubdomainId(ctx, sqlc.GetSubdomainIdParams{
|
||||
TldID: int64(d64.TLD), DomainID: int64(d64.Domain), Subdomain: url.Subdomain,
|
||||
})
|
||||
if err != nil {
|
||||
if url.Subdomain == "" {
|
||||
d64.Subdomain = 0
|
||||
} else {
|
||||
cnt, err := queries.CountSubdomains(ctx, sqlc.CountSubdomainsParams{TldID: d64.TLD, DomainID: d64.Domain})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d64.Subdomain = cnt + 1
|
||||
err = queries.InsertSubdomain(ctx, sqlc.InsertSubdomainParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, Subdomain: url.Subdomain})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
d64.Subdomain = subdomain_id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func _get_or_insert_path(queries *sqlc.Queries, d64 *Domain64, url *tld.URL) error {
|
||||
ctx := context.Background()
|
||||
log.Println(url, url.Path)
|
||||
path_id, err := queries.GetPathId(ctx, sqlc.GetPathIdParams{
|
||||
TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, Path: url.Path,
|
||||
})
|
||||
if err != nil {
|
||||
if url.Path == "/" {
|
||||
d64.Path = 0
|
||||
} else {
|
||||
cnt, err := queries.CountPaths(ctx, sqlc.CountPathsParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d64.Path = cnt + 1
|
||||
err = queries.InsertPath(ctx, sqlc.InsertPathParams{TldID: d64.TLD, DomainID: d64.Domain, SubdomainID: d64.Subdomain, PathID: d64.Path, Path: url.Path})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
d64.Path = path_id
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FIXME: This feels like a very convoluted way to maintain the domain names.
|
||||
// However, I also need to maintain uniqueness. Can I do this in one table?
|
||||
func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) {
|
||||
allowed_schemes := []string{"https"}
|
||||
if !slices.Contains(allowed_schemes, url.Scheme) {
|
||||
return nil, fmt.Errorf("URL scheme must be in %q; given %s", allowed_schemes, url.Scheme)
|
||||
}
|
||||
|
||||
d64 := &Domain64{}
|
||||
|
||||
// These manipulate both the DB and the Domain64 struct
|
||||
err := _get_or_insert_tld(d64m.Queries, d64, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = _get_or_insert_domain(d64m.Queries, d64, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = _get_or_insert_subdomain(d64m.Queries, d64, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = _get_or_insert_path(d64m.Queries, d64, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d64, nil
|
||||
}
|
||||
|
||||
func NullInt64(i int64) sql.NullInt64 {
|
||||
return sql.NullInt64{Int64: i, Valid: true}
|
||||
}
|
||||
|
||||
func NullString(s string) sql.NullString {
|
||||
return sql.NullString{String: s, Valid: true}
|
||||
}
|
||||
140
internal/domain64/database_test.go
Normal file
140
internal/domain64/database_test.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package domain64
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
"github.com/jpillora/go-tld"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func setup() *sql.DB {
|
||||
ctx := context.Background()
|
||||
|
||||
// db, err := sql.Open("sqlite", "sqlc/test.sqlite")
|
||||
db, err := sql.Open("sqlite", ":memory:")
|
||||
if err != nil {
|
||||
// TODO
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// create tables
|
||||
if _, err := db.ExecContext(ctx, ddl); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
func TestNewDomain64Map(t *testing.T) {
|
||||
db := setup()
|
||||
M, err := NewDomain64Map()
|
||||
M.Setup(db)
|
||||
if err != nil {
|
||||
// TODO
|
||||
t.Error(err)
|
||||
}
|
||||
if M.DB == nil {
|
||||
t.Error("DB should not be nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestURLToRFQDN(t *testing.T) {
|
||||
M, _ := NewDomain64Map()
|
||||
// M.Setup(db)
|
||||
simple, _ := tld.Parse("https://jadud.com/")
|
||||
rfqdn := M.URLToRFQDN(simple)
|
||||
if rfqdn != "com.jadud/" {
|
||||
t.Errorf("Expected `com.jadud/`, got %s", rfqdn)
|
||||
}
|
||||
}
|
||||
|
||||
func TestURLToDomain64(t *testing.T) {
|
||||
db := setup()
|
||||
M, _ := NewDomain64Map()
|
||||
M.Setup(db)
|
||||
simple, _ := tld.Parse("https://jadud.com/")
|
||||
d64, _ := M.URLToDomain64(simple)
|
||||
if d64.TLD != 1 {
|
||||
t.Errorf("expected TLD == 1, got %d", d64.TLD)
|
||||
}
|
||||
|
||||
if d64.Domain != 1 {
|
||||
t.Errorf("expected domain == 1, got %d", d64.Domain)
|
||||
}
|
||||
}
|
||||
|
||||
func TestURLToDomain64_02(t *testing.T) {
|
||||
db := setup()
|
||||
M, _ := NewDomain64Map()
|
||||
M.Setup(db)
|
||||
simple1, _ := tld.Parse("https://jadud.com/")
|
||||
simple2, _ := tld.Parse("https://another.com/")
|
||||
d64_1, _ := M.URLToDomain64(simple1)
|
||||
d64_2, _ := M.URLToDomain64(simple2)
|
||||
|
||||
if d64_1.TLD != 1 {
|
||||
t.Errorf("expected TLD == 1, got %d", d64_1.TLD)
|
||||
}
|
||||
|
||||
if d64_1.Domain != 1 {
|
||||
t.Errorf("expected domain == 1, got %d", d64_1.Domain)
|
||||
}
|
||||
|
||||
if d64_2.TLD != 1 {
|
||||
t.Errorf("expected TLD == 1, got %d", d64_2.TLD)
|
||||
}
|
||||
|
||||
if d64_2.Domain != 2 {
|
||||
t.Errorf("expected domain == 2, got %d", d64_2.Domain)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestURLToDomain64_03(t *testing.T) {
|
||||
db := setup()
|
||||
M, _ := NewDomain64Map()
|
||||
M.Setup(db)
|
||||
var tests = []struct {
|
||||
url string
|
||||
tld int64
|
||||
domain int64
|
||||
subdomain int64
|
||||
path int64
|
||||
d64 int64
|
||||
}{
|
||||
{"https://jadud.com/", 1, 1, 0, 0, 0x0100000100000000},
|
||||
{"https://research.jadud.com/", 1, 1, 1, 0, 0x0100000101000000},
|
||||
{"https://teaching.jadud.com/", 1, 1, 2, 0, 0x0100000102000000},
|
||||
{"https://teaching.jadud.com/classes", 1, 1, 2, 1, 0x0100000102000001},
|
||||
{"https://teaching.jadud.com/other-classes", 1, 1, 2, 2, 0x0100000102000002},
|
||||
{"https://research.jadud.com/papers", 1, 1, 1, 1, 0x0100000101000001},
|
||||
{"https://research.jadud.com/experiments", 1, 1, 1, 2, 0x0100000101000002},
|
||||
{"https://teaching.another.com/classes", 1, 2, 1, 1, 0x0100000201000001},
|
||||
{"https://teaching.jadud.org/classes", 2, 1, 1, 1, 0x0200000101000001},
|
||||
// The ordering here matters; if we see a "bare" domain after first seeing the
|
||||
// subdomain, I expect the numbering to come out right. That is, subdomain <- 0 and
|
||||
// path <- 0. That is because of "" and "/" checking on subdomain and path, respectively.
|
||||
{"https://jadud.org/", 2, 1, 0, 0, 0x0200000100000000},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
parsed, _ := tld.Parse(tt.url)
|
||||
d64, _ := M.URLToDomain64(parsed)
|
||||
if d64.TLD != tt.tld {
|
||||
t.Errorf("%s TLD expected %d given %d", tt.url, tt.tld, d64.TLD)
|
||||
}
|
||||
if d64.Domain != tt.domain {
|
||||
t.Errorf("%s Domain expected %d given %d", tt.url, tt.domain, d64.Domain)
|
||||
}
|
||||
if d64.Subdomain != tt.subdomain {
|
||||
t.Errorf("%s Subdomain expected %d given %d", tt.url, tt.subdomain, d64.Subdomain)
|
||||
}
|
||||
if d64.Path != tt.path {
|
||||
t.Errorf("%s Path expected %d given %d", tt.url, tt.path, d64.Path)
|
||||
}
|
||||
if d64.ToInt64() != tt.d64 {
|
||||
t.Errorf("%s int64 value expected %d given %d", tt.url, tt.d64, d64.ToInt64())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,50 @@
|
||||
package domain64
|
||||
|
||||
func ReturnsTrue() bool {
|
||||
return true
|
||||
type Domain64 struct {
|
||||
// The TLD is FF
|
||||
TLD int64
|
||||
// The Domain is FFFFFF, so the uint16 is the closest we'll get
|
||||
Domain int64
|
||||
// Subdomains are FF
|
||||
Subdomain int64
|
||||
// Paths are, again, FFFFFF
|
||||
Path int64
|
||||
}
|
||||
|
||||
/*
|
||||
```mermaid
|
||||
packet-beta
|
||||
0-7: "FF | TLD"
|
||||
8-31: "FFFFFF | Domain"
|
||||
32-39: "FF | Subdomain"
|
||||
40-63: "FFFFFF | Path"
|
||||
```
|
||||
*/
|
||||
const SHIFT_TLD = (64 - 8)
|
||||
const SHIFT_DOMAIN = (64 - (8 + 24))
|
||||
const SHIFT_SUBDOMAIN = (64 - (8 + 24 + 8))
|
||||
const SHIFT_PATH = (64 - (8 + 24 + 8 + 24))
|
||||
|
||||
const MASK_TLD = 0xFF0000000000000
|
||||
const MASK_DOMAIN = 0x00FFFFFF00000000
|
||||
const MASK_SUBDOMAIN = 0x00000000FF000000
|
||||
const MASK_PATH = 0x0000000000FFFFFF
|
||||
|
||||
func (d64 Domain64) ToInt64() int64 {
|
||||
var result int64 = 0
|
||||
result = result | (int64(d64.TLD) << SHIFT_TLD)
|
||||
result = result | (int64(d64.Domain) << SHIFT_DOMAIN)
|
||||
result = result | (int64(d64.Subdomain) << SHIFT_SUBDOMAIN)
|
||||
result = result | (int64(d64.Path) << SHIFT_PATH)
|
||||
return result
|
||||
}
|
||||
|
||||
func IntToDomain64(i int64) Domain64 {
|
||||
d64 := Domain64{}
|
||||
|
||||
d64.TLD = (i & MASK_TLD) >> SHIFT_TLD
|
||||
d64.Domain = (i & MASK_DOMAIN) >> SHIFT_DOMAIN
|
||||
d64.Subdomain = (i & MASK_SUBDOMAIN) >> SHIFT_SUBDOMAIN
|
||||
d64.Path = i & MASK_PATH
|
||||
return d64
|
||||
}
|
||||
|
||||
45
internal/domain64/domain64_test.go
Normal file
45
internal/domain64/domain64_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package domain64
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// https://gobyexample.com/testing-and-benchmarking
|
||||
|
||||
// For testing the conversion between a Domain64 struct
|
||||
// and the int64 representation of that domain.
|
||||
var tests = []struct {
|
||||
d64 Domain64
|
||||
asInt int64
|
||||
}{
|
||||
{
|
||||
Domain64{
|
||||
TLD: 1,
|
||||
Domain: 1,
|
||||
Subdomain: 1,
|
||||
Path: 1,
|
||||
}, 72057598349672449},
|
||||
{
|
||||
Domain64{
|
||||
TLD: 2,
|
||||
Domain: 1,
|
||||
Subdomain: 1,
|
||||
Path: 0,
|
||||
}, 144115192387600384},
|
||||
}
|
||||
|
||||
func TestDomain64ToInt(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
if tt.d64.ToInt64() != tt.asInt {
|
||||
t.Errorf("%q != %d Domain64 did not convert", tt.d64, tt.asInt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntToDomain64(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
if IntToDomain64(tt.asInt) != tt.d64 {
|
||||
t.Errorf("%d != %q int64 did not convert", tt.asInt, tt.d64)
|
||||
}
|
||||
}
|
||||
}
|
||||
2
internal/domain64/sqlc/.gitignore
vendored
Normal file
2
internal/domain64/sqlc/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
test.sqlite
|
||||
*.go
|
||||
107
internal/domain64/sqlc/query.sql
Normal file
107
internal/domain64/sqlc/query.sql
Normal file
@@ -0,0 +1,107 @@
|
||||
-- name: InsertIntoDomain64 :one
|
||||
INSERT INTO domain64
|
||||
(rfqdn, d64)
|
||||
VALUES
|
||||
(?, ?)
|
||||
RETURNING d64
|
||||
;
|
||||
|
||||
-- If you forget a semicolon, the previous
|
||||
-- query gets flagged as a duplicate.
|
||||
-- https://github.com/sqlc-dev/sqlc/issues/3851
|
||||
|
||||
-- name: GetTLDId :one
|
||||
SELECT tld_id FROM tlds
|
||||
WHERE tld = ?
|
||||
;
|
||||
|
||||
-- name: CountTLDs :one
|
||||
SELECT COUNT(*) FROM tlds;
|
||||
|
||||
-- name: InsertTLD :exec
|
||||
INSERT INTO tlds
|
||||
(tld_id, tld)
|
||||
VALUES
|
||||
(?, ?)
|
||||
;
|
||||
|
||||
-- name: GetDomainId :one
|
||||
SELECT domain_id FROM domains
|
||||
WHERE
|
||||
tld_id = ?
|
||||
AND
|
||||
domain = ?
|
||||
;
|
||||
|
||||
-- name: CountDomains :one
|
||||
SELECT COUNT(DISTINCT domain_id)
|
||||
FROM domains
|
||||
WHERE
|
||||
tld_id = ?
|
||||
;
|
||||
|
||||
-- name: InsertDomain :exec
|
||||
INSERT INTO domains
|
||||
(tld_id, domain_id, domain)
|
||||
VALUES
|
||||
(?, ?, ?)
|
||||
;
|
||||
|
||||
-- name: GetSubdomainId :one
|
||||
SELECT subdomain_id FROM subdomains
|
||||
WHERE
|
||||
tld_id = ?
|
||||
AND
|
||||
domain_id = ?
|
||||
AND
|
||||
subdomain = ?
|
||||
;
|
||||
|
||||
-- name: CountSubdomains :one
|
||||
SELECT COUNT(DISTINCT subdomain_id)
|
||||
FROM subdomains
|
||||
WHERE
|
||||
tld_id = ?
|
||||
AND
|
||||
domain_id = ?
|
||||
;
|
||||
|
||||
-- name: InsertSubdomain :exec
|
||||
INSERT INTO subdomains
|
||||
(tld_id, domain_id, subdomain_id, subdomain)
|
||||
VALUES
|
||||
(?, ?, ?, ?)
|
||||
;
|
||||
|
||||
-- name: GetPathId :one
|
||||
SELECT path_id FROM paths
|
||||
WHERE
|
||||
tld_id = ?
|
||||
AND
|
||||
domain_id = ?
|
||||
AND
|
||||
subdomain_id = ?
|
||||
AND
|
||||
path = ?
|
||||
;
|
||||
|
||||
-- name: CountPaths :one
|
||||
SELECT COUNT(DISTINCT path_id)
|
||||
FROM paths
|
||||
WHERE
|
||||
tld_id = ?
|
||||
AND
|
||||
domain_id = ?
|
||||
AND
|
||||
subdomain_id = ?
|
||||
;
|
||||
|
||||
-- name: InsertPath :exec
|
||||
INSERT INTO paths
|
||||
(tld_id, domain_id, subdomain_id, path_id, path)
|
||||
VALUES
|
||||
(?, ?, ?, ?, ?)
|
||||
;
|
||||
|
||||
-- -- name: CountTLD :one
|
||||
-- SELECT COUNT(DISTINCT tld_id) FROM domain64 WHERE tld_id = ?;
|
||||
39
internal/domain64/sqlc/schema.sql
Normal file
39
internal/domain64/sqlc/schema.sql
Normal file
@@ -0,0 +1,39 @@
|
||||
-- Lookup tables mapping the components of a FQDN + path to the small
-- integer ids that are packed into a Domain64 value.

-- Top-level domains; tld_id is assigned by the application.
CREATE TABLE IF NOT EXISTS tlds (
    tld_id INTEGER PRIMARY KEY NOT NULL,
    tld TEXT NOT NULL
);

-- Domains, numbered per-TLD; (tld_id, domain_id) is the logical key.
-- NOTE(review): the REFERENCES targets domains(domain_id) and
-- subdomains(subdomain_id) below are not declared UNIQUE on their own
-- (uniqueness here is only on the composite indexes). SQLite requires a
-- foreign key's parent column to be a PRIMARY KEY or UNIQUE, so these
-- constraints would fail if PRAGMA foreign_keys=ON were enabled --
-- confirm intent.
CREATE TABLE IF NOT EXISTS domains (
    id INTEGER PRIMARY KEY,
    tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
    domain_id INTEGER NOT NULL,
    domain TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_domains_uniq ON domains(tld_id, domain_id);

-- Subdomains, numbered per (tld, domain).
CREATE TABLE IF NOT EXISTS subdomains (
    id INTEGER PRIMARY KEY,
    tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
    domain_id INTEGER NOT NULL REFERENCES domains(domain_id),
    subdomain_id INTEGER NOT NULL,
    subdomain TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_subdomains_uniq ON subdomains(tld_id, domain_id, subdomain_id);

-- Paths, numbered per (tld, domain, subdomain).
CREATE TABLE IF NOT EXISTS paths (
    id INTEGER PRIMARY KEY,
    tld_id INTEGER NOT NULL REFERENCES tlds(tld_id),
    domain_id INTEGER NOT NULL REFERENCES domains(domain_id),
    subdomain_id INTEGER NOT NULL REFERENCES subdomains(subdomain_id),
    path_id INTEGER NOT NULL,
    path TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ndx_paths_uniq ON paths(tld_id, domain_id, subdomain_id, path_id);

-- Cache of reversed-FQDN strings to their packed Domain64 values.
CREATE TABLE IF NOT EXISTS domain64 (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    rfqdn TEXT,
    d64 BIGINT UNIQUE NOT NULL
);
|
||||
9
internal/domain64/sqlc/sqlc.yaml
Normal file
9
internal/domain64/sqlc/sqlc.yaml
Normal file
@@ -0,0 +1,9 @@
|
||||
version: "2"
|
||||
sql:
|
||||
- engine: "sqlite"
|
||||
queries: "query.sql"
|
||||
schema: "schema.sql"
|
||||
gen:
|
||||
go:
|
||||
package: "test_db"
|
||||
out: "."
|
||||
39
internal/engine/entre.go
Normal file
39
internal/engine/entre.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||
base "git.jadud.com/jadudm/grosbeak/internal/types"
|
||||
)
|
||||
|
||||
func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error {
|
||||
n := time.Now()
|
||||
|
||||
var ignore_tag string
|
||||
|
||||
switch ej.UpdateFrequency {
|
||||
case base.UPDATE_DAILY:
|
||||
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
|
||||
case base.UPDATE_WEEKLY:
|
||||
ignore_tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, n.Year(), n.YearDay()/7)
|
||||
case base.UPDATE_MONTHLY:
|
||||
ignore_tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, n.Year(), n.Month())
|
||||
default:
|
||||
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
|
||||
}
|
||||
|
||||
err := queue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "fetch",
|
||||
// This only works for things in the `queued` state
|
||||
DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
|
||||
Job: ej.AsJson(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("entre err %s: %s\n", ej.URL, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
34
internal/engine/fetch.go
Normal file
34
internal/engine/fetch.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/domain64"
|
||||
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||
"git.jadud.com/jadudm/grosbeak/internal/types"
|
||||
"github.com/jpillora/go-tld"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func Fetch(d64m *domain64.Domain64Map) types.QueueWorker {
|
||||
_f := func(ctx context.Context, job *liteq.Job) error {
|
||||
url := gjson.Get(job.Job, "url").String()
|
||||
|
||||
turl, err := tld.Parse(url)
|
||||
if err != nil {
|
||||
// If we can't parse it, shut the job down as completed.
|
||||
return nil
|
||||
}
|
||||
|
||||
d64, err := d64m.URLToDomain64(turl)
|
||||
if err != nil {
|
||||
log.Printf("urltodomain64 err %s: %s\n", url, err.Error())
|
||||
}
|
||||
|
||||
log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64())
|
||||
return nil
|
||||
}
|
||||
|
||||
return _f
|
||||
}
|
||||
3
internal/liteq/.gitignore
vendored
Normal file
3
internal/liteq/.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
.DS_Store
|
||||
/bench/bench.db*
|
||||
/TODO
|
||||
21
internal/liteq/LICENSE
Normal file
21
internal/liteq/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2024 Sebastien Armand
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
29
internal/liteq/Makefile
Normal file
29
internal/liteq/Makefile
Normal file
@@ -0,0 +1,29 @@
|
||||
default:
|
||||
go run *.go || (go get ./... && go run *.go)
|
||||
|
||||
watch:
|
||||
(watchexec -e sql -- make schema-const) & \
|
||||
(watchexec -e sql -- sqlc generate) & \
|
||||
(watchexec -w sqlc.yaml -- sqlc generate) & \
|
||||
(watchexec -e go -c -r -- go test ./... -count 1) & \
|
||||
wait
|
||||
# SQL Watcher done
|
||||
|
||||
# Puts the schema in a constant in go so it can be used to create the table directly.
# printf is used for the package line because POSIX echo does not expand \n
# (plain echo would emit a literal "\n" into the generated Go source).
schema-const:
	echo "// Code generated by Makefile. DO NOT EDIT." > internal/schema.go
	printf 'package internal\n\n' >> internal/schema.go
	echo "const Schema = \`" >> internal/schema.go
	cat db/schema.sql >> internal/schema.go
	echo "\`" >> internal/schema.go


.PHONY: bench
bench:
	$(MAKE) cleanup
	$(MAKE) schema-const
	sqlc generate
	cd bench && go run main.go

cleanup:
	rm -f bench/bench.db*
|
||||
172
internal/liteq/README.md
Normal file
172
internal/liteq/README.md
Normal file
@@ -0,0 +1,172 @@
|
||||
# liteq
|
||||
|
||||
Library to have a persistent job queue in Go backed by a SQLite DB.
|
||||
|
||||
## Motivation
|
||||
|
||||
I needed a way to have a persistent queue for a small app so it would survive restarts and allow me to schedule jobs in the future.
|
||||
Since I already had SQLite as a dependency, I didn't want to add another dependency to make the app work. Especially not an external one like RabbitMQ, Redis, SQS or others.
|
||||
|
||||
liteq can process tens of thousands of jobs per second if needed. It can also be made to use more than a single DB file to keep growing the concurrency should you need it.
|
||||
|
||||
## Usage
|
||||
|
||||
### Install
|
||||
|
||||
```sh
|
||||
go get github.com/jadudm/liteq
|
||||
```
|
||||
|
||||
### Setup and DB creation
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/jadudm/liteq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Open the sqlite3 DB in the file "liteq.db"
|
||||
liteqDb, err := sql.Open("sqlite3", "liteq.db")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Create the DB table if it doesn't exist
|
||||
liteq.Setup(liteqDb)
|
||||
// create a job queue
|
||||
jqueue := liteq.New(liteqDb)
|
||||
}
|
||||
```
|
||||
|
||||
### Queuing a job
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "notify.email",
|
||||
Job: `{"email_address": "bob@example.com", "content": "..."}`,
|
||||
})
|
||||
```
|
||||
|
||||
This will send the job with the given payload on a queue called `notify.email`.
|
||||
|
||||
### Consuming
|
||||
|
||||
To consume jobs from the queue, you call the consume method:
|
||||
|
||||
```go
|
||||
jqueue.Consume(context.Background(), liteq.ConsumeParams{
|
||||
Queue: "notify.email",
|
||||
PoolSize: 3,
|
||||
VisibilityTimeout: 20,
|
||||
Worker: func (ctx context.Context, job *liteq.Job) error {
|
||||
return sendEmail(job)
|
||||
},
|
||||
})
|
||||
```
|
||||
|
||||
- `context.Background()` You can pass in a cancellable context and the queue will stop processing when the context is canceled
|
||||
- `Queue` this is the name of the queue we want this consumer to consume from
|
||||
- `PoolSize` this is the number of concurrent consumers we want for this queue
|
||||
- `VisibilityTimeout` is the time that a job will remain reserved to a consumer. After that time has elapsed, if the job hasn't been marked either failed
|
||||
or successful, it will be put back in the queue for others to consume. The error will be added to the job's error list and the number of remaining attempts will be
|
||||
decreased.
|
||||
- `Worker` A callback to process the job. When an error is returned, the job is either returned to the queue for processing with a decreased number of remaining attempts or marked as failed if no more attempts remain.
|
||||
|
||||
### Multiple attempts
|
||||
|
||||
When queueing a job, it is possible to decide how many times this job can be attempted in case of failures:
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "notify.email",
|
||||
RemainingAttempts: 3,
|
||||
Job: `{"email_address": "bob@example.com", "content": "..."}`,
|
||||
})
|
||||
```
|
||||
|
||||
### Delayed jobs
|
||||
|
||||
When queueing a job, you can decide to execute it at a later point in time:
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "notify.email",
|
||||
ExecuteAfter: time.Now().Add(6*time.Minute).Unix(),
|
||||
Job: `{"email_address": "bob@example.com", "content": "..."}`,
|
||||
})
|
||||
```
|
||||
|
||||
In this case, the job won't run until the given time.
|
||||
|
||||
### Deduplication
|
||||
|
||||
Sometimes it can be useful to prevent the queueing of multiple messages that would essentially be performing the same task, to avoid unnecessary work.
|
||||
|
||||
This is possible in `liteq` via the `DedupingKey` job parameter. There are 2 types of deduping keys:
|
||||
|
||||
- `IgnoreDuplicate` will ignore the new job that was sent and keep the one that was already on the queue
|
||||
- `ReplaceDuplicate` will instead remove the job currently on the queue and use the new one instead
|
||||
|
||||
Assuming we have the following consumer:
|
||||
|
||||
```go
|
||||
jqueue.Consume(context.Background(), liteq.ConsumeParams{
|
||||
Queue: "print",
|
||||
VisibilityTimeout: 20,
|
||||
Worker: func (ctx context.Context, job *liteq.Job) error {
|
||||
fmt.Println(job.Payload)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
```
|
||||
|
||||
And we send the following jobs:
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "print",
|
||||
Job: `first`,
|
||||
DedupingKey: liteq.IgnoreDuplicate("print.job")
|
||||
})
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "print",
|
||||
Job: `second`,
|
||||
DedupingKey: liteq.IgnoreDuplicate("print.job")
|
||||
})
|
||||
```
|
||||
|
||||
Then the result would be a single output line:
|
||||
|
||||
```
|
||||
first
|
||||
```
|
||||
|
||||
If instead we use `liteq.ReplaceDuplicate`
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "print",
|
||||
Job: `third`,
|
||||
DedupingKey: liteq.ReplaceDuplicate("print.job")
|
||||
})
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "print",
|
||||
Job: `fourth`,
|
||||
DedupingKey: liteq.ReplaceDuplicate("print.job")
|
||||
})
|
||||
```
|
||||
|
||||
We will output `fourth`
|
||||
|
||||
If we think for example of the scenario of sending email or text notifications about an order to a customer, we could construct a deduping key like:
|
||||
|
||||
```go
|
||||
jqueue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||
Queue: "email.notify",
|
||||
Job: `{"order_id": 123, "customer_id": "abc"}`,
|
||||
DedupingKey: liteq.ReplaceDuplicate("email.notify:%s:%s", customer.ID, order.ID)
|
||||
})
|
||||
```
|
||||
|
||||
That way if the `Order Prepared` status update email hasn't been sent yet by the time we're ready to send the `Order Shipped` email, we can skip the `Order Prepared` one and only send the most recent update to the customer.
|
||||
71
internal/liteq/bench/main.go
Normal file
71
internal/liteq/bench/main.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// Benchmark driver: one producer goroutine enqueues 100k jobs spread
// round-robin across 30 queues while 30 consumers (10 workers each, one
// DB connection per consumer) drain them. Progress prints every 1000
// completions; the program exits once all jobs have been consumed.
func main() {
	howMany := 100_000

	// Producer: enqueue jobs round-robin across the 30 queues.
	// NOTE(review): QueueJob errors are ignored here; acceptable for a
	// bench tool, but undercounts would hang the tally loop below.
	go func() {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		liteq.Setup(db)

		jq := liteq.New(db)

		for i := 0; i < howMany; i++ {
			jq.QueueJob(context.Background(), liteq.QueueJobParams{
				Queue: fmt.Sprintf("default-%d", i%30),
				Job:   "SendEmail",
			})
		}
	}()

	// Consumers: one per queue, each with its own DB handle and a pool
	// of 10 workers that simulate work with a short random sleep, then
	// signal completion on c.
	c := make(chan struct{})
	for i := 0; i < 30; i++ {
		db, err := sql.Open("sqlite", "bench.db")
		if err != nil {
			panic(err)
		}
		liteq.Setup(db)

		jq := liteq.New(db)

		go func(i int) {
			err := jq.Consume(context.Background(), liteq.ConsumeParams{
				Queue:    fmt.Sprintf("default-%d", i),
				PoolSize: 10,
				Worker: func(ctx context.Context, job *liteq.Job) error {
					// random sleep
					n := rand.Intn(50)

					time.Sleep(time.Duration(n) * time.Millisecond)
					c <- struct{}{}
					return nil
				},
			})
			fmt.Println(err)
		}(i)
	}

	// Tally completions until every queued job has been processed.
	rec := 0
	for range c {
		rec++
		if rec%1000 == 0 {
			fmt.Println(rec)
		}
		if rec == howMany {
			return
		}
	}
}
|
||||
137
internal/liteq/db/queries.sql
Normal file
137
internal/liteq/db/queries.sql
Normal file
@@ -0,0 +1,137 @@
|
||||
-- name: doQueueJobIgnoreDupe :exec
-- Insert a 'queued' job. When a non-empty deduping_key collides with an
-- existing row (partial unique index dedupe_ignore), keep the existing
-- job and silently drop this one.
INSERT INTO
    jobs (
        queue,
        job,
        execute_after,
        job_status,
        created_at,
        updated_at,
        remaining_attempts,
        deduping_key
    )
VALUES
    (?, ?, ?, 'queued', unixepoch(), unixepoch(), ?, ?)
ON CONFLICT (deduping_key)
WHERE
    deduping_key != '' DO NOTHING;

-- name: doQueueJobReplaceDupe :exec
-- Insert a 'queued' job. When a non-empty deduping_key collides with a
-- still-queued job (partial unique index dedupe_replace), overwrite that
-- job's payload, schedule, and attempt budget with the new values.
INSERT INTO
    jobs (
        queue,
        job,
        execute_after,
        job_status,
        created_at,
        updated_at,
        remaining_attempts,
        deduping_key
    )
VALUES
    (?, ?, ?, 'queued', unixepoch(), unixepoch(), ?, ?)
ON CONFLICT (deduping_key, job_status)
WHERE
    deduping_key != ''
    AND job_status = 'queued' DO
UPDATE
SET
    job = EXCLUDED.job,
    execute_after = EXCLUDED.execute_after,
    updated_at = unixepoch(),
    remaining_attempts = EXCLUDED.remaining_attempts;

-- name: CompleteJob :exec
-- Mark a job finished successfully and clear its reservation.
UPDATE
    jobs
SET
    job_status = 'completed',
    finished_at = unixepoch(),
    updated_at = unixepoch(),
    consumer_fetched_at = 0,
    remaining_attempts = 0
WHERE
    id = ?;

-- name: FailJob :exec
-- Record a failure: decrement the attempt budget and either requeue the
-- job (attempts remain) or mark it permanently 'failed'.
UPDATE
    jobs
SET
    job_status = CASE
        WHEN remaining_attempts <= 1 THEN 'failed'
        ELSE 'queued'
    END,
    finished_at = 0,
    updated_at = unixepoch(),
    consumer_fetched_at = 0,
    remaining_attempts = MAX(remaining_attempts - 1, 0),
    errors = ?
WHERE
    id = ?;

-- name: MarkJobsForConsumer :many
-- Atomically reserve up to LIMIT runnable jobs on one queue: flip them
-- from 'queued' to 'fetched', stamp the fetch time, and return the rows.
UPDATE
    jobs
SET
    consumer_fetched_at = unixepoch(),
    updated_at = unixepoch(),
    job_status = 'fetched'
WHERE
    jobs.job_status = 'queued'
    AND jobs.remaining_attempts > 0
    AND jobs.id IN (
        SELECT
            id
        FROM
            jobs js
        WHERE
            js.queue = ?
            AND js.job_status = 'queued'
            AND js.execute_after <= ?
            AND js.remaining_attempts > 0
        ORDER BY
            execute_after ASC
        LIMIT
            ?
    ) RETURNING *;

-- name: ResetJobs :execrows
-- Return expired reservations to the queue: any 'fetched' job whose
-- fetch time predates the cutoff loses an attempt and is requeued or
-- failed, with a note appended to its JSON error list.
UPDATE
    jobs
SET
    job_status = CASE
        WHEN remaining_attempts <= 1 THEN 'failed'
        ELSE 'queued'
    END,
    updated_at = unixepoch(),
    consumer_fetched_at = 0,
    remaining_attempts = MAX(remaining_attempts - 1, 0),
    errors = json_insert(errors, '$[#]', 'visibility timeout expired')
WHERE
    job_status = 'fetched'
    AND queue = ?
    AND consumer_fetched_at < ?;

-- name: FindJob :one
-- Fetch a single job by primary key.
SELECT
    *
FROM
    jobs
WHERE
    id = ?;
|
||||
31
internal/liteq/db/schema.sql
Normal file
31
internal/liteq/db/schema.sql
Normal file
@@ -0,0 +1,31 @@
|
||||
PRAGMA journal_mode = WAL;

-- One row per job. job_status moves queued -> fetched -> completed/failed;
-- the consumer loop drives the transitions (see db/queries.sql).
CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER NOT NULL,
    queue TEXT NOT NULL,
    job TEXT NOT NULL,
    job_status TEXT NOT NULL DEFAULT 'queued',
    execute_after INTEGER NOT NULL DEFAULT 0,
    remaining_attempts INTEGER NOT NULL DEFAULT 1,
    consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
    finished_at INTEGER NOT NULL DEFAULT 0,
    deduping_key TEXT NOT NULL DEFAULT '',
    -- JSON array of error strings accumulated across failed attempts.
    -- FIX: single-quoted '[]' is a SQL string literal; the previous "[]"
    -- is an identifier in standard SQL and only worked through SQLite's
    -- legacy double-quoted-string fallback.
    errors TEXT NOT NULL DEFAULT '[]',
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY (id)
);

-- Covers the reserve/reset scans over live jobs.
CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
    job_status = 'queued'
    OR job_status = 'fetched';

-- Backs ON CONFLICT for IgnoreDuplicate keys.
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
    deduping_key != '';

-- Backs ON CONFLICT for ReplaceDuplicate keys (only while still queued).
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
    deduping_key != ''
    AND job_status = 'queued';
|
||||
31
internal/liteq/internal/db.go
Normal file
31
internal/liteq/internal/db.go
Normal file
@@ -0,0 +1,31 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
}
|
||||
}
|
||||
181
internal/liteq/internal/methods.go
Normal file
181
internal/liteq/internal/methods.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"database/sql/driver"
|
||||
|
||||
"github.com/alitto/pond"
|
||||
)
|
||||
|
||||
// QueueJobParams describes a job to enqueue.
type QueueJobParams struct {
	Queue             string      // destination queue name
	Job               string      // opaque job payload (typically JSON)
	ExecuteAfter      int64       // unix time before which the job must not run; 0 = immediately
	RemainingAttempts int64       // attempts allowed; 0 is treated as 1 by QueueJob
	DedupingKey       DedupingKey // optional dedup behavior; nil means no deduplication
}

// DedupingKey selects how a duplicate key already on the queue is resolved.
type DedupingKey interface {
	String() string
	// ReplaceDuplicate reports whether the queued duplicate should be
	// replaced (true) or the new job dropped (false).
	ReplaceDuplicate() bool
}

// IgnoreDuplicate keeps the job already on the queue and drops the new one.
type IgnoreDuplicate string

func (i IgnoreDuplicate) String() string {
	return string(i)
}
func (i IgnoreDuplicate) ReplaceDuplicate() bool {
	return false
}

// ReplaceDuplicate overwrites the queued job with the same key with the new one.
type ReplaceDuplicate string

func (r ReplaceDuplicate) String() string {
	return string(r)
}
func (r ReplaceDuplicate) ReplaceDuplicate() bool {
	return true
}
|
||||
|
||||
func (q *Queries) QueueJob(ctx context.Context, params QueueJobParams) error {
|
||||
if params.RemainingAttempts == 0 {
|
||||
params.RemainingAttempts = 1
|
||||
}
|
||||
|
||||
if params.DedupingKey == nil {
|
||||
params.DedupingKey = IgnoreDuplicate("")
|
||||
}
|
||||
|
||||
doParams := doQueueJobIgnoreDupeParams{
|
||||
Queue: params.Queue,
|
||||
Job: params.Job,
|
||||
ExecuteAfter: params.ExecuteAfter,
|
||||
RemainingAttempts: params.RemainingAttempts,
|
||||
DedupingKey: params.DedupingKey.String(),
|
||||
}
|
||||
|
||||
if params.DedupingKey.String() == "" {
|
||||
return q.doQueueJobIgnoreDupe(ctx, doParams)
|
||||
}
|
||||
|
||||
if params.DedupingKey.ReplaceDuplicate() {
|
||||
return q.doQueueJobReplaceDupe(ctx, doQueueJobReplaceDupeParams(doParams))
|
||||
}
|
||||
|
||||
return q.doQueueJobIgnoreDupe(ctx, doParams)
|
||||
}
|
||||
|
||||
type GrabJobsParams struct {
|
||||
Queue string
|
||||
ExecuteAfter int64
|
||||
Count int64
|
||||
}
|
||||
|
||||
func (q *Queries) GrabJobs(ctx context.Context, params GrabJobsParams) ([]*Job, error) {
|
||||
executeAfter := time.Now().Unix()
|
||||
if params.ExecuteAfter > 0 {
|
||||
executeAfter = params.ExecuteAfter
|
||||
}
|
||||
limit := int64(1)
|
||||
if params.Count > 0 {
|
||||
limit = params.Count
|
||||
}
|
||||
|
||||
return q.MarkJobsForConsumer(ctx, MarkJobsForConsumerParams{
|
||||
Queue: params.Queue,
|
||||
ExecuteAfter: executeAfter,
|
||||
Limit: limit,
|
||||
})
|
||||
}
|
||||
|
||||
type ConsumeParams struct {
|
||||
Queue string
|
||||
PoolSize int
|
||||
Worker func(context.Context, *Job) error
|
||||
VisibilityTimeout int64
|
||||
OnEmptySleep time.Duration
|
||||
}
|
||||
|
||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||
sleep := params.OnEmptySleep
|
||||
if sleep == 0 {
|
||||
sleep = 1 * time.Second
|
||||
}
|
||||
for {
|
||||
// If the context gets canceled for example, stop consuming
|
||||
if ctx.Err() != nil {
|
||||
log.Println("context cancelled in Consume()")
|
||||
return nil
|
||||
}
|
||||
|
||||
if params.VisibilityTimeout > 0 {
|
||||
_, err := q.ResetJobs(ctx, ResetJobsParams{
|
||||
Queue: params.Queue,
|
||||
ConsumerFetchedAt: time.Now().Unix() - params.VisibilityTimeout,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Println("error resetting jobs", err.Error())
|
||||
return fmt.Errorf("error resetting jobs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := q.GrabJobs(ctx, GrabJobsParams{
|
||||
Queue: params.Queue,
|
||||
Count: int64(params.PoolSize),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error grabbing jobs: %w", err)
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
time.Sleep(sleep)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
job := job
|
||||
workers.Submit(func() {
|
||||
err := params.Worker(ctx, job)
|
||||
if err != nil {
|
||||
log.Println("worker failled", err.Error())
|
||||
q.FailJob(ctx, FailJobParams{
|
||||
ID: job.ID,
|
||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
q.CompleteJob(ctx, job.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorList is the JSON-encoded list of error strings stored in the jobs
// table's errors column. It implements driver.Valuer and sql.Scanner so
// it round-trips through database/sql transparently.
type ErrorList []string

// Value encodes the list as a JSON array; an empty or nil list becomes "[]".
func (e ErrorList) Value() (driver.Value, error) {
	if len(e) == 0 {
		return "[]", nil
	}
	return json.Marshal(e)
}

// Scan decodes a JSON array from either a string or []byte column value.
func (e *ErrorList) Scan(src interface{}) error {
	switch v := src.(type) {
	case []byte:
		return json.Unmarshal(v, e)
	case string:
		return json.Unmarshal([]byte(v), e)
	default:
		return fmt.Errorf("unsupported type: %T", src)
	}
}
|
||||
20
internal/liteq/internal/models.go
Normal file
20
internal/liteq/internal/models.go
Normal file
@@ -0,0 +1,20 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
|
||||
package internal
|
||||
|
||||
type Job struct {
|
||||
ID int64
|
||||
Queue string
|
||||
Job string
|
||||
JobStatus string
|
||||
ExecuteAfter int64
|
||||
RemainingAttempts int64
|
||||
ConsumerFetchedAt int64
|
||||
FinishedAt int64
|
||||
DedupingKey string
|
||||
Errors ErrorList
|
||||
CreatedAt int64
|
||||
UpdatedAt int64
|
||||
}
|
||||
283
internal/liteq/internal/queries.sql.go
Normal file
283
internal/liteq/internal/queries.sql.go
Normal file
@@ -0,0 +1,283 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: queries.sql
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
const completeJob = `-- name: CompleteJob :exec
|
||||
UPDATE
|
||||
jobs
|
||||
SET
|
||||
job_status = 'completed',
|
||||
finished_at = unixepoch(),
|
||||
updated_at = unixepoch(),
|
||||
consumer_fetched_at = 0,
|
||||
remaining_attempts = 0
|
||||
WHERE
|
||||
id = ?
|
||||
`
|
||||
|
||||
func (q *Queries) CompleteJob(ctx context.Context, id int64) error {
|
||||
_, err := q.db.ExecContext(ctx, completeJob, id)
|
||||
return err
|
||||
}
|
||||
|
||||
const failJob = `-- name: FailJob :exec
|
||||
UPDATE
|
||||
jobs
|
||||
SET
|
||||
job_status = CASE
|
||||
WHEN remaining_attempts <= 1 THEN 'failed'
|
||||
ELSE 'queued'
|
||||
END,
|
||||
finished_at = 0,
|
||||
updated_at = unixepoch(),
|
||||
consumer_fetched_at = 0,
|
||||
remaining_attempts = MAX(remaining_attempts - 1, 0),
|
||||
errors = ?
|
||||
WHERE
|
||||
id = ?
|
||||
`
|
||||
|
||||
type FailJobParams struct {
|
||||
Errors ErrorList
|
||||
ID int64
|
||||
}
|
||||
|
||||
func (q *Queries) FailJob(ctx context.Context, arg FailJobParams) error {
|
||||
_, err := q.db.ExecContext(ctx, failJob, arg.Errors, arg.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
const findJob = `-- name: FindJob :one
|
||||
SELECT
|
||||
id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
|
||||
FROM
|
||||
jobs
|
||||
WHERE
|
||||
id = ?
|
||||
`
|
||||
|
||||
func (q *Queries) FindJob(ctx context.Context, id int64) (*Job, error) {
|
||||
row := q.db.QueryRowContext(ctx, findJob, id)
|
||||
var i Job
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.Queue,
|
||||
&i.Job,
|
||||
&i.JobStatus,
|
||||
&i.ExecuteAfter,
|
||||
&i.RemainingAttempts,
|
||||
&i.ConsumerFetchedAt,
|
||||
&i.FinishedAt,
|
||||
&i.DedupingKey,
|
||||
&i.Errors,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const markJobsForConsumer = `-- name: MarkJobsForConsumer :many
|
||||
UPDATE
|
||||
jobs
|
||||
SET
|
||||
consumer_fetched_at = unixepoch(),
|
||||
updated_at = unixepoch(),
|
||||
job_status = 'fetched'
|
||||
WHERE
|
||||
jobs.job_status = 'queued'
|
||||
AND jobs.remaining_attempts > 0
|
||||
AND jobs.id IN (
|
||||
SELECT
|
||||
id
|
||||
FROM
|
||||
jobs js
|
||||
WHERE
|
||||
js.queue = ?
|
||||
AND js.job_status = 'queued'
|
||||
AND js.execute_after <= ?
|
||||
AND js.remaining_attempts > 0
|
||||
ORDER BY
|
||||
execute_after ASC
|
||||
LIMIT
|
||||
?
|
||||
) RETURNING id, queue, job, job_status, execute_after, remaining_attempts, consumer_fetched_at, finished_at, deduping_key, errors, created_at, updated_at
|
||||
`
|
||||
|
||||
type MarkJobsForConsumerParams struct {
|
||||
Queue string
|
||||
ExecuteAfter int64
|
||||
Limit int64
|
||||
}
|
||||
|
||||
func (q *Queries) MarkJobsForConsumer(ctx context.Context, arg MarkJobsForConsumerParams) ([]*Job, error) {
|
||||
rows, err := q.db.QueryContext(ctx, markJobsForConsumer, arg.Queue, arg.ExecuteAfter, arg.Limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []*Job
|
||||
for rows.Next() {
|
||||
var i Job
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.Queue,
|
||||
&i.Job,
|
||||
&i.JobStatus,
|
||||
&i.ExecuteAfter,
|
||||
&i.RemainingAttempts,
|
||||
&i.ConsumerFetchedAt,
|
||||
&i.FinishedAt,
|
||||
&i.DedupingKey,
|
||||
&i.Errors,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, &i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const resetJobs = `-- name: ResetJobs :execrows
|
||||
UPDATE
|
||||
jobs
|
||||
SET
|
||||
job_status = CASE
|
||||
WHEN remaining_attempts <= 1 THEN 'failed'
|
||||
ELSE 'queued'
|
||||
END,
|
||||
updated_at = unixepoch(),
|
||||
consumer_fetched_at = 0,
|
||||
remaining_attempts = MAX(remaining_attempts - 1, 0),
|
||||
errors = json_insert(errors, '$[#]', 'visibility timeout expired')
|
||||
WHERE
|
||||
job_status = 'fetched'
|
||||
AND queue = ?
|
||||
AND consumer_fetched_at < ?
|
||||
`
|
||||
|
||||
type ResetJobsParams struct {
|
||||
Queue string
|
||||
ConsumerFetchedAt int64
|
||||
}
|
||||
|
||||
func (q *Queries) ResetJobs(ctx context.Context, arg ResetJobsParams) (int64, error) {
|
||||
result, err := q.db.ExecContext(ctx, resetJobs, arg.Queue, arg.ConsumerFetchedAt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected()
|
||||
}
|
||||
|
||||
const doQueueJobIgnoreDupe = `-- name: doQueueJobIgnoreDupe :exec
|
||||
INSERT INTO
|
||||
jobs (
|
||||
queue,
|
||||
job,
|
||||
execute_after,
|
||||
job_status,
|
||||
created_at,
|
||||
updated_at,
|
||||
remaining_attempts,
|
||||
deduping_key
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
?,
|
||||
?,
|
||||
?,
|
||||
'queued',
|
||||
unixepoch(),
|
||||
unixepoch(),
|
||||
?,
|
||||
?
|
||||
) ON CONFLICT (deduping_key)
|
||||
WHERE
|
||||
deduping_key != '' DO NOTHING
|
||||
`
|
||||
|
||||
type doQueueJobIgnoreDupeParams struct {
|
||||
Queue string
|
||||
Job string
|
||||
ExecuteAfter int64
|
||||
RemainingAttempts int64
|
||||
DedupingKey string
|
||||
}
|
||||
|
||||
func (q *Queries) doQueueJobIgnoreDupe(ctx context.Context, arg doQueueJobIgnoreDupeParams) error {
|
||||
_, err := q.db.ExecContext(ctx, doQueueJobIgnoreDupe,
|
||||
arg.Queue,
|
||||
arg.Job,
|
||||
arg.ExecuteAfter,
|
||||
arg.RemainingAttempts,
|
||||
arg.DedupingKey,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const doQueueJobReplaceDupe = `-- name: doQueueJobReplaceDupe :exec
|
||||
INSERT INTO
|
||||
jobs (
|
||||
queue,
|
||||
job,
|
||||
execute_after,
|
||||
job_status,
|
||||
created_at,
|
||||
updated_at,
|
||||
remaining_attempts,
|
||||
deduping_key
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
?,
|
||||
?,
|
||||
?,
|
||||
'queued',
|
||||
unixepoch(),
|
||||
unixepoch(),
|
||||
?,
|
||||
?
|
||||
) ON CONFLICT (deduping_key, job_status)
|
||||
WHERE
|
||||
deduping_key != ''
|
||||
AND job_status = 'queued' DO
|
||||
UPDATE
|
||||
SET
|
||||
job = EXCLUDED.job,
|
||||
execute_after = EXCLUDED.execute_after,
|
||||
updated_at = unixepoch(),
|
||||
remaining_attempts = EXCLUDED.remaining_attempts
|
||||
`
|
||||
|
||||
// doQueueJobReplaceDupeParams carries the bind parameters for
// doQueueJobReplaceDupe, in SQL placeholder order.
type doQueueJobReplaceDupeParams struct {
	Queue             string // queue name; tests use "" as the default queue
	Job               string // opaque job payload (replaces a queued duplicate)
	ExecuteAfter      int64  // unix seconds before which the job stays hidden
	RemainingAttempts int64  // failures allowed before the job is abandoned
	DedupingKey       string // non-empty value enables duplicate replacement
}
|
||||
|
||||
// doQueueJobReplaceDupe executes the insert-or-replace variant of job
// queueing. Generated by sqlc — regenerate from db/queries.sql instead of
// editing this function.
func (q *Queries) doQueueJobReplaceDupe(ctx context.Context, arg doQueueJobReplaceDupeParams) error {
	_, err := q.db.ExecContext(ctx, doQueueJobReplaceDupe,
		arg.Queue,
		arg.Job,
		arg.ExecuteAfter,
		arg.RemainingAttempts,
		arg.DedupingKey,
	)
	return err
}
|
||||
36
internal/liteq/internal/schema.go
Normal file
36
internal/liteq/internal/schema.go
Normal file
@@ -0,0 +1,36 @@
|
||||
// Code generated by Makefile. DO NOT EDIT.
|
||||
package internal
|
||||
|
||||
// Schema is the DDL applied by liteq.Setup: the jobs table plus the
// partial indexes backing job fetching ("todo") and the two dedup modes
// (dedupe_ignore, dedupe_replace).
//
// FIX: the errors column default was `"[]"` — double quotes denote
// identifiers in SQL, and SQLite only accepted it via its legacy
// double-quoted-string misfeature. Use a proper single-quoted literal.
// NOTE(review): this file is generated from db/schema.sql by the
// Makefile; apply the same fix there so regeneration does not revert it.
const Schema = `
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS jobs (
  id INTEGER NOT NULL,
  queue TEXT NOT NULL,
  job TEXT NOT NULL,
  job_status TEXT NOT NULL DEFAULT 'queued',
  execute_after INTEGER NOT NULL DEFAULT 0,
  remaining_attempts INTEGER NOT NULL DEFAULT 1,
  consumer_fetched_at INTEGER NOT NULL DEFAULT 0,
  finished_at INTEGER NOT NULL DEFAULT 0,
  deduping_key TEXT NOT NULL DEFAULT '',
  errors TEXT NOT NULL DEFAULT '[]',
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (id)
);

CREATE INDEX IF NOT EXISTS todo ON jobs (queue, job_status, execute_after)
WHERE
  job_status = 'queued'
  OR job_status = 'fetched';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
WHERE
  deduping_key != '';

CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
WHERE
  deduping_key != ''
  AND job_status = 'queued';
`
|
||||
438
internal/liteq/internal_test/jobs_test.go
Normal file
438
internal/liteq/internal_test/jobs_test.go
Normal file
@@ -0,0 +1,438 @@
|
||||
package internal_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
|
||||
"github.com/matryer/is"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
e := m.Run()
|
||||
|
||||
removeFiles("jobs.db", "jobs.db-journal", "jobs.db-wal", "jobs.db-shm")
|
||||
os.Exit(e)
|
||||
}
|
||||
|
||||
// removeFiles deletes each named file. Files that do not exist are
// skipped silently; the first unexpected error aborts and is returned.
func removeFiles(files ...string) error {
	for _, name := range files {
		if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return nil
}
|
||||
|
||||
func getDb(file string) (*sql.DB, error) {
|
||||
if file != ":memory:" {
|
||||
err := removeFiles(file, file+"-journal", file+"-wal", file+"-shm")
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
schema, err := os.ReadFile("../db/schema.sql")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = db.Exec(string(schema))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func getJqueue(file string) (*internal.Queries, error) {
|
||||
sqlcdb, err := getDb(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return internal.New(sqlcdb), nil
|
||||
}
|
||||
|
||||
func Test_QueueJob(t *testing.T) {
|
||||
is := is.New(t)
|
||||
sqlcdb, err := getDb("jobs.db")
|
||||
is.NoErr(err) // error opening sqlite3 database
|
||||
|
||||
jqueue := internal.New(sqlcdb)
|
||||
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "",
|
||||
Job: jobPaylod,
|
||||
})
|
||||
is.NoErr(err) // error queuing job
|
||||
|
||||
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 1) // expected 1 job
|
||||
is.Equal(jobs[0].Job, jobPaylod)
|
||||
}
|
||||
|
||||
func Test_FetchTwice(t *testing.T) {
|
||||
is := is.New(t)
|
||||
sqlcdb, err := getDb("jobs.db")
|
||||
is.NoErr(err) // error opening sqlite3 database
|
||||
|
||||
jqueue := internal.New(sqlcdb)
|
||||
jobPaylod := `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "",
|
||||
Job: jobPaylod,
|
||||
})
|
||||
is.NoErr(err) // error queuing job
|
||||
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 1) // expected 1 job
|
||||
is.Equal(jobs[0].Job, jobPaylod)
|
||||
|
||||
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 0) // expected 0 job
|
||||
}
|
||||
|
||||
// delay jobs, fetch jobs, check that we get no jobs, then check that we get the job after the delay
|
||||
func Test_DelayedJob(t *testing.T) {
|
||||
is := is.New(t)
|
||||
jqueue, err := getJqueue("jobs.db")
|
||||
is.NoErr(err) // error getting job queue
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "",
|
||||
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
|
||||
ExecuteAfter: time.Now().Unix() + 1,
|
||||
})
|
||||
is.NoErr(err) // error queuing job
|
||||
|
||||
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 0) // expected 0 job
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 1) // expected 1 job
|
||||
}
|
||||
|
||||
func Test_PrefetchJobs(t *testing.T) {
|
||||
is := is.New(t)
|
||||
jqueue, err := getJqueue(":memory:")
|
||||
is.NoErr(err) // error getting job queue
|
||||
for i := 0; i < 10; i++ {
|
||||
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "",
|
||||
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
|
||||
})
|
||||
is.NoErr(err) // error queuing job
|
||||
}
|
||||
|
||||
// grab the first 2
|
||||
jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
Count: 2,
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 2) // expected 2 jobs
|
||||
|
||||
// the next 6
|
||||
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
Count: 6,
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 6) // expected 6 jobs
|
||||
|
||||
// try for 5 but only 2 are left
|
||||
jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
|
||||
Queue: "",
|
||||
Count: 5,
|
||||
})
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(len(jobs), 2) // expected 2 jobs
|
||||
}
|
||||
|
||||
func Test_Consume(t *testing.T) {
|
||||
is := is.New(t)
|
||||
// This somehow doesn't work well with in memory db
|
||||
jqueue, err := getJqueue("jobs.db")
|
||||
is.NoErr(err) // error getting job queue
|
||||
|
||||
fillq1 := make(chan struct{}, 1)
|
||||
go func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
jobPayload := fmt.Sprintf("q1-%d", i)
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "q1",
|
||||
Job: jobPayload,
|
||||
})
|
||||
|
||||
is.NoErr(err) // error queuing job
|
||||
}
|
||||
fillq1 <- struct{}{}
|
||||
close(fillq1)
|
||||
}()
|
||||
|
||||
fillq2 := make(chan struct{}, 1)
|
||||
go func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
jobPayload := fmt.Sprintf("q2-%d", i)
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "q2",
|
||||
Job: jobPayload,
|
||||
})
|
||||
|
||||
is.NoErr(err) // error queuing job
|
||||
}
|
||||
fillq2 <- struct{}{}
|
||||
close(fillq2)
|
||||
}()
|
||||
|
||||
q1 := make(chan string, 100)
|
||||
q1done := make(chan struct{}, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||
Queue: "q1",
|
||||
PoolSize: 1,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
q1 <- job.Job
|
||||
if job.Job == "q1-99" {
|
||||
q1done <- struct{}{}
|
||||
close(q1done)
|
||||
close(q1)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}()
|
||||
|
||||
q2 := make(chan string, 100)
|
||||
q2done := make(chan struct{}, 1)
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
jqueue.Consume(ctx2, internal.ConsumeParams{
|
||||
Queue: "q2",
|
||||
PoolSize: 1,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
q2 <- job.Job
|
||||
if job.Job == "q2-99" {
|
||||
q2done <- struct{}{}
|
||||
close(q2done)
|
||||
close(q2)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}()
|
||||
|
||||
<-fillq1
|
||||
<-q1done
|
||||
cancel()
|
||||
<-fillq2
|
||||
<-q2done
|
||||
cancel2()
|
||||
|
||||
i := 0
|
||||
for pl := range q1 {
|
||||
is.True(strings.HasPrefix(pl, "q1")) // expected q1-*
|
||||
i++
|
||||
}
|
||||
is.Equal(i, 100) // expected 100 jobs
|
||||
|
||||
j := 0
|
||||
for pl := range q2 {
|
||||
is.True(strings.HasPrefix(pl, "q2")) // expected q2-*
|
||||
j++
|
||||
}
|
||||
is.Equal(j, 100) // expected 100 jobs
|
||||
}
|
||||
|
||||
// Test_MultipleAttempts queues a job with 3 attempts, fails it three
// times, and verifies the attempt counter and accumulated error list
// after each failure; after the third failure the job must no longer be
// grabbable.
func Test_MultipleAttempts(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:             "",
		Job:               `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
		RemainingAttempts: 3,
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})

	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 1) // expected 1 job

	thejob := jobs[0]
	is.Equal(thejob.RemainingAttempts, int64(3)) // expected 3 attempts

	// Fail and verify: attempt counter decrements, error recorded.
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)                                 // error fetching job for consumer
	is.Equal(len(jobs), 1)                        // expected 1 job
	is.Equal(jobs[0].RemainingAttempts, int64(2)) // expected 2 attempts
	is.Equal(jobs[0].Errors, internal.ErrorList{"error1"})

	// Fail again and verify.
	// Note: FailJob takes the full cumulative error list, not a delta.
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1", "error2"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)                                 // error fetching job for consumer
	is.Equal(len(jobs), 1)                        // expected 1 job
	is.Equal(jobs[0].RemainingAttempts, int64(1)) // expected 1 attempts
	is.Equal(jobs[0].Errors, internal.ErrorList{"error1", "error2"})

	// Fail again and verify: this third failure exhausts the budget.
	err = jqueue.FailJob(context.Background(), internal.FailJobParams{
		ID:     thejob.ID,
		Errors: internal.ErrorList{"error1", "error2", "error3"},
	})
	is.NoErr(err) // error failing job
	jobs, err = jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
	})
	is.NoErr(err)          // error fetching job for consumer
	is.Equal(len(jobs), 0) // expected no job since no more attempts
}
|
||||
|
||||
func Test_VisibilityTimeout(t *testing.T) {
|
||||
is := is.New(t)
|
||||
jqueue, err := getJqueue("jobs.db")
|
||||
is.NoErr(err) // error getting job queue
|
||||
|
||||
err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
|
||||
Queue: "",
|
||||
Job: `{"type": "slack", "channel": "C01B2PZQZ3D", "text": "Hello world"}`,
|
||||
})
|
||||
is.NoErr(err) // error queuing job
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
id := int64(0)
|
||||
go func() {
|
||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||
Queue: "",
|
||||
PoolSize: 1,
|
||||
VisibilityTimeout: 1,
|
||||
OnEmptySleep: 100 * time.Millisecond,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
id = job.ID
|
||||
time.Sleep(3 * time.Second)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
cancel()
|
||||
job, err := jqueue.FindJob(context.Background(), id)
|
||||
|
||||
is.NoErr(err) // error fetching job for consumer
|
||||
is.Equal(job.JobStatus, "failed") // expected fetched
|
||||
is.Equal(job.Errors, internal.ErrorList{"visibility timeout expired"})
|
||||
// Sleep to ensure enough time for the job to finish and avoid panics
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
// Test_DedupeIgnore queues two jobs with the same ignore-style deduping
// key and verifies the second insert is silently dropped (first writer
// wins).
func Test_DedupeIgnore(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue

	// NOTE(review): the "A"/"B" log lines look like leftover debugging;
	// remove them together with the log import in a follow-up.
	log.Println("A")
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:1`,
		DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
	})
	is.NoErr(err) // error queuing job

	log.Println("B")
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:2`,
		DedupingKey: internal.IgnoreDuplicate("dedupe_ignore"),
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 10,
	})
	is.NoErr(err)                  // error fetching job for consumer
	is.Equal(len(jobs), 1)         // expected only 1 job due to dedupe
	is.Equal(jobs[0].Job, `job:1`) // expected job:1
}
|
||||
|
||||
// Test_DedupeReplace queues two jobs with the same replace-style deduping
// key and verifies the later payload overwrites the earlier one (last
// writer wins).
func Test_DedupeReplace(t *testing.T) {
	is := is.New(t)
	jqueue, err := getJqueue("jobs.db")
	is.NoErr(err) // error getting job queue
	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:1`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job

	err = jqueue.QueueJob(context.Background(), internal.QueueJobParams{
		Queue:       "",
		Job:         `job:2`,
		DedupingKey: internal.ReplaceDuplicate("dedupe_replace"),
	})
	is.NoErr(err) // error queuing job

	jobs, err := jqueue.GrabJobs(context.Background(), internal.GrabJobsParams{
		Queue: "",
		Count: 10,
	})
	is.NoErr(err)                  // error fetching job for consumer
	is.Equal(len(jobs), 1)         // expected only 1 job due to dedupe
	is.Equal(jobs[0].Job, `job:2`) // expected job:2 (replacement kept)
}
|
||||
41
internal/liteq/liteq.go
Normal file
41
internal/liteq/liteq.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package liteq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
|
||||
)
|
||||
|
||||
// Setup applies the liteq schema to db: the jobs table, its indexes, and
// the WAL journal-mode pragma. It is idempotent — the schema uses
// IF NOT EXISTS throughout — so it is safe to call on every start-up.
func Setup(db *sql.DB) error {
	_, err := db.Exec(internal.Schema)
	return err
}
|
||||
|
||||
// New wraps db in a JobQueue backed by the sqlc-generated query layer.
// Callers should run Setup first (or otherwise ensure the schema exists).
func New(db *sql.DB) *JobQueue {
	queries := internal.New(db)
	return &JobQueue{queries}
}
|
||||
|
||||
// JobQueue is the public handle for queueing and consuming jobs; it
// delegates to the internal sqlc-backed implementation.
type JobQueue struct {
	queries *internal.Queries
}
|
||||
|
||||
// Aliases re-exported so callers never import the internal package.
type QueueJobParams = internal.QueueJobParams
type DedupingKey = internal.DedupingKey
type IgnoreDuplicate = internal.IgnoreDuplicate
type ReplaceDuplicate = internal.ReplaceDuplicate
|
||||
|
||||
// QueueJob enqueues a single job described by params.
func (jq *JobQueue) QueueJob(ctx context.Context, params QueueJobParams) error {
	return jq.queries.QueueJob(ctx, params)
}
|
||||
|
||||
// ConsumeParams is re-exported from the internal package.
type ConsumeParams = internal.ConsumeParams

// Consume runs the worker pool described by params against its queue
// until ctx is cancelled.
func (jq *JobQueue) Consume(ctx context.Context, params ConsumeParams) error {
	return jq.queries.Consume(ctx, params)
}
|
||||
|
||||
// Job and its accumulated error list, re-exported for consumers.
type ErrorList = internal.ErrorList
type Job = internal.Job
|
||||
17
internal/liteq/sqlc.yaml
Normal file
17
internal/liteq/sqlc.yaml
Normal file
@@ -0,0 +1,17 @@
|
||||
# sqlc code-generation config for the liteq job queue.
# Regenerate with `sqlc generate` (see the repo Makefile).
version: "2"
sql:
  - engine: "sqlite"
    queries: "db/queries.sql"
    schema: "db/schema.sql"
    gen:
      go:
        package: "internal"
        out: "internal"
        # Row-returning queries yield *Struct instead of Struct.
        emit_result_struct_pointers: true
        overrides:
          # consumer_id is stored as TEXT; map it to a plain Go string.
          - column: jobs.consumer_id
            go_type: "string"
          # errors holds a JSON array; ErrorList is declared in this
          # package (empty import), presumably implementing
          # sql.Scanner/driver.Valuer — confirm in the package source.
          - column: jobs.errors
            go_type:
              import: ""
              type: "ErrorList"
|
||||
9
internal/types/base.go
Normal file
9
internal/types/base.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||
)
|
||||
|
||||
// QueueWorker is the callback signature queue consumers run for each job
// pulled off a liteq queue.
type QueueWorker func(ctx context.Context, job *liteq.Job) error
|
||||
9
internal/types/constants.go
Normal file
9
internal/types/constants.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package types
|
||||
|
||||
// UpdateFrequency describes how often a source should be re-fetched.
// NOTE(review): Go convention is MixedCaps (UpdateDaily) rather than
// ALL_CAPS, but renaming these exported constants would break callers.
type UpdateFrequency string

const (
	UPDATE_DAILY   UpdateFrequency = "DAILY"
	UPDATE_WEEKLY  UpdateFrequency = "WEEKLY"
	UPDATE_MONTHLY UpdateFrequency = "MONTHLY"
)
|
||||
22
internal/types/jobs.go
Normal file
22
internal/types/jobs.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package types
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
// Job is anything that can serialize itself to a JSON string for
// queueing.
type Job interface {
	AsJson() string
}
|
||||
|
||||
// _as_json marshals j to its JSON representation.
// Used by all of the jobs.
// NOTE(review): the json.Marshal error is deliberately discarded —
// presumably all Job implementations are plain structs that cannot fail
// to marshal; confirm if a Job ever gains func/chan fields. The
// underscore name is also un-idiomatic Go, but it may be referenced
// elsewhere in this package, so it is left unchanged.
func _as_json(j Job) string {
	as_json, _ := json.Marshal(j)
	return string(as_json)
}
|
||||
|
||||
// EntreJob is the payload queued when a URL enters the system
// (presumably the entry-point job — confirm against the producer).
type EntreJob struct {
	URL             string          `json:"url"`             // page to fetch
	UpdateFrequency UpdateFrequency `json:"update_frequency"` // re-fetch cadence
}
|
||||
|
||||
// AsJson returns the job serialized as a JSON string, satisfying the Job
// interface.
func (ej *EntreJob) AsJson() string {
	return _as_json(ej)
}
|
||||
Reference in New Issue
Block a user