This commit is contained in:
Vyacheslav1557 2024-11-01 23:22:43 +05:00
parent 568ccea09a
commit c2076338fa
13 changed files with 509 additions and 166 deletions

View file

@ -0,0 +1,57 @@
package rabbitmq
import (
"fmt"
"git.sch9.ru/new_gate/ms-tester/internal/problems"
"github.com/golang/protobuf/proto"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func NewNotificationSubscriber(ch *amqp.Channel, queueName string, instanceName string, problemUC problems.ProblemUseCase) {
_, err := ch.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
msgs, err := ch.Consume(
queueName,
instanceName,
false,
false,
false,
false,
nil,
)
if err != nil {
panic(err) // FIXME
}
go func() {
for d := range msgs {
err = d.Ack(false)
if err != nil {
panic(err) // FIXME
}
msg := filer_pb.EventNotification{}
err = proto.Unmarshal(d.Body, &msg)
if err != nil {
panic(err) // FIXME
}
fmt.Println(msg.String()) // TODO: instead, call appropriate problemUC handler
}
}()
return
}

View file

@ -1 +1,10 @@
package tester
import (
testerv1 "git.sch9.ru/new_gate/ms-tester/pkg/go/gen/proto/tester/v1"
"google.golang.org/grpc"
)
type Handlers interface {
CreateSolution(*testerv1.CreateSolutionRequest, grpc.ServerStreamingServer[testerv1.TestingState]) error
}

View file

@ -1 +1,41 @@
package grpc
import (
"git.sch9.ru/new_gate/ms-tester/internal/tester"
testerv1 "git.sch9.ru/new_gate/ms-tester/pkg/go/gen/proto/tester/v1"
"google.golang.org/grpc"
)
func NewTesterHandlers(gserver *grpc.Server, testerUC tester.TesterUseCase) {
handlers := &testerHandlers{
testerUC: testerUC,
}
testerv1.RegisterTesterServiceServer(gserver, handlers)
}
type testerHandlers struct {
testerv1.UnimplementedTesterServiceServer
testerUC tester.TesterUseCase
}
func (h *testerHandlers) CreateSolution(req *testerv1.CreateSolutionRequest, stream testerv1.TesterService_CreateSolutionServer) error {
id, err := h.testerUC.CreateSolution(stream.Context(), req.GetTaskId(), req.GetSolution(), req.GetLanguage())
if err != nil {
return err
}
ch, err := h.testerUC.ProcessTesting(stream.Context(), id)
if err != nil {
return err
}
for state := range ch {
err = stream.Send(&testerv1.TestingState{Msg: state})
if err != nil {
return err
}
}
return nil
}

View file

@ -0,0 +1,107 @@
package rabbitmq
import (
"context"
"fmt"
"git.sch9.ru/new_gate/ms-tester/internal/tester"
runnerv1 "git.sch9.ru/new_gate/ms-tester/pkg/go/gen/proto/runner/v1"
"github.com/golang/protobuf/proto"
amqp "github.com/rabbitmq/amqp091-go"
)
func NewTesterProducer(ch *amqp.Channel, tQueueName string, rQueueName string, testerUC tester.TesterUseCase) {
_, err := ch.QueueDeclare(
tQueueName,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
go func() {
for d := range testerUC.TestingChannel() {
for i := 0; i < 15; i++ {
msg := runnerv1.Instruction{
Instruction: &runnerv1.Instruction_Run{
Run: &runnerv1.Run{
SolutionId: d,
TestId: 0,
BindingKey: rQueueName,
},
}}
body, err := proto.Marshal(&msg)
if err != nil {
panic(err)
}
err = ch.Publish(
"",
tQueueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
if err != nil {
panic(err)
}
}
}
}()
return
}
func NewTesterConsumer(ch *amqp.Channel, rQueueName string, instanceName string, testerUC tester.TesterUseCase) {
_, err := ch.QueueDeclare(
rQueueName,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
msgs, err := ch.Consume(
rQueueName,
"",
false,
true, // each tester must have exclusive results queue
false,
false,
nil,
)
if err != nil {
panic(err)
}
go func() {
for d := range msgs {
err = d.Ack(false)
if err != nil {
panic(err)
}
msg := runnerv1.Result{}
err = proto.Unmarshal(d.Body, &msg)
if err != nil {
panic(err)
}
fmt.Println(msg.String())
err = testerUC.ProcessResult(context.Background(), msg.GetRun().SolutionId, msg.GetRun().String())
if err != nil {
fmt.Println(err)
}
}
}()
}

View file

@ -1 +1,4 @@
package tester
type TesterRepository interface {
}

View file

@ -1 +1,12 @@
package tester
import (
"context"
)
type TesterUseCase interface {
CreateSolution(ctx context.Context, taskId int32, solution string, language int32) (int32, error)
ProcessTesting(ctx context.Context, solutionId int32) (<-chan string, error)
ProcessResult(ctx context.Context, solutionId int32, result string) error
TestingChannel() chan int32
}

View file

@ -1 +1,107 @@
package usecase
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
)
const (
MaxSimultaneousTestingProcesses = 5000
MaxMessagesPerSolution = 500
)
type TesterUseCase struct {
publicChannels sync.Map
privateChannels sync.Map
testingChannel chan int32
}
func NewTesterUseCase() *TesterUseCase {
return &TesterUseCase{
testingChannel: make(chan int32, MaxSimultaneousTestingProcesses),
}
}
func (u *TesterUseCase) CreateSolution(ctx context.Context, taskId int32, solution string, language int32) (int32, error) {
return rand.Int31(), nil
}
func (u *TesterUseCase) ProcessTesting(ctx context.Context, solutionId int32) (<-chan string, error) {
u.testingChannel <- solutionId
publicChannel := u.newPublicChannel(solutionId)
go func() {
privateChannel := u.newPrivateChannel(solutionId)
defer func() {
err := u.closeAndDeletePrivateChannel(solutionId)
if err != nil {
panic(err)
}
err = u.closeAndDeletePublicChannel(solutionId)
if err != nil {
panic(err)
}
}()
c := 0
for res := range privateChannel {
c += 1
publicChannel <- res
if c == 15 {
fmt.Println("finished")
break
}
}
}()
return publicChannel, nil
}
func (u *TesterUseCase) ProcessResult(ctx context.Context, solutionId int32, result string) error {
ch, ok := u.privateChannels.Load(solutionId)
if !ok {
return errors.New("")
}
ch.(chan string) <- result
return nil
}
func (u *TesterUseCase) TestingChannel() chan int32 {
return u.testingChannel
}
func (u *TesterUseCase) newPublicChannel(solutionId int32) chan string {
userCh := make(chan string, MaxMessagesPerSolution)
u.publicChannels.Store(solutionId, userCh)
return userCh
}
func (u *TesterUseCase) newPrivateChannel(solutionId int32) chan string {
userCh := make(chan string, MaxMessagesPerSolution)
u.privateChannels.Store(solutionId, userCh)
return userCh
}
func (u *TesterUseCase) closeAndDeletePublicChannel(solutionId int32) error {
ch, ok := u.publicChannels.Load(solutionId)
if !ok {
return errors.New("")
}
close(ch.(chan string))
u.publicChannels.Delete(solutionId)
return nil
}
func (u *TesterUseCase) closeAndDeletePrivateChannel(solutionId int32) error {
ch, ok := u.privateChannels.Load(solutionId)
if !ok {
return errors.New("")
}
close(ch.(chan string))
u.privateChannels.Delete(solutionId)
return nil
}