3cbe670bc1
This includes refactoring the jobscheduler into the runnermanager. This service manages runner connections and allocates runners to pipelines. These requests are made via the pipeline gRPC API.
115 lines
3.6 KiB
Go
115 lines
3.6 KiB
Go
package poll
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/op/go-logging"
|
|
|
|
"git.ohea.xyz/cursorius/server/config"
|
|
"git.ohea.xyz/cursorius/server/pipeline_executor"
|
|
"github.com/go-git/go-git/v5"
|
|
"github.com/go-git/go-git/v5/plumbing"
|
|
"github.com/go-git/go-git/v5/storage/memory"
|
|
)
|
|
|
|
// log is the package-level logger, obtained from go-logging under the
// name "cursorius-server".
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
// branch pairs a branch reference name with a commit hash.
//
// NOTE(review): nothing in the visible part of this file constructs or reads
// this type; confirm whether it is still needed.
type branch struct {
	ref, commitHash string
}
|
|
|
|
// tag pairs a tag reference name with a commit hash.
//
// NOTE(review): nothing in the visible part of this file constructs or reads
// this type; confirm whether it is still needed.
type tag struct {
	ref, commitHash string
}
|
|
|
|
func pollJob(repoName string, jobConfig config.Job, pipelineConf config.PipelineConf) {
|
|
prevCommits := make(map[string]string)
|
|
for {
|
|
time.Sleep(time.Duration(jobConfig.PollInterval) * time.Second)
|
|
log.Infof("Polling repo %v", repoName)
|
|
|
|
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
|
|
URL: jobConfig.URL,
|
|
})
|
|
if err != nil {
|
|
log.Errorf("Could not clone repo %v from url %v: %v", repoName, jobConfig.URL, err)
|
|
continue
|
|
}
|
|
refsToRunFor := []string{}
|
|
|
|
// get branches
|
|
branches, err := repo.Branches()
|
|
if err != nil {
|
|
log.Errorf("Could not enumerate branches in repo %v: %v", repoName, err)
|
|
continue
|
|
}
|
|
|
|
branches.ForEach(func(branch *plumbing.Reference) error {
|
|
log.Debugf("Processing branch %v from repo %v", branch.Name().String(), repoName)
|
|
prevRef, ok := prevCommits[branch.Name().String()]
|
|
if ok {
|
|
if branch.Hash().String() != prevRef {
|
|
log.Debugf("Queuing job for branch %v in repo %v with hash %v", branch.Name().String(), repoName, branch.Hash().String())
|
|
prevCommits[branch.Name().String()] = branch.Hash().String()
|
|
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
|
} else {
|
|
log.Debugf("Branch %v in repo %v has hash %v, which matches the previously seen hash of %v", branch.Name().String(), repoName, branch.Hash().String(), prevRef)
|
|
}
|
|
} else {
|
|
log.Debugf("Queuing job for newly discovered branch %v in repo %v with hash %v", branch.Name().String(), repoName, branch.Hash().String())
|
|
prevCommits[branch.Name().String()] = branch.Hash().String()
|
|
refsToRunFor = append(refsToRunFor, branch.Name().String())
|
|
}
|
|
return nil
|
|
})
|
|
|
|
tags, err := repo.Tags()
|
|
if err != nil {
|
|
log.Errorf("Could not enumerate tags in repo %v: %v", repoName, err)
|
|
continue
|
|
}
|
|
tags.ForEach(func(tag *plumbing.Reference) error {
|
|
log.Debugf("Processing tag %v from repo %v", tag.Name().String(), repoName)
|
|
prevRef, ok := prevCommits[tag.Name().String()]
|
|
if ok {
|
|
if tag.Hash().String() != prevRef {
|
|
log.Debugf("Queuing job for tag %v in repo %v with hash %v", tag.Name().String(), repoName, tag.Hash().String())
|
|
prevCommits[tag.Name().String()] = tag.Hash().String()
|
|
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
|
} else {
|
|
log.Debugf("Tag %v in repo %v has hash %v, which matches the previously seen hash of %v", tag.Name().String(), repoName, tag.Hash().String(), prevRef)
|
|
}
|
|
} else {
|
|
log.Debugf("Queuing job for newly discovered tag %v in repo %v with hash %v", tag.Name().String(), repoName, tag.Hash().String())
|
|
prevCommits[tag.Name().String()] = tag.Hash().String()
|
|
refsToRunFor = append(refsToRunFor, tag.Name().String())
|
|
}
|
|
return nil
|
|
})
|
|
|
|
for _, ref := range refsToRunFor {
|
|
log.Debugf("Dispatching job for ref %v in repo %v", ref, repoName)
|
|
|
|
pe := pipeline_executor.PipelineExecution{
|
|
Name: repoName,
|
|
Job: jobConfig,
|
|
Ref: ref,
|
|
}
|
|
|
|
pipeline_executor.ExecutePipeline(pe, pipelineConf)
|
|
}
|
|
}
|
|
}
|
|
|
|
func StartPolling(conf config.Config) {
|
|
for jobName, job := range conf.Jobs {
|
|
if job.PollInterval == 0 {
|
|
continue
|
|
} else {
|
|
go pollJob(jobName, job, conf.PipelineConf)
|
|
}
|
|
}
|
|
}
|