112 lines
2.7 KiB
Go
112 lines
2.7 KiB
Go
package runnermanager
|
|
|
|
import (
	"context"
	"fmt"
	"strings"

	"github.com/google/uuid"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
	"nhooyr.io/websocket"

	runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)
|
|
|
|
type RunnerData struct {
|
|
msgType websocket.MessageType
|
|
data []byte
|
|
}
|
|
|
|
type Runner struct {
|
|
id uuid.UUID
|
|
tags []string
|
|
conn *websocket.Conn
|
|
receiveChan chan []byte
|
|
}
|
|
|
|
func (r *Runner) HasTags(requestedTags []string) bool {
|
|
tagIter:
|
|
for _, requestedTag := range requestedTags {
|
|
for _, posessedTag := range r.tags {
|
|
// if we find the tag, move on to search for the next one
|
|
if posessedTag == requestedTag {
|
|
continue tagIter
|
|
}
|
|
}
|
|
// if we don't find the tag
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (r *Runner) Id() uuid.UUID {
|
|
return r.id
|
|
}
|
|
|
|
func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) {
|
|
|
|
if r.conn == nil {
|
|
return 0, "", "", fmt.Errorf("runner with id %v has nil conn, THIS IS A BUG", r.id)
|
|
}
|
|
|
|
// Write RunCommand message to client
|
|
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{
|
|
Msg: &runner_api.ServerToRunnerMsg_RunCommandMsg{
|
|
RunCommandMsg: &runner_api.RunCommand{
|
|
Command: cmd,
|
|
Args: args,
|
|
},
|
|
},
|
|
}
|
|
|
|
err = r.sendProtoStruct(serverToRunnerMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("Could not send command to client: %w", err)
|
|
return
|
|
}
|
|
|
|
for {
|
|
// Read RunCommandFinalResponse message from client
|
|
data, ok := <-r.receiveChan
|
|
if !ok {
|
|
err = fmt.Errorf("Channel is closed on runner")
|
|
return
|
|
}
|
|
|
|
runnerToServerMsg := &runner_api.RunnerToServerMsg{}
|
|
if err = proto.Unmarshal(data, runnerToServerMsg); err != nil {
|
|
err = fmt.Errorf("Could not parse RunCommand response: %w", err)
|
|
r.conn.Close(websocket.StatusUnsupportedData, "Invalid message")
|
|
return
|
|
}
|
|
|
|
switch x := runnerToServerMsg.Msg.(type) {
|
|
case *runner_api.RunnerToServerMsg_RunCommandPartialResponseMsg:
|
|
stdout += x.RunCommandPartialResponseMsg.Stdout
|
|
stderr += x.RunCommandPartialResponseMsg.Stderr
|
|
case *runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg:
|
|
stdout += x.RunCommandFinalResponseMsg.PartialResponse.Stdout
|
|
stderr += x.RunCommandFinalResponseMsg.PartialResponse.Stderr
|
|
returnCode = x.RunCommandFinalResponseMsg.ReturnCode
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) sendProtoStruct(p protoreflect.ProtoMessage) error {
|
|
protoOut, err := proto.Marshal(p)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not marshal proto: %w", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
log.Debugf("r.conn: %p", r.conn)
|
|
|
|
if err := r.conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil {
|
|
return fmt.Errorf("Could not send proto to websocket: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|