sankar2000 0 Newbie Poster

Hello. I am trying to build a simple multi-threaded UDP echo server in C using libhv (the library uses non-blocking events).

Basically, I am trying to combine https://github.com/ithewei/libhv/blob/master/examples/udp_echo_server.c with the threading model of https://github.com/ithewei/libhv/blob/master/examples/multi-thread/one-acceptor-multi-workers.c

I am doing this because I will have a high volume of traffic, and multiple threads will be needed to handle that kind of stream.

SOURCE CODE:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"

// Address the UDP server binds to (passed to hloop_create_udp_server).
static const char* host = "127.0.0.1";
// Listening port; this default is overwritten in main() from argv[1].
static int port = 8000;
// Number of worker loops (and worker threads) that main() starts.
static int thread_num = 4;
// Loop created in main() and handed to accept_thread().
static hloop_t*  accept_loop = NULL;
// Array of thread_num worker loops, allocated and filled in main().
static hloop_t** worker_loops = NULL;

/*
 * Round-robin selector over worker_loops.
 *
 * Advances a function-local static cursor on every call, wrapping back to 0
 * once it reaches thread_num, and returns the loop at the new position.
 * Because the cursor is advanced before it is used, the first call returns
 * worker_loops[1] when thread_num > 1. The cursor is a plain static int with
 * no locking.
 */
static hloop_t* get_next_loop() {
    static int rr_cursor = 0;
    int candidate = rr_cursor + 1;
    rr_cursor = (candidate < thread_num) ? candidate : 0;
    return worker_loops[rr_cursor % thread_num];
}

// not used for UDP
/*
static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}
*/

// not used for UDP
/*
static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}
*/

// not used for UDP
/*
static void new_conn_event(hevent_t* ev) {
    hloop_t* loop = ev->loop;
    hio_t* io = (hio_t*)hevent_userdata(ev);
    hio_attach(loop, io);

    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);

    hio_read(io);
}
*/

// not used for UDP
/*
static void on_accept(hio_t* io) {
    hio_detach(io);

    hloop_t* worker_loop = get_next_loop();
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    ev.loop = worker_loop;
    ev.cb = new_conn_event;
    ev.userdata = io;
    hloop_post_event(worker_loop, &ev);
}
*/

/*
 * Read callback for the UDP server socket: logs the received datagram and
 * echoes it back with hio_write().
 *
 * io        - the io the datagram arrived on
 * buf       - received payload; it is NOT NUL-terminated, only the first
 *             readbytes bytes are valid
 * readbytes - number of valid bytes in buf
 *
 * NOTE(review): in this file the io is created only on the loop passed to
 * accept_thread(), and no io is ever registered on the worker loops, so
 * presumably every invocation runs on that single loop's thread - confirm
 * against libhv's threading model.
 */
static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    pthread_t tid = pthread_self();

    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("[tid=%ld][fd=%d][%s] <=> [%s] #[%lu]\n",
        (long)hv_gettid(),
        (int)hio_fd(io),
        SOCKADDR_STR(hio_localaddr(io), localaddrstr),
        SOCKADDR_STR(hio_peeraddr(io), peeraddrstr),
        (unsigned long)tid);

    // get msg (bounded by readbytes via %.*s because buf has no terminator)
    char* str = (char*)buf;
    printf("< %.*s", readbytes, str);

    // echo back
    printf("> %.*s", readbytes, str);
    hio_write(io, buf, readbytes);

#if TEST_KCP
    // Only compare when at least 5 bytes were actually received: bytes past
    // readbytes are not part of this datagram and may hold stale data that
    // would otherwise produce a false "CLOSE" match.
    if (readbytes >= 5 && strncmp(str, "CLOSE", 5) == 0) {
        hio_close_rudp(io, hio_peeraddr(io));
    }
#endif
}


static HTHREAD_ROUTINE(worker_thread) {
    hloop_t* loop = (hloop_t*)userdata;
    hloop_run(loop);
    return 0;
}

/*
 * Creates the UDP server socket on host:port using the loop received through
 * userdata, installs on_recvfrom() as its read callback, and runs that loop
 * (hloop_run() blocks until the loop stops).
 *
 * Returns 0 in every case; a failure to create the server is reported on
 * stdout before returning.
 */
static HTHREAD_ROUTINE(accept_thread) {
    hloop_t* loop = (hloop_t*)userdata;
    hio_t* io = hloop_create_udp_server(loop, host, port);
    if (io == NULL) {
        printf("failed to create UDP server on %s:%d\n", host, port);
        return 0;
    }

    hio_setcb_read(io, on_recvfrom);
    hio_read(io);
    hloop_run(loop);

    // Do NOT call hloop_free() here: the loop is owned by the caller, and
    // main() creates it with HLOOP_FLAG_AUTO_FREE, for which libhv forbids an
    // explicit hloop_free() (the loop is released when hloop_run() finishes).
    // Freeing it again would be a double free.
    return 0;
}

/*
 * Entry point. Usage: cmd port
 *
 * Parses the listening port from argv[1], creates thread_num worker loops
 * each running on its own thread, then runs the UDP server loop on the
 * calling thread via accept_thread().
 *
 * Returns -10 on a usage error (missing or invalid port), EXIT_FAILURE when a
 * loop or the loop table cannot be allocated, and 0 after the server loop
 * returns.
 */
int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }

    // strtol instead of atoi so that garbage, trailing characters and
    // out-of-range values are rejected instead of silently becoming a port.
    char* end = NULL;
    long parsed = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || parsed < 1 || parsed > 65535) {
        printf("Invalid port: %s\n", argv[1]);
        return -10;
    }
    port = (int)parsed;

    worker_loops = (hloop_t**)calloc((size_t)thread_num, sizeof(*worker_loops));
    if (worker_loops == NULL) {
        printf("out of memory allocating %d worker loops\n", thread_num);
        return EXIT_FAILURE;
    }
    for (int i = 0; i < thread_num; ++i) {
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        if (worker_loops[i] == NULL) {
            printf("failed to create worker loop %d\n", i);
            return EXIT_FAILURE;
        }
        hthread_create(worker_thread, worker_loops[i]);
    }

    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    if (accept_loop == NULL) {
        printf("failed to create accept loop\n");
        return EXIT_FAILURE;
    }
    // Runs the server loop on the main thread; blocks until the loop stops.
    accept_thread(accept_loop);

    return 0;
}

If I do a simple test, it opens 4 threads, but I also observe that it only uses one thread:

netcat test1:

nc 127.0.0.1 8000  -u
msg1
msg1
msg2
msg2
^C

netcat test2:

nc 127.0.0.1 8000  -u
msg1 connect2
msg1 connect2
msg2 connect2
msg2 connect2
^C

Output:

on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg1
> msg1
on_recvfrom fd=8 readbytes=5
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:55234] #[139856893998912]
< msg2
> msg2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg1 connect2
> msg1 connect2
on_recvfrom fd=8 readbytes=14
[tid=27641][fd=8][127.0.0.1:8000] <=> [127.0.0.1:43275] #[139856893998912]
< msg2 connect2
> msg2 connect2

As you can see tid=27641 remains the same even after reconnection.

What should I do to solve this?

Thank you.