// Package pipeline_executor clones a pipeline's repository, builds the
// repository's .cursorius/Dockerfile into an image, runs that image as a
// container and stores the build output and run result in the database.
package pipeline_executor

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/jhoonb/archivex"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/go-git/go-git/v5"
	"github.com/go-git/go-git/v5/plumbing/transport"
	"github.com/go-git/go-git/v5/plumbing/transport/http"
	"github.com/go-git/go-git/v5/plumbing/transport/ssh"
	"github.com/op/go-logging"

	"git.ohea.xyz/cursorius/server/config"
	"git.ohea.xyz/cursorius/server/database"
)

var log = logging.MustGetLogger("cursorius-server")

// PipelineExecution bundles everything needed to execute one run of a
// pipeline.
type PipelineExecution struct {
	// Pipeline is the pipeline definition (id, clone URL, optional clone credential).
	Pipeline database.Pipeline
	// Ref is not read by ExecutePipeline in this file.
	// NOTE(review): presumably the git ref that triggered the run; the clone
	// below always checks out the default branch — confirm whether that is intended.
	Ref string
	// Run is the database record that receives the result, stdout and stderr.
	Run database.Run
}

// ExecutePipeline performs a single pipeline run:
//
//  1. recreates <WorkingDir>/<pipeline id>/<run id> and clones the repository
//     into its "repo" sub-folder,
//  2. tars the clone and builds .cursorius/Dockerfile from it into an image
//     tagged "<pipeline id>-<run id>:latest",
//  3. creates and starts a container from that image with the configured
//     network, source mount, CURSORIUS_* variables and pipeline secrets,
//  4. waits for the container to stop, collects its stdout/stderr and passes
//     the updated run to db.UpdateRunResult.
//
// The function returns nothing; every failure is logged with the pipeline id
// as prefix and aborts the run at that point. On those early returns the run
// record is not updated.
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
	idStr := pe.Pipeline.Id.String()
	jobFolder := filepath.Join(pipelineConf.WorkingDir, idStr, pe.Run.Id.String())
	cloneFolder := filepath.Join(jobFolder, "repo")

	log.Debugf("%v: URL: %v", idStr, pe.Pipeline.Url)
	log.Debugf("%v: Folder: %v", idStr, jobFolder)

	// Start from a clean job folder so leftovers of an earlier attempt cannot
	// leak into this run.
	if err := os.RemoveAll(jobFolder); err != nil {
		log.Errorf("%v: could not delete existing folder %v: %v", idStr, jobFolder, err)
		return
	}
	if err := os.MkdirAll(cloneFolder, 0755); err != nil {
		log.Errorf("%v: could not create working directory: %v", idStr, err)
		return
	}

	log.Infof("%v: cloning source from URL %v", idStr, pe.Pipeline.Url)
	auth, err := cloneAuthMethod(idStr, pe, db)
	if err != nil {
		log.Errorf("%v: %v", idStr, err)
		return
	}
	_, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
		URL:  pe.Pipeline.Url,
		Auth: auth,
	})
	if err != nil {
		log.Errorf("%v: could not clone repo: %v", idStr, err)
		return
	}
	log.Infof("%v: source cloned successfully", idStr)

	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Errorf("%v: could not create docker client: %v", idStr, err)
		return
	}
	// Best effort: nothing useful can be done if closing the client fails.
	defer cli.Close()

	ctx := context.Background()

	log.Debugf("%v: tarring up job source", idStr)
	tarFile := filepath.Join(jobFolder, "archive.tar")
	if err := writeSourceArchive(idStr, cloneFolder, tarFile); err != nil {
		log.Errorf("%v: %v", idStr, err)
		return
	}
	log.Debugf("%v: job source tarred", idStr)

	log.Infof("%v: building container", idStr)
	dockerBuildContext, err := os.Open(tarFile)
	if err != nil {
		log.Errorf("%v: could not open tarfile: %v", idStr, err)
		return
	}
	defer dockerBuildContext.Close()

	imageName := fmt.Sprintf("%v-%v:latest", idStr, pe.Run.Id.String())
	buildResponse, err := cli.ImageBuild(ctx, dockerBuildContext, types.ImageBuildOptions{
		Tags:       []string{imageName},
		Dockerfile: ".cursorius/Dockerfile",
	})
	if err != nil {
		log.Errorf("%v: could not build container: %v", idStr, err)
		return
	}

	log.Debugf("%v: reading build output from docker daemon", idStr)
	err = db.UpdateRunBuildOutput(pe.Run.Id, cleanupBuildOutput(buildResponse.Body))
	if err != nil {
		// Do not leak the response body on the error path; the update error
		// is the one worth reporting.
		_ = buildResponse.Body.Close()
		log.Errorf("%v: could not update build output for run: %v", idStr, err)
		return
	}
	log.Debugf("%v: build output read from docker daemon", idStr)

	if err := buildResponse.Body.Close(); err != nil {
		log.Errorf("%v: could not close build response body: %v", idStr, err)
		return
	}
	log.Debugf("%v: build response closed", idStr)
	log.Infof("%v: image built sucessfully", idStr)

	hostConfig := runHostConfig(pe, pipelineConf)

	env, err := runEnvironment(pe, db, pipelineConf)
	if err != nil {
		log.Errorf("%v: %v", idStr, err)
		return
	}

	log.Debugf("%v: creating container", idStr)
	resp, err := cli.ContainerCreate(ctx,
		&container.Config{
			Image: imageName,
			Tty:   false,
			Env:   env,
		},
		// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
		&hostConfig,
		nil,
		nil,
		"",
	)
	if err != nil {
		log.Errorf("%v: could not create container: %v", idStr, err)
		return
	}

	log.Infof("%v: starting container", idStr)
	if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
		log.Errorf("%v: could not start container: %v", idStr, err)
		return
	}
	log.Debugf("%v: container started", idStr)

	log.Debugf("%v: waiting on container", idStr)
	statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			log.Errorf("%v: container returned error: %v", idStr, err)
			return
		}
	case okBody := <-statusCh:
		if okBody.Error != nil {
			// The failure reason is carried in the wait response itself.
			log.Errorf("%v: could not wait on container: %v", idStr, okBody.Error.Message)
			return
		}
		log.Debugf("%v: container finished running with return code: %v", idStr, okBody.StatusCode)
		exitCode := okBody.StatusCode
		pe.Run.Result = &exitCode
	}

	pe.Run.InProgress = false

	log.Debugf("%v: getting container logs", idStr)
	out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
	if err != nil {
		log.Errorf("%v: could not get container logs: %v", idStr, err)
		return
	}
	defer out.Close()
	log.Debugf("%v: gotcontainer logs", idStr)

	// The container runs without a TTY, so stdout and stderr arrive as one
	// multiplexed stream that StdCopy splits into the two buffers.
	var stdOut, stdErr bytes.Buffer
	if _, err := stdcopy.StdCopy(&stdOut, &stdErr, out); err != nil {
		// Keep whatever was read so the run result is still recorded.
		log.Errorf("%v: could not read container logs: %v", idStr, err)
	}

	pe.Run.Stdout = stdOut.Bytes()
	pe.Run.Stderr = stdErr.Bytes()

	// NOTE(review): if UpdateRunResult returns an error it is dropped here;
	// its signature is not visible from this file — confirm and handle it.
	db.UpdateRunResult(pe.Run)
}

