Queueing, prepping to fetch

A lot more to go, but this is the core.

Need to think about how to test the queue handlers.
This commit is contained in:
Matt Jadud
2025-11-30 21:29:30 -05:00
parent 06cdc68be7
commit f53639af2f
14 changed files with 258 additions and 105 deletions

View File

@@ -5,3 +5,6 @@ generate:
cd internal/domain64 ; make generate cd internal/domain64 ; make generate
cd internal/liteq ; sqlc generate cd internal/liteq ; sqlc generate
cd internal/liteq ; make schema-const cd internal/liteq ; make schema-const
run: generate
go run cmd/api/*.go

View File

@@ -32,3 +32,7 @@ The use-case is (essentially) single-user.
* https://github.com/jpillora/go-tld * https://github.com/jpillora/go-tld
* https://pkg.go.dev/modernc.org/sqlite * https://pkg.go.dev/modernc.org/sqlite
* https://github.com/sqlc-dev/sqlc * https://github.com/sqlc-dev/sqlc
* https://github.com/spf13/afero
* https://github.com/tidwall/gjson
//go:generate stringer -type=UpdateFrequency

78
cmd/api/init.go Normal file
View File

@@ -0,0 +1,78 @@
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"time"
"git.jadud.com/jadudm/grosbeak/internal/domain64"
"git.jadud.com/jadudm/grosbeak/internal/engine"
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
"git.jadud.com/jadudm/grosbeak/internal/types"
_ "modernc.org/sqlite"
)
func setupDB() *sql.DB {
// FIXME: This path needs to come from the env.
db, err := sql.Open("sqlite", "grosbeak.sqlite")
db.SetMaxOpenConns(1)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
return db
}
func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) {
for {
log.Printf("runQ %s\n", queueName)
err := queue.Consume(context.Background(), liteq.ConsumeParams{
Queue: queueName,
PoolSize: 3,
VisibilityTimeout: 20,
Worker: worker,
})
if err != nil {
log.Printf("runQ/%s: %s", queueName, err.Error())
time.Sleep(2 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue {
liteq.Setup(db)
queue := liteq.New(db)
// The queue processes as long as this context is not cancelled.
log.Println("setting up worker queues...")
queues := []struct {
queueName string
worker types.QueueWorker
}{
{"fetch", engine.Fetch(d64m)},
}
for _, q := range queues {
go runQ(queue, q.queueName, q.worker)
}
return queue
}
func setupDomain64Map(db *sql.DB) *domain64.Domain64Map {
d64m, err := domain64.NewDomain64Map()
if err != nil {
log.Printf("newdomain64map err: %s", err.Error())
os.Exit(1)
}
d64m.Setup(db)
return d64m
}

View File

@@ -1,107 +1,37 @@
package main package main
import ( import (
"context"
"database/sql"
"fmt"
"log" "log"
"math/rand"
"os"
"sync" "sync"
"time" "time"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq" "git.jadud.com/jadudm/grosbeak/internal/engine"
base "git.jadud.com/jadudm/grosbeak/internal/types"
) )
type queueWorker func(ctx context.Context, job *liteq.Job) error
func Fetch(ctx context.Context, job *liteq.Job) error {
n := rand.Intn(50)
time.Sleep(time.Duration(n) * time.Millisecond)
log.Println("Fetching", job.Job)
return nil
}
func runQ(queue *liteq.JobQueue, queueName string, worker queueWorker) {
for {
err := queue.Consume(context.Background(), liteq.ConsumeParams{
Queue: queueName,
PoolSize: 3,
VisibilityTimeout: 20,
Worker: worker,
})
if err != nil {
log.Printf("runQ/%s: %w", queueName, err.Error())
time.Sleep(2 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
ctx := context.Background()
for {
url := <-chUrl
n := time.Now()
ignore_tag := fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())
log.Println("entre", url, ignore_tag)
// Don't duplicate jobs on the same day of the year.
err := queue.QueueJob(ctx, liteq.QueueJobParams{
Queue: "fetch",
// This only works for things in the `queued` state
DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
Job: url,
})
if err != nil {
log.Println("entre err", err.Error())
}
}
}
func setupLiteQ() *liteq.JobQueue {
// FIXME: This path needs to come from the env.
liteqDB, err := sql.Open("sqlite", "liteq.db")
liteqDB.SetMaxOpenConns(1)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
liteq.Setup(liteqDB)
queue := liteq.New(liteqDB)
// The queue processes as long as this context is not cancelled.
log.Println("Setting up worker queues...")
queues := []struct {
queueName string
worker queueWorker
}{
{"fetch", Fetch},
}
for _, q := range queues {
go runQ(queue, q.queueName, q.worker)
}
return queue
}
func main() { func main() {
// Don't let `main()` exit // Don't let `main()` exit
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1) wg.Add(1)
queue := setupLiteQ() db := setupDB()
d64m := setupDomain64Map(db)
queue := setupLiteQ(db, d64m)
// Create the network for the search engine. // Enqueue URLs
chUrl := make(chan string) urls := []struct {
url string
uf base.UpdateFrequency
}{
{"https://jadud.com/", base.UPDATE_DAILY},
{"https://berea.us/", base.UPDATE_WEEKLY},
}
go Entre(queue, chUrl) for _, u := range urls {
for range 5 { engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf})
chUrl <- "https://jadud.com/" time.Sleep(1 * time.Second)
chUrl <- "https://berea.us/"
} }
// Don't exit. // Don't exit.

3
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/alitto/pond v1.9.2 github.com/alitto/pond v1.9.2
github.com/jpillora/go-tld v1.2.1 github.com/jpillora/go-tld v1.2.1
github.com/matryer/is v1.4.1 github.com/matryer/is v1.4.1
github.com/tidwall/gjson v1.18.0
modernc.org/sqlite v1.40.1 modernc.org/sqlite v1.40.1
) )
@@ -17,6 +18,8 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
golang.org/x/net v0.47.0 // indirect golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect golang.org/x/sys v0.38.0 // indirect

8
go.sum
View File

@@ -18,6 +18,14 @@ github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY= golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0= golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk= golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=

View File

@@ -3,6 +3,7 @@ package domain64
import ( import (
"context" "context"
"database/sql" "database/sql"
_ "embed"
"fmt" "fmt"
"log" "log"
"slices" "slices"
@@ -12,21 +13,33 @@ import (
"github.com/jpillora/go-tld" "github.com/jpillora/go-tld"
) )
//go:embed sqlc/schema.sql
var ddl string
type Domain64Map struct { type Domain64Map struct {
mu sync.Mutex mu sync.Mutex
m map[string]int m map[string]int
DB *sql.DB DB *sql.DB
Flushed bool Queries *sqlc.Queries
} }
func NewDomain64Map(db *sql.DB) (*Domain64Map, error) { func NewDomain64Map() (*Domain64Map, error) {
d64m := &Domain64Map{} d64m := &Domain64Map{}
d64m.DB = db d64m.DB = nil
d64m.Flushed = true d64m.Queries = nil
// Init the tables if a DB pointer is passed in.
return d64m, nil return d64m, nil
} }
func (d64m *Domain64Map) Setup(db *sql.DB) {
log.Printf("creating domain64 tables\n")
if _, err := db.ExecContext(context.Background(), ddl); err != nil {
log.Printf("setup err: %s", err.Error())
panic(err)
}
d64m.DB = db
d64m.Queries = sqlc.New(db)
}
func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string { func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
s := "" s := ""
if url.TLD != "" { if url.TLD != "" {
@@ -140,21 +153,21 @@ func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) {
} }
d64 := &Domain64{} d64 := &Domain64{}
queries := sqlc.New(d64m.DB)
// These manipulate both the DB and the Domain64 struct // These manipulate both the DB and the Domain64 struct
err := _get_or_insert_tld(queries, d64, url) err := _get_or_insert_tld(d64m.Queries, d64, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = _get_or_insert_domain(queries, d64, url) err = _get_or_insert_domain(d64m.Queries, d64, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = _get_or_insert_subdomain(queries, d64, url) err = _get_or_insert_subdomain(d64m.Queries, d64, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = _get_or_insert_path(queries, d64, url) err = _get_or_insert_path(d64m.Queries, d64, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -3,16 +3,12 @@ package domain64
import ( import (
"context" "context"
"database/sql" "database/sql"
_ "embed"
"testing" "testing"
"github.com/jpillora/go-tld" "github.com/jpillora/go-tld"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
) )
//go:embed sqlc/schema.sql
var ddl string
func setup() *sql.DB { func setup() *sql.DB {
ctx := context.Background() ctx := context.Background()
@@ -32,7 +28,8 @@ func setup() *sql.DB {
func TestNewDomain64Map(t *testing.T) { func TestNewDomain64Map(t *testing.T) {
db := setup() db := setup()
M, err := NewDomain64Map(db) M, err := NewDomain64Map()
M.Setup(db)
if err != nil { if err != nil {
// TODO // TODO
t.Error(err) t.Error(err)
@@ -43,7 +40,8 @@ func TestNewDomain64Map(t *testing.T) {
} }
func TestURLToRFQDN(t *testing.T) { func TestURLToRFQDN(t *testing.T) {
M, _ := NewDomain64Map(nil) M, _ := NewDomain64Map()
// M.Setup(db)
simple, _ := tld.Parse("https://jadud.com/") simple, _ := tld.Parse("https://jadud.com/")
rfqdn := M.URLToRFQDN(simple) rfqdn := M.URLToRFQDN(simple)
if rfqdn != "com.jadud/" { if rfqdn != "com.jadud/" {
@@ -53,7 +51,8 @@ func TestURLToRFQDN(t *testing.T) {
func TestURLToDomain64(t *testing.T) { func TestURLToDomain64(t *testing.T) {
db := setup() db := setup()
M, _ := NewDomain64Map(db) M, _ := NewDomain64Map()
M.Setup(db)
simple, _ := tld.Parse("https://jadud.com/") simple, _ := tld.Parse("https://jadud.com/")
d64, _ := M.URLToDomain64(simple) d64, _ := M.URLToDomain64(simple)
if d64.TLD != 1 { if d64.TLD != 1 {
@@ -67,7 +66,8 @@ func TestURLToDomain64(t *testing.T) {
func TestURLToDomain64_02(t *testing.T) { func TestURLToDomain64_02(t *testing.T) {
db := setup() db := setup()
M, _ := NewDomain64Map(db) M, _ := NewDomain64Map()
M.Setup(db)
simple1, _ := tld.Parse("https://jadud.com/") simple1, _ := tld.Parse("https://jadud.com/")
simple2, _ := tld.Parse("https://another.com/") simple2, _ := tld.Parse("https://another.com/")
d64_1, _ := M.URLToDomain64(simple1) d64_1, _ := M.URLToDomain64(simple1)
@@ -93,7 +93,8 @@ func TestURLToDomain64_02(t *testing.T) {
func TestURLToDomain64_03(t *testing.T) { func TestURLToDomain64_03(t *testing.T) {
db := setup() db := setup()
M, _ := NewDomain64Map(db) M, _ := NewDomain64Map()
M.Setup(db)
var tests = []struct { var tests = []struct {
url string url string
tld int64 tld int64

39
internal/engine/entre.go Normal file
View File

@@ -0,0 +1,39 @@
package engine
import (
"context"
"fmt"
"log"
"time"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
base "git.jadud.com/jadudm/grosbeak/internal/types"
)
func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error {
n := time.Now()
var ignore_tag string
switch ej.UpdateFrequency {
case base.UPDATE_DAILY:
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
case base.UPDATE_WEEKLY:
ignore_tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, n.Year(), n.YearDay()/7)
case base.UPDATE_MONTHLY:
ignore_tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, n.Year(), n.Month())
default:
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
}
err := queue.QueueJob(context.Background(), liteq.QueueJobParams{
Queue: "fetch",
// This only works for things in the `queued` state
DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
Job: ej.AsJson(),
})
if err != nil {
log.Printf("entre err %s: %s\n", ej.URL, err.Error())
}
return err
}

34
internal/engine/fetch.go Normal file
View File

@@ -0,0 +1,34 @@
package engine
import (
"context"
"log"
"git.jadud.com/jadudm/grosbeak/internal/domain64"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
"git.jadud.com/jadudm/grosbeak/internal/types"
"github.com/jpillora/go-tld"
"github.com/tidwall/gjson"
)
func Fetch(d64m *domain64.Domain64Map) types.QueueWorker {
_f := func(ctx context.Context, job *liteq.Job) error {
url := gjson.Get(job.Job, "url").String()
turl, err := tld.Parse(url)
if err != nil {
// If we can't parse it, shut the job down as completed.
return nil
}
d64, err := d64m.URLToDomain64(turl)
if err != nil {
log.Printf("urltodomain64 err %s: %s\n", url, err.Error())
}
log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64())
return nil
}
return _f
}

9
internal/types/base.go Normal file
View File

@@ -0,0 +1,9 @@
package types
import (
"context"
"git.jadud.com/jadudm/grosbeak/internal/liteq"
)
type QueueWorker func(ctx context.Context, job *liteq.Job) error

View File

@@ -0,0 +1,9 @@
package types
type UpdateFrequency string
const (
UPDATE_DAILY UpdateFrequency = "DAILY"
UPDATE_WEEKLY UpdateFrequency = "WEEKLY"
UPDATE_MONTHLY UpdateFrequency = "MONTHLY"
)

22
internal/types/jobs.go Normal file
View File

@@ -0,0 +1,22 @@
package types
import "encoding/json"
type Job interface {
AsJson() string
}
// Used by all of the jobs.
func _as_json(j Job) string {
as_json, _ := json.Marshal(j)
return string(as_json)
}
type EntreJob struct {
URL string `json:"url"`
UpdateFrequency UpdateFrequency `json:"update_frequency"`
}
func (ej *EntreJob) AsJson() string {
return _as_json(ej)
}