diff --git a/.gitignore b/.gitignore index 95b5004..b35a26e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.yml run job +runner diff --git a/go.mod b/go.mod index b4e21bb..dfaa094 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module git.ohea.xyz/cursorius/runner go 1.19 require ( + git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2 git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173 github.com/docker/docker v20.10.18+incompatible github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 + google.golang.org/protobuf v1.28.1 nhooyr.io/websocket v1.8.7 ) diff --git a/go.sum b/go.sum index 21ea76c..591904e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2 h1:G1XQEqhj1LZPQbH7avzvT7QL9Wfbb4CXMm0nLL39eDc= +git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2/go.mod h1:F9y5Ck4Wchsaj5amSX2eDRUlQ/iYP1VNLFduvjNwmLc= git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173 h1:dhq/W6sa5KkLHVBwwgcNIPWcO4YK2/ecFTTln2W+1n8= git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173/go.mod h1:86PbXJ2WdqQ+3hYqrnv3ukgKNRK9nQfThnlY03FAO0g= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= @@ -39,8 +41,9 @@ github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= @@ -142,6 +145,9 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= diff --git a/main.go b/main.go index ed9333b..bbb380f 100644 --- a/main.go +++ b/main.go @@ -3,13 +3,19 @@ package main import ( "context" "fmt" - "git.ohea.xyz/cursorius/runner/config" - "git.ohea.xyz/cursorius/runner/jobrunner" - "github.com/op/go-logging" - "nhooyr.io/websocket" - "nhooyr.io/websocket/wsjson" + "io" "os" + "os/exec" "time" + + runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" + "git.ohea.xyz/cursorius/runner/config" + + //"git.ohea.xyz/cursorius/runner/jobrunner" + "github.com/op/go-logging" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "nhooyr.io/websocket" ) var log = logging.MustGetLogger("cursorius-server") @@ -45,16 +51,9 @@ func main() { os.Exit(0) } - registrationData := Register{ - Secret: configData.Config.Secret, - Id: configData.Config.Id, - Tags: configData.Config.Tags, - } - connectString := fmt.Sprintf("ws://%v/runner", configData.Config.ServerUrl) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() conn, _, err := websocket.Dial(ctx, connectString, nil) if err != nil { @@ -63,24 +62,129 @@ func main() { } defer conn.Close(websocket.StatusInternalError, "the sky is falling") - err = wsjson.Write(ctx, conn, registrationData) + cancel() + + ctx = context.Background() + + registrationProto := &runner_api.Register{} + registrationProto.Secret = configData.Config.Secret + registrationProto.Id = configData.Config.Id + registrationProto.Tags = configData.Config.Tags + + err = sendProtoStruct(conn, registrationProto) if err != nil { - log.Fatalf("Could not send registration information to server: %v", err) + log.Fatalf("Could not send registration proto: %v", err) os.Exit(1) } for { - var job jobrunner.Job - err = wsjson.Read(ctx, conn, &job) + typ, r, err := conn.Read(ctx) if err != nil { - log.Fatalf("Could not receive from connection: %v", err) - continue + log.Errorf("Could not read from runner websocket connection: %v", err) + log.Errorf("Disconnecting...") + return + } + if typ != websocket.MessageBinary { + log.Error("Got non binary message from runner, disconnecting...") + conn.Close(websocket.StatusUnsupportedData, "Requires binary data") + return + } + serverToRunnerMsg := &runner_api.ServerToRunnerMsg{} + if err := proto.Unmarshal(r, serverToRunnerMsg); err != nil { + log.Error("Could not parse registration message from runner, disconnection....") + conn.Close(websocket.StatusUnsupportedData, "Invalid message") + return } - log.Infof("Received job from server: id: %v, URL: %v", job.Id, job.URL) - err = jobrunner.RunJob(job, configData.Config.WorkingDir) - if err != nil { - log.Errorf("Could not run job: %v", err) + switch x := serverToRunnerMsg.Msg.(type) { + case *runner_api.ServerToRunnerMsg_RunCommandMsg: + log.Debugf("Server asked to run the command %v", x.RunCommandMsg.Command) + returnCode, stdout, stderr, err := RunCommand(x.RunCommandMsg.Command, x.RunCommandMsg.Args) + if err != nil { + log.Errorf("Could not run command: %v", err) + os.Exit(1) + } + + runnerToServerMsg := &runner_api.RunnerToServerMsg{ + Msg: &runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg{ + RunCommandFinalResponseMsg: &runner_api.RunCommandFinalResponse{ + ReturnCode: returnCode, + PartialResponse: &runner_api.RunCommandPartialResponse{ + Stdout: stdout, + Stderr: stderr, + }, + }, + }, + } + + err = sendProtoStruct(conn, runnerToServerMsg) + if err != nil { + log.Errorf("Could not send results to server: %v", err) + os.Exit(1) + } + } } } + +func sendProtoStruct(conn *websocket.Conn, p protoreflect.ProtoMessage) error { + protoOut, err := proto.Marshal(p) + if err != nil { + return fmt.Errorf("Could not marshal proto: %w", err) + } + + ctx := context.Background() + + if err := conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil { + return fmt.Errorf("Could not send proto to websocket: %w", err) + } + + return nil +} + +func RunCommand(command string, args []string) (returnCode int64, stdout string, stderr string, err error) { + cmd := exec.Command(command, args...) + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + err = fmt.Errorf("Could not get stdout pipe: %w", err) + return + } + + stderrPipe, err := cmd.StderrPipe() + if err != nil { + err = fmt.Errorf("Could not get stderr pipe: %w", err) + return + } + + log.Infof("Running command: \"%v\"", command) + if err = cmd.Start(); err != nil { + err = fmt.Errorf("Could not start command: %w", err) + return + } + + stdoutBytes, err := io.ReadAll(stdoutPipe) + if err != nil { + err = fmt.Errorf("Could not read stdout: %w", err) + return + } + stdout = string(stdoutBytes) + + stderrBytes, err := io.ReadAll(stderrPipe) + if err != nil { + err = fmt.Errorf("Could not read stderr: %w", err) + return + } + stderr = string(stderrBytes) + + if err = cmd.Wait(); err != nil { + switch x := err.(type) { + case *exec.ExitError: + returnCode = int64(x.ExitCode()) + return + default: + err = fmt.Errorf("Could not wait for command: %w", err) + return + } + } + return +}