From f53639af2f54d65fedf5357347e70feffeb36d99 Mon Sep 17 00:00:00 2001 From: Matt Jadud Date: Sun, 30 Nov 2025 21:29:30 -0500 Subject: [PATCH] Queueing, prepping to fetch A lot more to go, but this is the core. Need to think about how to test the queue handlers. --- Makefile | 3 + README.md | 4 + cmd/api/init.go | 78 ++++++++++++++ cmd/api/main.go | 102 +++--------------- go.mod | 3 + go.sum | 8 ++ internal/domain64/database.go | 33 ++++-- internal/domain64/database_test.go | 19 ++-- internal/engine/entre.go | 39 +++++++ internal/engine/fetch.go | 34 ++++++ .../{internal => internal_test}/jobs_test.go | 0 internal/types/base.go | 9 ++ internal/types/constants.go | 9 ++ internal/types/jobs.go | 22 ++++ 14 files changed, 258 insertions(+), 105 deletions(-) create mode 100644 cmd/api/init.go create mode 100644 internal/engine/entre.go create mode 100644 internal/engine/fetch.go rename internal/liteq/{internal => internal_test}/jobs_test.go (100%) create mode 100644 internal/types/base.go create mode 100644 internal/types/constants.go create mode 100644 internal/types/jobs.go diff --git a/Makefile b/Makefile index 8779f28..2bfeb75 100644 --- a/Makefile +++ b/Makefile @@ -5,3 +5,6 @@ generate: cd internal/domain64 ; make generate cd internal/liteq ; sqlc generate cd internal/liteq ; make schema-const + +run: generate + go run cmd/api/*.go \ No newline at end of file diff --git a/README.md b/README.md index b91d7f8..2bbcd85 100644 --- a/README.md +++ b/README.md @@ -32,3 +32,7 @@ The use-case is (essentially) single-user. * https://github.com/jpillora/go-tld * https://pkg.go.dev/modernc.org/sqlite * https://github.com/sqlc-dev/sqlc +* https://github.com/spf13/afero +* https://github.com/tidwall/gjson + +//go:generate stringer -type=UpdateFrequency diff --git a/cmd/api/init.go b/cmd/api/init.go new file mode 100644 index 0000000..d6eded4 --- /dev/null +++ b/cmd/api/init.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log" + "os" + "time" + + "git.jadud.com/jadudm/grosbeak/internal/domain64" + "git.jadud.com/jadudm/grosbeak/internal/engine" + liteq "git.jadud.com/jadudm/grosbeak/internal/liteq" + "git.jadud.com/jadudm/grosbeak/internal/types" + _ "modernc.org/sqlite" +) + +func setupDB() *sql.DB { + + // FIXME: This path needs to come from the env. + db, err := sql.Open("sqlite", "grosbeak.sqlite") + db.SetMaxOpenConns(1) + + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + return db +} + +func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) { + for { + log.Printf("runQ %s\n", queueName) + + err := queue.Consume(context.Background(), liteq.ConsumeParams{ + Queue: queueName, + PoolSize: 3, + VisibilityTimeout: 20, + Worker: worker, + }) + + if err != nil { + log.Printf("runQ/%s: %s", queueName, err.Error()) + time.Sleep(2 * time.Second) + } + + time.Sleep(1 * time.Second) + } +} + +func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue { + liteq.Setup(db) + queue := liteq.New(db) + // The queue processes as long as this context is not cancelled. + + log.Println("setting up worker queues...") + queues := []struct { + queueName string + worker types.QueueWorker + }{ + {"fetch", engine.Fetch(d64m)}, + } + for _, q := range queues { + go runQ(queue, q.queueName, q.worker) + } + return queue +} + +func setupDomain64Map(db *sql.DB) *domain64.Domain64Map { + d64m, err := domain64.NewDomain64Map() + if err != nil { + log.Printf("newdomain64map err: %s", err.Error()) + os.Exit(1) + } + d64m.Setup(db) + return d64m +} diff --git a/cmd/api/main.go b/cmd/api/main.go index b8cbf32..1544d32 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,107 +1,37 @@ package main import ( - "context" - "database/sql" - "fmt" "log" - "math/rand" - "os" "sync" "time" _ "modernc.org/sqlite" - liteq "git.jadud.com/jadudm/grosbeak/internal/liteq" + "git.jadud.com/jadudm/grosbeak/internal/engine" + base "git.jadud.com/jadudm/grosbeak/internal/types" ) -type queueWorker func(ctx context.Context, job *liteq.Job) error - -func Fetch(ctx context.Context, job *liteq.Job) error { - n := rand.Intn(50) - - time.Sleep(time.Duration(n) * time.Millisecond) - log.Println("Fetching", job.Job) - return nil -} - -func runQ(queue *liteq.JobQueue, queueName string, worker queueWorker) { - for { - err := queue.Consume(context.Background(), liteq.ConsumeParams{ - Queue: queueName, - PoolSize: 3, - VisibilityTimeout: 20, - Worker: worker, - }) - if err != nil { - log.Printf("runQ/%s: %w", queueName, err.Error()) - time.Sleep(2 * time.Second) - } - time.Sleep(1 * time.Second) - } -} - -func Entre(queue *liteq.JobQueue, chUrl <-chan string) { - ctx := context.Background() - for { - url := <-chUrl - n := time.Now() - ignore_tag := fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay()) - log.Println("entre", url, ignore_tag) - // Don't duplicate jobs on the same day of the year. - err := queue.QueueJob(ctx, liteq.QueueJobParams{ - Queue: "fetch", - // This only works for things in the `queued` state - DedupingKey: liteq.IgnoreDuplicate(ignore_tag), - Job: url, - }) - if err != nil { - log.Println("entre err", err.Error()) - } - } -} - -func setupLiteQ() *liteq.JobQueue { - - // FIXME: This path needs to come from the env. - liteqDB, err := sql.Open("sqlite", "liteq.db") - liteqDB.SetMaxOpenConns(1) - - if err != nil { - fmt.Println(err) - os.Exit(1) - } - liteq.Setup(liteqDB) - queue := liteq.New(liteqDB) - // The queue processes as long as this context is not cancelled. - - log.Println("Setting up worker queues...") - queues := []struct { - queueName string - worker queueWorker - }{ - {"fetch", Fetch}, - } - for _, q := range queues { - go runQ(queue, q.queueName, q.worker) - } - return queue -} - func main() { // Don't let `main()` exit wg := &sync.WaitGroup{} wg.Add(1) - queue := setupLiteQ() + db := setupDB() + d64m := setupDomain64Map(db) + queue := setupLiteQ(db, d64m) - // Create the network for the search engine. - chUrl := make(chan string) + // Enqueue URLs + urls := []struct { + url string + uf base.UpdateFrequency + }{ + {"https://jadud.com/", base.UPDATE_DAILY}, + {"https://berea.us/", base.UPDATE_WEEKLY}, + } - go Entre(queue, chUrl) - for range 5 { - chUrl <- "https://jadud.com/" - chUrl <- "https://berea.us/" + for _, u := range urls { + engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf}) + time.Sleep(1 * time.Second) } // Don't exit. diff --git a/go.mod b/go.mod index b2a91dd..379d587 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/alitto/pond v1.9.2 github.com/jpillora/go-tld v1.2.1 github.com/matryer/is v1.4.1 + github.com/tidwall/gjson v1.18.0 modernc.org/sqlite v1.40.1 ) @@ -17,6 +18,8 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/tidwall/match v1.2.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sys v0.38.0 // indirect diff --git a/go.sum b/go.sum index 79846f2..dd1ed08 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,14 @@ github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= +github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY= golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0= golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk= diff --git a/internal/domain64/database.go b/internal/domain64/database.go index 5165df4..8de92b0 100644 --- a/internal/domain64/database.go +++ b/internal/domain64/database.go @@ -3,6 +3,7 @@ package domain64 import ( "context" "database/sql" + _ "embed" "fmt" "log" "slices" @@ -12,21 +13,33 @@ import ( "github.com/jpillora/go-tld" ) +//go:embed sqlc/schema.sql +var ddl string + type Domain64Map struct { mu sync.Mutex m map[string]int DB *sql.DB - Flushed bool + Queries *sqlc.Queries } -func NewDomain64Map(db *sql.DB) (*Domain64Map, error) { +func NewDomain64Map() (*Domain64Map, error) { d64m := &Domain64Map{} - d64m.DB = db - d64m.Flushed = true - // Init the tables if a DB pointer is passed in. + d64m.DB = nil + d64m.Queries = nil return d64m, nil } +func (d64m *Domain64Map) Setup(db *sql.DB) { + log.Printf("creating domain64 tables\n") + if _, err := db.ExecContext(context.Background(), ddl); err != nil { + log.Printf("setup err: %s", err.Error()) + panic(err) + } + d64m.DB = db + d64m.Queries = sqlc.New(db) +} + func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string { s := "" if url.TLD != "" { @@ -140,21 +153,21 @@ func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) { } d64 := &Domain64{} - queries := sqlc.New(d64m.DB) + // These manipulate both the DB and the Domain64 struct - err := _get_or_insert_tld(queries, d64, url) + err := _get_or_insert_tld(d64m.Queries, d64, url) if err != nil { return nil, err } - err = _get_or_insert_domain(queries, d64, url) + err = _get_or_insert_domain(d64m.Queries, d64, url) if err != nil { return nil, err } - err = _get_or_insert_subdomain(queries, d64, url) + err = _get_or_insert_subdomain(d64m.Queries, d64, url) if err != nil { return nil, err } - err = _get_or_insert_path(queries, d64, url) + err = _get_or_insert_path(d64m.Queries, d64, url) if err != nil { return nil, err } diff --git a/internal/domain64/database_test.go b/internal/domain64/database_test.go index 57740be..c61f992 100644 --- a/internal/domain64/database_test.go +++ b/internal/domain64/database_test.go @@ -3,16 +3,12 @@ package domain64 import ( "context" "database/sql" - _ "embed" "testing" "github.com/jpillora/go-tld" _ "modernc.org/sqlite" ) -//go:embed sqlc/schema.sql -var ddl string - func setup() *sql.DB { ctx := context.Background() @@ -32,7 +28,8 @@ func setup() *sql.DB { func TestNewDomain64Map(t *testing.T) { db := setup() - M, err := NewDomain64Map(db) + M, err := NewDomain64Map() + M.Setup(db) if err != nil { // TODO t.Error(err) @@ -43,7 +40,8 @@ func TestNewDomain64Map(t *testing.T) { } func TestURLToRFQDN(t *testing.T) { - M, _ := NewDomain64Map(nil) + M, _ := NewDomain64Map() + // M.Setup(db) simple, _ := tld.Parse("https://jadud.com/") rfqdn := M.URLToRFQDN(simple) if rfqdn != "com.jadud/" { @@ -53,7 +51,8 @@ func TestURLToRFQDN(t *testing.T) { func TestURLToDomain64(t *testing.T) { db := setup() - M, _ := NewDomain64Map(db) + M, _ := NewDomain64Map() + M.Setup(db) simple, _ := tld.Parse("https://jadud.com/") d64, _ := M.URLToDomain64(simple) if d64.TLD != 1 { @@ -67,7 +66,8 @@ func TestURLToDomain64(t *testing.T) { func TestURLToDomain64_02(t *testing.T) { db := setup() - M, _ := NewDomain64Map(db) + M, _ := NewDomain64Map() + M.Setup(db) simple1, _ := tld.Parse("https://jadud.com/") simple2, _ := tld.Parse("https://another.com/") d64_1, _ := M.URLToDomain64(simple1) @@ -93,7 +93,8 @@ func TestURLToDomain64_02(t *testing.T) { func TestURLToDomain64_03(t *testing.T) { db := setup() - M, _ := NewDomain64Map(db) + M, _ := NewDomain64Map() + M.Setup(db) var tests = []struct { url string tld int64 diff --git a/internal/engine/entre.go b/internal/engine/entre.go new file mode 100644 index 0000000..5645611 --- /dev/null +++ b/internal/engine/entre.go @@ -0,0 +1,39 @@ +package engine + +import ( + "context" + "fmt" + "log" + "time" + + "git.jadud.com/jadudm/grosbeak/internal/liteq" + base "git.jadud.com/jadudm/grosbeak/internal/types" +) + +func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error { + n := time.Now() + + var ignore_tag string + + switch ej.UpdateFrequency { + case base.UPDATE_DAILY: + ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay()) + case base.UPDATE_WEEKLY: + ignore_tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, n.Year(), n.YearDay()/7) + case base.UPDATE_MONTHLY: + ignore_tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, n.Year(), n.Month()) + default: + ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay()) + } + + err := queue.QueueJob(context.Background(), liteq.QueueJobParams{ + Queue: "fetch", + // This only works for things in the `queued` state + DedupingKey: liteq.IgnoreDuplicate(ignore_tag), + Job: ej.AsJson(), + }) + if err != nil { + log.Printf("entre err %s: %s\n", ej.URL, err.Error()) + } + return err +} diff --git a/internal/engine/fetch.go b/internal/engine/fetch.go new file mode 100644 index 0000000..c58a668 --- /dev/null +++ b/internal/engine/fetch.go @@ -0,0 +1,34 @@ +package engine + +import ( + "context" + "log" + + "git.jadud.com/jadudm/grosbeak/internal/domain64" + "git.jadud.com/jadudm/grosbeak/internal/liteq" + "git.jadud.com/jadudm/grosbeak/internal/types" + "github.com/jpillora/go-tld" + "github.com/tidwall/gjson" +) + +func Fetch(d64m *domain64.Domain64Map) types.QueueWorker { + _f := func(ctx context.Context, job *liteq.Job) error { + url := gjson.Get(job.Job, "url").String() + + turl, err := tld.Parse(url) + if err != nil { + // If we can't parse it, shut the job down as completed. + return nil + } + + d64, err := d64m.URLToDomain64(turl) + if err != nil { + log.Printf("urltodomain64 err %s: %s\n", url, err.Error()) + } + + log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64()) + return nil + } + + return _f +} diff --git a/internal/liteq/internal/jobs_test.go b/internal/liteq/internal_test/jobs_test.go similarity index 100% rename from internal/liteq/internal/jobs_test.go rename to internal/liteq/internal_test/jobs_test.go diff --git a/internal/types/base.go b/internal/types/base.go new file mode 100644 index 0000000..e10959a --- /dev/null +++ b/internal/types/base.go @@ -0,0 +1,9 @@ +package types + +import ( + "context" + + "git.jadud.com/jadudm/grosbeak/internal/liteq" +) + +type QueueWorker func(ctx context.Context, job *liteq.Job) error diff --git a/internal/types/constants.go b/internal/types/constants.go new file mode 100644 index 0000000..c8bdc82 --- /dev/null +++ b/internal/types/constants.go @@ -0,0 +1,9 @@ +package types + +type UpdateFrequency string + +const ( + UPDATE_DAILY UpdateFrequency = "DAILY" + UPDATE_WEEKLY UpdateFrequency = "WEEKLY" + UPDATE_MONTHLY UpdateFrequency = "MONTHLY" +) diff --git a/internal/types/jobs.go b/internal/types/jobs.go new file mode 100644 index 0000000..ac043ee --- /dev/null +++ b/internal/types/jobs.go @@ -0,0 +1,22 @@ +package types + +import "encoding/json" + +type Job interface { + AsJson() string +} + +// Used by all of the jobs. +func _as_json(j Job) string { + as_json, _ := json.Marshal(j) + return string(as_json) +} + +type EntreJob struct { + URL string `json:"url"` + UpdateFrequency UpdateFrequency `json:"update_frequency"` +} + +func (ej *EntreJob) AsJson() string { + return _as_json(ej) +}