Hello. I am trying to build a simple multi-threaded UDP echo server in C
using libhv
(the library uses non-blocking events).
Basically, I am trying to combine https://github.com/ithewei/libhv/blob/master/examples/udp_echo_server.c with the threading model of https://github.com/ithewei/libhv/blob/master/examples/multi-thread/one-acceptor-multi-workers.c
I am doing this because I expect a high volume of traffic, and multiple threads will be needed to handle that kind of stream.
SOURCE CODE:
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
// Address the UDP server binds to (passed to hloop_create_udp_server).
static const char* host = "127.0.0.1";
// Listening port; main overwrites this default with the value parsed from argv[1].
static int port = 8000;
// Number of worker loops (and worker threads) that main creates.
static int thread_num = 4;
// Loop created in main and handed to accept_thread, which runs the UDP server on it.
static hloop_t* accept_loop = NULL;
// Array of thread_num worker loops, allocated in main and indexed by get_next_loop.
static hloop_t** worker_loops = NULL;
/*
 * Returns the next worker loop in round-robin order.
 *
 * The cursor is kept in a function-local static, so it persists across calls.
 * It is advanced before it is used, which means the very first call yields
 * worker_loops[1] when thread_num > 1. The static cursor is not protected by
 * any lock or atomic in this function.
 */
static hloop_t* get_next_loop() {
    static int s_cursor = 0;

    // Advance, wrapping back to the first slot once the end is reached.
    s_cursor = (s_cursor + 1 >= thread_num) ? 0 : s_cursor + 1;

    // For a positive thread_num the cursor is already in range here; the
    // modulo is kept as an extra guard on the index.
    hloop_t** slot = worker_loops + (s_cursor % thread_num);
    return *slot;
}
// not used for UDP
/*
static void on_close(hio_t* io) {
printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
*/
// not used for UDP
/*
static void on_recv(hio_t* io, void* buf, int readbytes) {
// echo
hio_write(io, buf, readbytes);
}
*/
// not used for UDP
/*
static void new_conn_event(hevent_t* ev) {
hloop_t* loop = ev->loop;
hio_t* io = (hio_t*)hevent_userdata(ev);
hio_attach(loop, io);
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
printf("tid=%ld connfd=%d [%s] <= [%s]\n",
(long)hv_gettid(),
(int)hio_fd(io),
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_setcb_close(io, on_close);
hio_setcb_read(io, on_recv);
hio_read(io);
}
*/
// not used for UDP
/*
static void on_accept(hio_t* io) {
hio_detach(io);
hloop_t* worker_loop = get_next_loop();
hevent_t ev;
memset(&ev, 0, sizeof(ev));
ev.loop = worker_loop;
ev.cb = new_conn_event;
ev.userdata = io;
hloop_post_event(worker_loop, &ev);
}
*/
/*
 * Read callback installed on the UDP server io: logs the received datagram
 * and writes the same bytes back through the io.
 *
 * io        - the io the data arrived on.
 * buf       - the received bytes (not NUL-terminated as far as this code
 *             assumes; every print is bounded by readbytes).
 * readbytes - number of bytes in buf.
 *
 * The log lines include the fd, the local and peer addresses, and both the
 * hv_gettid() and pthread_self() identifiers of the thread running the
 * callback.
 */
static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    char local_text[SOCKADDR_STRLEN] = {0};
    char peer_text[SOCKADDR_STRLEN] = {0};
    char* payload = (char*)buf;
    pthread_t self = pthread_self();

    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);

    const char* local_str = SOCKADDR_STR(hio_localaddr(io), local_text);
    const char* peer_str = SOCKADDR_STR(hio_peeraddr(io), peer_text);
    printf("[tid=%ld][fd=%d][%s] <=> [%s] #[%lu]\n",
           (long)hv_gettid(),
           (int)hio_fd(io),
           local_str,
           peer_str,
           (unsigned long)self);

    // Show the message as received, then as it is about to be echoed.
    printf("< %.*s", readbytes, payload);
    printf("> %.*s", readbytes, payload);

    // Echo the datagram back.
    hio_write(io, buf, readbytes);

