ms-runner/internal/runner/delivery/rabbitmq/pubsub.go

141 lines
2.9 KiB
Go
Raw Permalink Normal View History

2024-11-01 18:22:43 +00:00
package rabbitmq
import (
	"fmt"
	"time"

	"git.sch9.ru/new_gate/ms-runner/internal/runner"
	"git.sch9.ru/new_gate/ms-runner/internal/runner/usecase"
	runnerv1 "git.sch9.ru/new_gate/ms-runner/pkg/go/gen/proto/runner/v1"
	"github.com/golang/protobuf/proto"
	amqp "github.com/rabbitmq/amqp091-go"
)
// NewRunnerBuilderConsumer subscribes to queueName on ch (using instanceName
// as the consumer tag) and starts a background goroutine that decodes each
// delivery as a runnerv1.Instruction and hands it to runnerUC.Process.
//
// Deliveries are consumed with manual acknowledgement:
//   - a delivery is acked only after Process returns nil;
//   - when Process returns an error the error is printed and the delivery is
//     left unacknowledged (neither acked nor nacked);
//   - a delivery carrying an unknown instruction is printed and left
//     unacknowledged;
//   - a delivery whose body is not a valid Instruction is rejected without
//     requeue, so one malformed message cannot take the process down on
//     every redelivery.
//
// The function panics if the subscription cannot be established. The
// goroutine panics if acknowledging or rejecting a delivery fails, and exits
// when the deliveries channel is closed.
func NewRunnerBuilderConsumer(ch *amqp.Channel, runnerUC runner.RunnerUseCase, queueName string, instanceName string) {
	// readyPollInterval is the pause between readiness checks while the use
	// case reports that it is not ready. The value is an arbitrary small
	// delay chosen to avoid a hot spin on the lock.
	const readyPollInterval = 10 * time.Millisecond

	msgs, err := ch.Consume(
		queueName,
		instanceName, // consumer tag
		false,        // autoAck: deliveries are acknowledged manually below
		false,        // exclusive
		false,        // noLocal
		false,        // noWait
		nil,          // args
	)
	if err != nil {
		panic(err)
	}

	go func() {
		// settle acks the delivery after successful processing; on failure
		// the error is only printed and the delivery stays unacknowledged.
		settle := func(d amqp.Delivery, processErr error) {
			if processErr != nil {
				fmt.Println(processErr)
				return
			}
			if ackErr := d.Ack(false); ackErr != nil {
				panic(ackErr)
			}
		}

		for {
			// Read readiness under the lock, and always release the lock
			// before deciding what to do next: leaving it held on the
			// not-ready path would make the next Lock call wait on a lock
			// this goroutine already owns.
			runnerUC.Lock()
			ready := runnerUC.Ready()
			runnerUC.Unlock()
			if !ready {
				time.Sleep(readyPollInterval)
				continue
			}

			d, ok := <-msgs
			if !ok {
				// The deliveries channel was closed, so there is nothing
				// left to consume; looping on would only yield zero-value
				// deliveries.
				fmt.Println("deliveries channel closed, stopping consumer: ", instanceName)
				return
			}

			var msg runnerv1.Instruction
			if unmarshalErr := proto.Unmarshal(d.Body, &msg); unmarshalErr != nil {
				fmt.Println("rejecting malformed instruction: ", unmarshalErr)
				// requeue=false: redelivering an undecodable body cannot
				// succeed.
				if rejectErr := d.Reject(false); rejectErr != nil {
					panic(rejectErr)
				}
				continue
			}

			switch msg.Instruction.(type) {
			case *runnerv1.Instruction_Build:
				build := msg.GetBuild()
				settle(d, runnerUC.Process(usecase.BuildInstruction{
					SolutionId: build.GetSolutionId(),
					BindingKey: build.GetBindingKey(),
					Language:   build.GetLanguage(),
					Solution:   build.GetSolution(),
				}))
			case *runnerv1.Instruction_Run:
				run := msg.GetRun()
				settle(d, runnerUC.Process(usecase.RunInstruction{
					SolutionId: run.GetSolutionId(),
					TestId:     run.GetTestId(),
					BindingKey: run.GetBindingKey(),
				}))
			default:
				fmt.Println("unknown instruction: ", msg.Instruction)
			}
		}
	}()
}
// NewRunnerBuilderProducer starts a background goroutine that drains
// runnerUC.Results() and publishes every build or run result to ch as a
// protobuf-encoded runnerv1.Result.
//
// Each message is published to the default exchange ("") with the result's
// BindingKey as the routing key. Results of any other type are printed and
// skipped. The goroutine panics if a result cannot be marshalled or
// published, and ends when the Results() channel is closed.
func NewRunnerBuilderProducer(ch *amqp.Channel, runnerUC runner.RunnerUseCase) {
	go func() {
		for res := range runnerUC.Results() {
			var (
				out        *runnerv1.Result
				routingKey string
			)

			// The switch only maps the use-case result onto its wire
			// message; encoding and publishing are shared below.
			switch r := res.(type) {
			case usecase.BuildResult:
				routingKey = r.BindingKey
				out = &runnerv1.Result{
					Result: &runnerv1.Result_Build{
						Build: &runnerv1.BuildResult{
							SolutionId: r.SolutionId,
							Status:     r.Status,
						},
					},
				}
			case usecase.RunResult:
				routingKey = r.BindingKey
				out = &runnerv1.Result{
					Result: &runnerv1.Result_Run{
						Run: &runnerv1.RunResult{
							SolutionId: r.SolutionId,
							TestId:     r.TestId,
							Status:     r.Status,
						},
					},
				}
			default:
				fmt.Println("unknown result: ", r)
				continue
			}

			payload, err := proto.Marshal(out)
			if err != nil {
				panic(err)
			}

			fmt.Println("publishing: ", routingKey)
			publishing := amqp.Publishing{
				ContentType: "text/plain",
				Body:        payload,
			}
			// Arguments: exchange, routing key, mandatory, immediate, message.
			if err := ch.Publish("", routingKey, false, false, publishing); err != nil {
				panic(err)
			}
		}
	}()
}