// cloneAuthMethod returns the go-git auth method for the pipeline's clone
// credential. It returns (nil, nil) when the pipeline has no credential, and
// an error when the credential cannot be loaded, cannot be parsed, or has a
// type other than "USER_PASS" or "SSH_KEY".
func cloneAuthMethod(idStr string, pe PipelineExecution, db database.Database) (transport.AuthMethod, error) {
	if pe.Pipeline.CloneCredential == nil {
		return nil, nil
	}

	credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
	if err != nil {
		return nil, fmt.Errorf("could not get credenital from db: %w", err)
	}

	switch credential.Type {
	case "USER_PASS":
		log.Debugf("%v: credential %v configured", idStr, credential.Name)
		return &http.BasicAuth{
			Username: credential.Username,
			Password: credential.Secret,
		}, nil
	case "SSH_KEY":
		// The secret holds the private key; an empty passphrase is used.
		publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
		if err != nil {
			return nil, fmt.Errorf("could not parse credential %v: %w", credential.Name, err)
		}
		return publicKeys, nil
	default:
		return nil, fmt.Errorf("unsupported credential type %v", credential.Type)
	}
}

// writeSourceArchive tars the contents of cloneFolder (without the folder
// itself) into tarFile, logging each step with idStr as prefix.
func writeSourceArchive(idStr, cloneFolder, tarFile string) error {
	log.Debugf("%v: creating tarfile", idStr)
	tar := new(archivex.TarFile)
	if err := tar.Create(tarFile); err != nil {
		return fmt.Errorf("could not create tarfile: %w", err)
	}

	log.Debugf("%v: adding files to tarfile", idStr)
	if err := tar.AddAll(cloneFolder, false); err != nil {
		// Best effort: release the archive; the AddAll error is the one
		// worth reporting.
		_ = tar.Close()
		return fmt.Errorf("could not add repo to tarfile: %w", err)
	}

	log.Debugf("%v: saving tarfile tarfile", idStr)
	if err := tar.Close(); err != nil {
		return fmt.Errorf("could not close tarfile: %w", err)
	}
	return nil
}

// runHostConfig builds the container host configuration: the optional docker
// network plus the source mount at /cursorius/src. A bind mount points at the
// per-run sub-folder of the configured source; a volume mount uses the
// configured source as-is. Any other mount type adds no mount.
func runHostConfig(pe PipelineExecution, pipelineConf config.PipelineConf) container.HostConfig {
	hostConfig := container.HostConfig{}

	if pipelineConf.DockerNetwork != nil {
		hostConfig.NetworkMode = container.NetworkMode(*pipelineConf.DockerNetwork)
	}

	switch pipelineConf.MountConf.Type {
	case config.Bind:
		sourceDir := filepath.Join(pipelineConf.MountConf.Source, pe.Pipeline.Id.String(), pe.Run.Id.String())
		hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
			Type:        mount.TypeBind,
			Source:      sourceDir,
			Target:      "/cursorius/src",
			ReadOnly:    false,
			Consistency: mount.ConsistencyDefault,
		})
	case config.Volume:
		hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
			Type:        mount.TypeVolume,
			Source:      pipelineConf.MountConf.Source,
			Target:      "/cursorius/src",
			ReadOnly:    false,
			Consistency: mount.ConsistencyDefault,
		})
	}

	return hostConfig
}

// runEnvironment returns the container environment: the CURSORIUS_* variables
// followed by one NAME=value entry per pipeline secret, with the secret name
// upper-cased.
func runEnvironment(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) ([]string, error) {
	// set cursorius environment variables
	env := []string{
		fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
		"CURSORIUS_SRC_DIR=/cursorius/src",
		fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
	}

	// load secrets into environment
	secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
	if err != nil {
		return nil, fmt.Errorf("could not get secrets for pipeline: %w", err)
	}
	for _, secret := range secrets {
		// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
		env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
	}

	return env, nil
}