#if TEST_KCP
    // In KCP test builds a payload starting with "CLOSE" closes the rudp session for this peer.
    if (strncmp(payload, "CLOSE", 5) == 0) {
        hio_close_rudp(io, hio_peeraddr(io));
    }
#endif
}
static HTHREAD_ROUTINE(worker_thread) {
hloop_t* loop = (hloop_t*)userdata;
hloop_run(loop);
return 0;
}
/*
 * Runs the UDP echo server on the event loop passed in userdata.
 *
 * Creates a UDP server io bound to host:port on that loop, installs
 * on_recvfrom as its read callback, starts reading, and then runs the loop on
 * the calling thread until hloop_run returns.
 *
 * Returns 0 in all cases; if the UDP server cannot be created an error is
 * written to stderr and the loop is never run.
 *
 * The loop is deliberately NOT freed here. main creates it with
 * HLOOP_FLAG_AUTO_FREE, so hloop_run releases it when it finishes; calling
 * hloop_free on it afterwards would touch freed memory.
 */
static HTHREAD_ROUTINE(accept_thread) {
    hloop_t* loop = (hloop_t*)userdata;

    hio_t* io = hloop_create_udp_server(loop, host, port);
    if (io == NULL) {
        fprintf(stderr, "failed to create UDP server on %s:%d\n", host, port);
        return 0;
    }

    hio_setcb_read(io, on_recvfrom);
    hio_read(io);

    hloop_run(loop);
    return 0;
}
/*
 * Entry point.
 *
 * Usage: cmd port
 *
 * Parses the listening port from argv[1], creates thread_num worker event
 * loops and starts one thread per loop, then creates accept_loop and runs the
 * UDP server on it from the calling thread via accept_thread.
 *
 * Returns -10 when the port argument is missing or is not a decimal number in
 * the range 0..65535, -20 when the worker loop array cannot be allocated, and
 * 0 once accept_thread returns.
 *
 * NOTE(review): in the code shown, nothing registers an io on the worker
 * loops or posts events to them (get_next_loop is only referenced from
 * commented-out code), so every datagram is handled by the loop that
 * accept_thread runs on this thread.
 */
int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }

    // strtol rather than atoi: atoi cannot report errors, so a non-numeric
    // argument would silently become port 0.
    errno = 0;
    char* end = NULL;
    long parsed_port = strtol(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' ||
        parsed_port < 0 || parsed_port > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[1]);
        printf("Usage: cmd port\n");
        return -10;
    }
    port = (int)parsed_port;

    worker_loops = malloc(sizeof *worker_loops * (size_t)thread_num);
    if (worker_loops == NULL) {
        fprintf(stderr, "out of memory allocating %d worker loops\n", thread_num);
        return -20;
    }

    // One event loop per worker thread; each loop is created with
    // HLOOP_FLAG_AUTO_FREE and handed to its thread.
    for (int i = 0; i < thread_num; ++i) {
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        hthread_create(worker_thread, worker_loops[i]);
    }

    // The UDP server loop runs on the main thread; this call blocks until
    // accept_thread returns.
    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    accept_thread(accept_loop);
    return 0;
}
If I run a simple test, it opens 4 threads, but I observe that it only ever uses one thread:
netcat test1:
nc 127.0.0.1 8000 -u
msg1
msg1
msg2
msg2
^C
netcat test2:
nc 127.0.0.1 8000 -u
msg1 connect2
msg1 connect2
msg2 connect2
msg2 connect2
^C
Output:
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg1
> msg1
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg2
> msg2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg1 connect2
> msg1 connect2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg2 connect2
> msg2 connect2
As you can see, tid=27641
remains the same even after reconnecting.
What should I do to solve this?
Thank you.