Add admin_api and database logging of runs #9 #10

This commit is contained in:
2023-01-31 20:25:52 -07:00
parent 724757b23c
commit 4bda3c7a3b
11 changed files with 400 additions and 136 deletions
+57 -25
View File
@@ -3,9 +3,11 @@ package poll
import (
"time"
"github.com/google/uuid"
"github.com/op/go-logging"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
@@ -24,17 +26,17 @@ type tag struct {
commitHash string
}
func pollJob(repoName string, jobConfig config.Job, pipelineConf config.PipelineConf) {
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
prevCommits := make(map[string]string)
for {
time.Sleep(time.Duration(jobConfig.PollInterval) * time.Second)
log.Infof("Polling repo %v", repoName)
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
log.Infof("Polling repo %v", pipeline.Name)
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: jobConfig.URL,
URL: pipeline.Url,
})
if err != nil {
log.Errorf("Could not clone repo %v from url %v: %v", repoName, jobConfig.URL, err)
log.Errorf("Could not clone repo %v from url %v: %v", pipeline.Name, pipeline.Url, err)
continue
}
refsToRunFor := []string{}
@@ -42,23 +44,23 @@ func pollJob(repoName string, jobConfig config.Job, pipelineConf config.Pipeline
// get branches
branches, err := repo.Branches()
if err != nil {
log.Errorf("Could not enumerate branches in repo %v: %v", repoName, err)
log.Errorf("Could not enumerate branches in repo %v: %v", pipeline.Name, err)
continue
}
branches.ForEach(func(branch *plumbing.Reference) error {
log.Debugf("Processing branch %v from repo %v", branch.Name().String(), repoName)
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[branch.Name().String()]
if ok {
if branch.Hash().String() != prevRef {
log.Debugf("Queuing job for branch %v in repo %v with hash %v", branch.Name().String(), repoName, branch.Hash().String())
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String())
} else {
log.Debugf("Branch %v in repo %v has hash %v, which matches the previously seen hash of %v", branch.Name().String(), repoName, branch.Hash().String(), prevRef)
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
}
} else {
log.Debugf("Queuing job for newly discovered branch %v in repo %v with hash %v", branch.Name().String(), repoName, branch.Hash().String())
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String())
}
@@ -67,22 +69,22 @@ func pollJob(repoName string, jobConfig config.Job, pipelineConf config.Pipeline
tags, err := repo.Tags()
if err != nil {
log.Errorf("Could not enumerate tags in repo %v: %v", repoName, err)
log.Errorf("Could not enumerate tags in repo %v: %v", pipeline.Name, err)
continue
}
tags.ForEach(func(tag *plumbing.Reference) error {
log.Debugf("Processing tag %v from repo %v", tag.Name().String(), repoName)
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[tag.Name().String()]
if ok {
if tag.Hash().String() != prevRef {
log.Debugf("Queuing job for tag %v in repo %v with hash %v", tag.Name().String(), repoName, tag.Hash().String())
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String())
} else {
log.Debugf("Tag %v in repo %v has hash %v, which matches the previously seen hash of %v", tag.Name().String(), repoName, tag.Hash().String(), prevRef)
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
}
} else {
log.Debugf("Queuing job for newly discovered tag %v in repo %v with hash %v", tag.Name().String(), repoName, tag.Hash().String())
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String())
}
@@ -90,25 +92,55 @@ func pollJob(repoName string, jobConfig config.Job, pipelineConf config.Pipeline
})
for _, ref := range refsToRunFor {
log.Debugf("Dispatching job for ref %v in repo %v", ref, repoName)
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
pe := pipeline_executor.PipelineExecution{
Name: repoName,
Job: jobConfig,
Ref: ref,
run, err := db.CreateRun(pipeline.Id)
if err != nil {
log.Errorf("Could not create run for pipeline with id \"%v\": ", pipeline.Id, err)
continue
}
pipeline_executor.ExecutePipeline(pe, pipelineConf)
pe := pipeline_executor.PipelineExecution{
Pipeline: pipeline,
Ref: ref,
Run: run,
}
go pipeline_executor.ExecutePipeline(pe, db, pipelineConf)
}
}
}
func StartPolling(conf config.Config) {
for jobName, job := range conf.Jobs {
if job.PollInterval == 0 {
func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan chan uuid.UUID) {
pipelines, err := db.GetPipelines()
if err != nil {
log.Errorf("Could not get pipelines from database: %w", err)
return
}
for _, pipeline := range pipelines {
if pipeline.PollInterval == 0 {
continue
} else {
go pollJob(jobName, job, conf.PipelineConf)
go pollJob(pipeline, conf, db)
}
}
for {
jobUUID := <-pollChan
pipeline, err := db.GetPipelineById(jobUUID)
if err != nil {
log.Errorf("Could not get pipeline with id \"%v\" from database: %v", err)
continue
}
// TODO: stop existing polling process for given uuid
go pollJob(pipeline, conf, db)
}
}
func StartPolling(conf config.PipelineConf, db database.Database) chan uuid.UUID {
pollChan := make(chan uuid.UUID)
go launchPollJobs(conf, db, pollChan)
return pollChan
}