Persist repo ref hashes in db
This commit is contained in:
@@ -159,6 +159,16 @@ CREATE TABLE runners (
|
|||||||
name TEXT,
|
name TEXT,
|
||||||
secret TEXT
|
secret TEXT
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE pipeline_refs (
|
||||||
|
name TEXT PRIMARY KEY NOT NULL,
|
||||||
|
pipeline_id UUID NOT NULL,
|
||||||
|
hash TEXT NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT fk_pipeline_id
|
||||||
|
FOREIGN KEY(pipeline_id)
|
||||||
|
REFERENCES pipelines(id)
|
||||||
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
_, err := conn.Exec(context.Background(), createTablesQuery)
|
_, err := conn.Exec(context.Background(), createTablesQuery)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (db *Database) GetPipelines() ([]Pipeline, error) {
|
func (db *Database) GetPipelines() ([]Pipeline, error) {
|
||||||
@@ -344,3 +345,53 @@ WHERE pipeline=$1;`
|
|||||||
|
|
||||||
return runs, nil
|
return runs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) GetPipelineRefs(pipelineId uuid.UUID) (map[string]string, error) {
|
||||||
|
query := `
|
||||||
|
SELECT name, hash
|
||||||
|
FROM pipeline_refs
|
||||||
|
WHERE pipeline_id=$1;`
|
||||||
|
|
||||||
|
refsMap := make(map[string]string)
|
||||||
|
|
||||||
|
refs, err := db.Conn.Query(context.Background(), query, pipelineId)
|
||||||
|
if err != nil {
|
||||||
|
return refsMap, fmt.Errorf("Could not get pipeline refs for pipeline with id \"%v\": %w", pipelineId, err)
|
||||||
|
}
|
||||||
|
defer refs.Close()
|
||||||
|
|
||||||
|
for refs.Next() {
|
||||||
|
var name string
|
||||||
|
var hash string
|
||||||
|
if err := refs.Scan(
|
||||||
|
&name,
|
||||||
|
&hash,
|
||||||
|
); err != nil {
|
||||||
|
return refsMap, err
|
||||||
|
}
|
||||||
|
|
||||||
|
refsMap[name] = hash
|
||||||
|
}
|
||||||
|
|
||||||
|
return refsMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error {
|
||||||
|
|
||||||
|
refsSlice := make([][]interface{}, 0)
|
||||||
|
for name, ref := range refsMap {
|
||||||
|
refsSlice = append(refsSlice, []interface{}{name, pipelineId, ref})
|
||||||
|
}
|
||||||
|
|
||||||
|
copyCount, err := db.Conn.CopyFrom(
|
||||||
|
context.Background(),
|
||||||
|
pgx.Identifier{"pipeline_refs"},
|
||||||
|
[]string{"name", "pipeline_id", "hash"},
|
||||||
|
pgx.CopyFromRows(refsSlice),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not insert updated pipeline refs: %w", err)
|
||||||
|
}
|
||||||
|
log.Debugf("copyCount: %v", copyCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
+18
-7
@@ -27,8 +27,13 @@ type tag struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
|
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
|
||||||
prevCommits := make(map[string]string)
|
|
||||||
for {
|
for {
|
||||||
|
prevRefs, err := db.GetPipelineRefs(pipeline.Id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Could not get pipeline refs from db: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
|
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
|
||||||
log.Infof("Polling repo %v", pipeline.Name)
|
log.Infof("Polling repo %v", pipeline.Name)
|
||||||
|
|
||||||
@@ -50,18 +55,18 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
|
|||||||
|
|
||||||
branches.ForEach(func(branch *plumbing.Reference) error {
|
branches.ForEach(func(branch *plumbing.Reference) error {
|
||||||
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
|
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
|
||||||
prevRef, ok := prevCommits[branch.Name().String()]
|
prevRef, ok := prevRefs[branch.Name().String()]
|
||||||
if ok {
|
if ok {
|
||||||
if branch.Hash().String() != prevRef {
|
if branch.Hash().String() != prevRef {
|
||||||
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
|
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
|
||||||
prevCommits[branch.Name().String()] = branch.Hash().String()
|
prevRefs[branch.Name().String()] = branch.Hash().String()
|
||||||
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
|
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
|
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
|
||||||
prevCommits[branch.Name().String()] = branch.Hash().String()
|
prevRefs[branch.Name().String()] = branch.Hash().String()
|
||||||
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -74,23 +79,29 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
|
|||||||
}
|
}
|
||||||
tags.ForEach(func(tag *plumbing.Reference) error {
|
tags.ForEach(func(tag *plumbing.Reference) error {
|
||||||
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
|
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
|
||||||
prevRef, ok := prevCommits[tag.Name().String()]
|
prevRef, ok := prevRefs[tag.Name().String()]
|
||||||
if ok {
|
if ok {
|
||||||
if tag.Hash().String() != prevRef {
|
if tag.Hash().String() != prevRef {
|
||||||
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
|
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
|
||||||
prevCommits[tag.Name().String()] = tag.Hash().String()
|
prevRefs[tag.Name().String()] = tag.Hash().String()
|
||||||
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
|
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
|
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
|
||||||
prevCommits[tag.Name().String()] = tag.Hash().String()
|
prevRefs[tag.Name().String()] = tag.Hash().String()
|
||||||
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
err = db.UpdatePipelineRefs(pipeline.Id, prevRefs)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Could not update pipeline refs: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for _, ref := range refsToRunFor {
|
for _, ref := range refsToRunFor {
|
||||||
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
|
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user