add tests, initial protobuf and rabbitmq support, ...

This commit is contained in:
dragonmuffin 2025-03-04 22:03:00 +05:00
parent 54adfcee2a
commit 7510c2a7b3
13 changed files with 421 additions and 39 deletions

BIN
transport/transport Executable file

Binary file not shown.

96
transport/transport.c Normal file
View file

@ -0,0 +1,96 @@
#include "transport/transport.h"
#include "starter/starter.h"
void prepare_amqp_connection(int argc, char const *const *argv,struct connection_data *condata) {
int status;
if(secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_EXCHANGE")==NULL || secure_getenv("AMQP_BINDINGKEY")==NULL) {
fprintf(stderr,"no amqp connection parameters in environment");
exit(1);
}
condata->hostname = getenv("AMQP_HOSTNAME");
condata->port = atoi(getenv("AMQP_PORT"));
condata->exchange = getenv("AMQP_EXCHANGE");
condata->bindingkey = getenv("AMQP_BINDINGKEY");
condata->conn = amqp_new_connection();
condata->socket = amqp_tcp_socket_new(condata->conn);
if (!condata->socket) {
error_log("failed to create TCP socket");
}
status = amqp_socket_open(condata->socket, condata->hostname, condata->port);
if (status) {
error_log("failed to open TCP socket");
}
amqp_assert(amqp_login(condata->conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "rmuser", "rmpassword"));
amqp_channel_open(condata->conn, 1);
amqp_assert(amqp_get_rpc_reply(condata->conn));
{
amqp_queue_declare_ok_t *r = amqp_queue_declare( condata->conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
amqp_assert(amqp_get_rpc_reply(condata->conn));
condata->queuename = amqp_bytes_malloc_dup(r->queue);
if (condata->queuename.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
exit(1);
}
}
amqp_queue_bind(condata->conn, 1, condata->queuename, amqp_cstring_bytes(condata->exchange), amqp_cstring_bytes(condata->bindingkey), amqp_empty_table);
amqp_assert(amqp_get_rpc_reply(condata->conn));
amqp_basic_consume(condata->conn, 1, condata->queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_assert(amqp_get_rpc_reply(condata->conn));
}
int main(int argc, char const *const *argv) {
struct connection_data condata;
prepare_amqp_connection(argc,argv,&condata);
{
for (;;) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(condata.conn);
res = amqp_consume_message(condata.conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
break;
}
Runner__V1__Instruction *inst = runner__v1__instruction__unpack(NULL,envelope.message.body.len,envelope.message.body.bytes);
if(inst==NULL) {
fprintf(stderr, "error reading buffer");
exit(1);
}
switch(inst->instruction_case) {
case RUNNER__V1__INSTRUCTION__INSTRUCTION_BUILD:
puts("build request recieved");
printf("solution_id: %d\nbinding_key:%s\nlanguage:%d\nsolution:%s\n\n",inst->build->solution_id,inst->build->binding_key,inst->build->language,inst->build->solution);
break;
case RUNNER__V1__INSTRUCTION__INSTRUCTION_RUN:
puts("run request recieved");
printf("solution_id: %d\ntest_id:%d\nbinding_key:%s\n\n",inst->run->solution_id,inst->run->test_id,inst->run->binding_key);
break;
case RUNNER__V1__INSTRUCTION__INSTRUCTION__NOT_SET:
puts("empty request recieved");
break;
default:
puts("unknown request recieved");
}
runner__v1__instruction__free_unpacked(inst, NULL);
amqp_destroy_envelope(&envelope);
}
}
amqp_bytes_free(condata.queuename);
amqp_assert(amqp_channel_close(condata.conn, 1, AMQP_REPLY_SUCCESS));
amqp_assert(amqp_connection_close(condata.conn, AMQP_REPLY_SUCCESS));
amqp_code_assert(amqp_destroy_connection(condata.conn));
return 0;
}

27
transport/transport.h Normal file
View file

@ -0,0 +1,27 @@
#define _GNU_SOURCE
#include "gen/runner/v1/runner.pb-c.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
struct connection_data {
char const *hostname;
int port;
char const *exchange;
char const *bindingkey;
amqp_socket_t *socket;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
};
extern char* amqp_error(amqp_rpc_reply_t x);
#define error_log(err) fprintf(stderr,"error in %s line %d:%s",__FILE__,__LINE__,err);
#define amqp_assert(msg) {char* err=amqp_error(msg);if(err!=NULL) {fprintf(stderr,"amqp error in %s line %d:%s",__FILE__,__LINE__,err);free(err);}}
#define amqp_code_assert(msg) {int32_t code=msg;if(code<0) {fprintf(stderr,"amqp error in %s line %d:%s",__FILE__,__LINE__,amqp_error_string2(code));}}

48
transport/utils.c Normal file
View file

@ -0,0 +1,48 @@
#define _GNU_SOURCE
#include <ctype.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "gen/runner/v1/runner.pb-c.h"
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/framing.h>
#include <rabbitmq-c/tcp_socket.h>
#include <stdint.h>
#include "transport/transport.h"
char* amqp_error(amqp_rpc_reply_t x) {
char* res=NULL;
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
break;
case AMQP_RESPONSE_NONE:
asprintf(&res,"missing RPC reply type");
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
asprintf(&res,amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)x.reply.decoded;
asprintf(&res,"server connection error %uh, message: %.*s\n", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
asprintf(&res,"server channel error %uh, message: %.*s\n", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
asprintf(&res,"unknown server error, method id 0x%08X\n", x.reply.id);
break;
}
}
return res;
}