Files
server/pipeline_executor/pipeline_executor.go
T

289 lines
7.6 KiB
Go

package pipeline_executor
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/jhoonb/archivex"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
"github.com/op/go-logging"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
)
var log = logging.MustGetLogger("cursorius-server")
type PipelineExecution struct {
Pipeline database.Pipeline
Ref string
Run database.Run
}
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
idStr := pe.Pipeline.Id.String()
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
cloneFolder := filepath.Join(jobFolder, "repo")
log.Debugf("%v: URL: %v", idStr, pe.Pipeline.Url)
log.Debugf("%v: Folder: %v", idStr, jobFolder)
err := os.RemoveAll(jobFolder)
if err != nil {
log.Errorf("%v: could not delete existing folder %v", idStr, jobFolder)
return
}
err = os.MkdirAll(cloneFolder, 0755)
if err != nil {
log.Errorf("%v: could not create working directory: %w", idStr, pe.Pipeline.Name, err)
return
}
log.Infof("%v: cloning source from URL %v", idStr, pe.Pipeline.Url)
var auth transport.AuthMethod
if pe.Pipeline.CloneCredential != nil {
credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
if err != nil {
log.Errorf("%v: could not get credenital from db: %v", idStr, err)
return
}
switch credential.Type {
case "USER_PASS":
log.Debugf("%v: credential %v configured", idStr, credential.Name)
auth = transport.AuthMethod(&http.BasicAuth{
Username: credential.Username,
Password: credential.Secret,
})
case "SSH_KEY":
publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
if err != nil {
log.Errorf("%v: could not parse credential %v: %v", idStr, credential.Name, err)
return
}
auth = transport.AuthMethod(publicKeys)
default:
log.Errorf("%v: unsupported credential type %v", idStr, credential.Type)
return
}
} else {
auth = nil
}
_, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
URL: pe.Pipeline.Url,
Auth: auth,
})
if err != nil {
log.Errorf("%v: could not clone repo: %v", idStr, err)
return
}
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Errorf("%v: could not create docker client: %w", idStr, err)
return
}
log.Infof("%v: source cloned successfully", idStr)
ctx := context.Background()
log.Debugf("%v: tarring up job source", idStr)
log.Debugf("%v: creating tarfile", idStr)
tarFile := filepath.Join(jobFolder, "archive.tar")
tar := new(archivex.TarFile)
err = tar.Create(tarFile)
if err != nil {
log.Errorf("%v: could not create tarfile: %w", idStr, err)
return
}
log.Debugf("%v: adding files to tarfile", idStr)
err = tar.AddAll(cloneFolder, false)
if err != nil {
log.Errorf("%v: could not add repo to tarfile: %w", idStr, err)
return
}
log.Debugf("%v: saving tarfile tarfile", idStr)
err = tar.Close()
if err != nil {
log.Errorf("%v: could not close tarfile: %w", idStr, err)
return
}
log.Debugf("%v: job source tarred", idStr)
log.Infof("%v: building container", idStr)
dockerBuildContext, err := os.Open(tarFile)
defer dockerBuildContext.Close()
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: ".cursorius/Dockerfile",
})
if err != nil {
log.Errorf("%v: could not build container: %w", idStr, err)
return
}
log.Debugf("%v: reading build output from docker daemon", idStr)
err = db.UpdateRunBuildOutput(pe.Run.Id, cleanupBuildOutput(buildResponse.Body))
if err != nil {
log.Errorf("%v: could not update build output for run: %w", idStr, err)
return
}
log.Debugf("%v: build output read from docker daemon", idStr)
err = buildResponse.Body.Close()
if err != nil {
log.Errorf("%v: could not close build response body: %w", idStr, err)
return
}
log.Debugf("%v: build response closed", idStr)
log.Infof("%v: image built sucessfully", idStr)
hostConfig := container.HostConfig{}
if pipelineConf.DockerNetwork != nil {
hostConfig.NetworkMode = container.NetworkMode(*pipelineConf.DockerNetwork)
}
if pipelineConf.MountConf.Type == config.Bind {
sourceDir := filepath.Join(pipelineConf.MountConf.Source, pe.Pipeline.Id.String(), pe.Run.Id.String())
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeBind,
Source: sourceDir,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
} else if pipelineConf.MountConf.Type == config.Volume {
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeVolume,
Source: pipelineConf.MountConf.Source,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
}
env := make([]string, 0)
// set cursorius environment variables
env = append(env, []string{
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...)
// load secrets into environment
secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
if err != nil {
log.Errorf("%v: could not get secrets for pipeline", idStr, err)
return
}
for _, secret := range secrets {
// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
}
log.Debugf("%v: creating container", idStr)
resp, err := cli.ContainerCreate(ctx,
&container.Config{
Image: imageName,
Tty: false,
Env: env,
},
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig,
nil, nil, "",
)
if err != nil {
log.Errorf("%v: could not create container: %w", idStr, err)
return
}
log.Info("%v: starting container", idStr)
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
log.Errorf("%v: could not start container: %v", idStr, err)
return
}
log.Debugf("%v: container started", idStr)
log.Debugf("%v: waiting on container", idStr)
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
log.Errorf("%v: container returned error: %v", idStr, err)
return
}
case okBody := <-statusCh:
if okBody.Error != nil {
log.Errorf("%v: could not wait on container: %v", idStr, err)
return
} else {
log.Debugf("%v: container finished running with return code: %v", idStr, okBody.StatusCode)
pe.Run.Result = &okBody.StatusCode
}
}
pe.Run.InProgress = false
log.Debugf("%v: getting container logs", idStr)
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil {
log.Errorf("%v: could not get container logs: %w", idStr, err)
return
}
log.Debugf("%v: gotcontainer logs", idStr)
var stdOut bytes.Buffer
var stdErr bytes.Buffer
stdcopy.StdCopy(&stdOut, &stdErr, out)
pe.Run.Stdout = stdOut.Bytes()
pe.Run.Stderr = stdErr.Bytes()
db.UpdateRunResult(pe.Run)
return
}