From 6d2936393ba7d3b5190062c35833c69ec1ab0c20 Mon Sep 17 00:00:00 2001 From: restitux Date: Wed, 8 Feb 2023 18:44:54 -0700 Subject: [PATCH] Persist repo ref hashes in db --- database/db.go | 10 ++++++++++ database/func.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ poll/poll.go | 25 +++++++++++++++++------- 3 files changed, 79 insertions(+), 7 deletions(-) diff --git a/database/db.go b/database/db.go index 8838977..6eb0b7b 100644 --- a/database/db.go +++ b/database/db.go @@ -159,6 +159,16 @@ CREATE TABLE runners ( name TEXT, secret TEXT ); + +CREATE TABLE pipeline_refs ( + name TEXT PRIMARY KEY NOT NULL, + pipeline_id UUID NOT NULL, + hash TEXT NOT NULL, + + CONSTRAINT fk_pipeline_id + FOREIGN KEY(pipeline_id) + REFERENCES pipelines(id) +); ` _, err := conn.Exec(context.Background(), createTablesQuery) diff --git a/database/func.go b/database/func.go index 97cdbd1..fce2489 100644 --- a/database/func.go +++ b/database/func.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/google/uuid" + "github.com/jackc/pgx/v5" ) func (db *Database) GetPipelines() ([]Pipeline, error) { @@ -344,3 +345,53 @@ WHERE pipeline=$1;` return runs, nil } + +func (db *Database) GetPipelineRefs(pipelineId uuid.UUID) (map[string]string, error) { + query := ` +SELECT name, hash +FROM pipeline_refs +WHERE pipeline_id=$1;` + + refsMap := make(map[string]string) + + refs, err := db.Conn.Query(context.Background(), query, pipelineId) + if err != nil { + return refsMap, fmt.Errorf("Could not get pipeline refs for pipeline with id \"%v\": %w", pipelineId, err) + } + defer refs.Close() + + for refs.Next() { + var name string + var hash string + if err := refs.Scan( + &name, + &hash, + ); err != nil { + return refsMap, err + } + + refsMap[name] = hash + } + + return refsMap, nil +} + +func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error { + + refsSlice := make([][]interface{}, 0) + for name, ref := range refsMap { + refsSlice = append(refsSlice, []interface{}{name, pipelineId, ref}) + } + + copyCount, err := db.Conn.CopyFrom( + context.Background(), + pgx.Identifier{"pipeline_refs"}, + []string{"name", "pipeline_id", "hash"}, + pgx.CopyFromRows(refsSlice), + ) + if err != nil { + return fmt.Errorf("could not insert updated pipeline refs: %w", err) + } + log.Debugf("copyCount: %v", copyCount) + return nil +} diff --git a/poll/poll.go b/poll/poll.go index 367d5b7..5b75a2c 100644 --- a/poll/poll.go +++ b/poll/poll.go @@ -27,8 +27,13 @@ type tag struct { } func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { - prevCommits := make(map[string]string) for { + prevRefs, err := db.GetPipelineRefs(pipeline.Id) + if err != nil { + log.Errorf("Could not get pipeline refs from db: %v", err) + return + } + time.Sleep(time.Duration(pipeline.PollInterval) * time.Second) log.Infof("Polling repo %v", pipeline.Name) @@ -50,18 +55,18 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da branches.ForEach(func(branch *plumbing.Reference) error { log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id) - prevRef, ok := prevCommits[branch.Name().String()] + prevRef, ok := prevRefs[branch.Name().String()] if ok { if branch.Hash().String() != prevRef { log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) - prevCommits[branch.Name().String()] = branch.Hash().String() + prevRefs[branch.Name().String()] = branch.Hash().String() refsToRunFor = append(refsToRunFor, branch.Name().String()) } else { log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef) } } else { log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) - prevCommits[branch.Name().String()] = branch.Hash().String() + prevRefs[branch.Name().String()] = branch.Hash().String() refsToRunFor = append(refsToRunFor, branch.Name().String()) } return nil @@ -74,23 +79,29 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da } tags.ForEach(func(tag *plumbing.Reference) error { log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id) - prevRef, ok := prevCommits[tag.Name().String()] + prevRef, ok := prevRefs[tag.Name().String()] if ok { if tag.Hash().String() != prevRef { log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) - prevCommits[tag.Name().String()] = tag.Hash().String() + prevRefs[tag.Name().String()] = tag.Hash().String() refsToRunFor = append(refsToRunFor, tag.Name().String()) } else { log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef) } } else { log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) - prevCommits[tag.Name().String()] = tag.Hash().String() + prevRefs[tag.Name().String()] = tag.Hash().String() refsToRunFor = append(refsToRunFor, tag.Name().String()) } return nil }) + err = db.UpdatePipelineRefs(pipeline.Id, prevRefs) + if err != nil { + log.Errorf("Could not update pipeline refs: %v", err) + return + } + for _, ref := range refsToRunFor { log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)