Use epoll in multi-threaded programming

epoll provides a simple but highly efficient polling mechanism:

(1) epoll_create1 creates an epoll instance;
(2) epoll_ctl adds, modifies, or removes file descriptors in the epoll instance;
(3) epoll_wait waits for I/O events.

Moustique shows a method of using epoll in a multi-threaded program:

auto event_loop_fn = [listen_fd, conn_handler] {

    int epoll_fd = epoll_create1(0);

    ......
    // Note: epoll_ctl here is Moustique's own helper (elided above) that wraps
    // ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) with the given events.
    epoll_ctl(listen_fd, EPOLLIN | EPOLLET);

    const int MAXEVENTS = 64;
    ......

    // Event loop.
    epoll_event events[MAXEVENTS];
    while (true)
    {
      int n_events = epoll_wait (epoll_fd, events, MAXEVENTS, -1);
      ......
    }
};

Every thread has its own epoll instance and monitors the same listen_fd. When a new connection is established, the thread that accepts it serves it. Since every thread owns its own epoll instance and event array, no synchronization among threads is needed.
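
To make the threading explicit, here is a minimal sketch (my addition, not Moustique's code) that launches one event loop per thread; each thread runs event_loop_fn, which creates its own epoll instance and registers the shared listen_fd:

#include <thread>
#include <vector>

template <typename EventLoopFn>
void start_event_loops(EventLoopFn event_loop_fn, unsigned nthreads)
{
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < nthreads; ++i)
        workers.emplace_back(event_loop_fn);   // each worker runs its own epoll loop
    for (auto &w : workers)
        w.join();
}

// Usage (hypothetical):
// start_event_loops(event_loop_fn, std::thread::hardware_concurrency());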

If you want multiple threads to share the same epoll instance, this topic can serve as a reference.
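
In that shared-instance case, one option worth knowing (a sketch of my own, not from Moustique) is EPOLLEXCLUSIVE (Linux 4.5+), which wakes only one of the threads blocked in epoll_wait() for each event on the shared descriptor and so avoids the thundering-herd problem:

#include <sys/epoll.h>

// Register listen_fd on an epoll instance that is shared by several threads.
int add_shared_listen_fd(int epoll_fd, int listen_fd)
{
    struct epoll_event ev = {};
    ev.events = EPOLLIN | EPOLLEXCLUSIVE;   // wake only one waiting thread per event
    ev.data.fd = listen_fd;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
}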

The gRPC server program crashes when it can’t bind successfully

A server program which uses gRPC crashed when started:

$ ./server 60001
......
E1123 15:37:54.133480971   14408 server_chttp2.c:38]         {"created":"@1511422674.133408109","description":"No address added out of total 1 resolved","file":"src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1511422674.133405147","description":"Failed to add any wildcard listeners","file":"src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1511422674.133394827","description":"Unable to configure socket","fd":4,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1511422674.133385167","description":"OS Error","errno":98,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1511422674.133404647","description":"Unable to configure socket","fd":4,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1511422674.133401558","description":"OS Error","errno":98,"file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]}
Segmentation fault (core dumped)

It is weird because it ran well yesterday. Check the core dump file:

......
Core was generated by `./server 60001'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x00007f30edbbf9b0 in pthread_mutex_lock () from /usr/lib/libpthread.so.0
(gdb) bt
#0  0x00007f30edbbf9b0 in pthread_mutex_lock () from /usr/lib/libpthread.so.0
#1  0x000055c312c87077 in grpc::Server::Wait() ()
......

No clue. So format the above JSON log:

{  
   "created":"@1511422674.133408109",
   "description":"No address added out of total 1 resolved",
   "file":"src/core/ext/transport/chttp2/server/chttp2_server.c",
   "file_line":245,
   "referenced_errors":[  
      {  
         "created":"@1511422674.133405147",
         "description":"Failed to add any wildcard listeners",
         "file":"src/core/lib/iomgr/tcp_server_posix.c",
         "file_line":338,
         "referenced_errors":[  
            {  
               "created":"@1511422674.133394827",
               "description":"Unable to configure socket",
               "fd":4,
               "file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c",
               "file_line":200,
               "referenced_errors":[  
                  {  
                     "created":"@1511422674.133385167",
                     "description":"OS Error",
                     "errno":98,
                     "file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c",
                     "file_line":173,
                     "os_error":"Address already in use",
                     "syscall":"bind"
                  }
               ]
            },
            {  
               "created":"@1511422674.133404647",
               "description":"Unable to configure socket",
               "fd":4,
               "file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c",
               "file_line":200,
               "referenced_errors":[  
                  {  
                     "created":"@1511422674.133401558",
                     "description":"OS Error",
                     "errno":98,
                     "file":"src/core/lib/iomgr/tcp_server_utils_posix_common.c",
                     "file_line":173,
                     "os_error":"Address already in use",
                     "syscall":"bind"
                  }
               ]
            }
         ]
      }
   ]
}

Oh, I see. “Address already in use” means port 60001 is already occupied. I switched to another port, and it worked.
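
As for the segmentation fault itself, my guess (not verified against this exact binary) is that grpc::ServerBuilder::BuildAndStart() returns a null pointer when the listening port cannot be bound, and calling Wait() through that null pointer would explain the crash inside grpc::Server::Wait(). A defensive sketch:

#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

void RunServer(const std::string& addr) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
  // builder.RegisterService(...);
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  if (!server) {  // bind failed, e.g. "Address already in use"
    std::cerr << "Failed to start server on " << addr << std::endl;
    return;
  }
  server->Wait();
}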

Learn socket programming tips from netcat

Since netcat is honored as the “TCP/IP swiss army knife”, I read its OpenBSD source code to summarize some socket programming tips:

(1) Client connects in non-blocking mode:

......
s = socket(res->ai_family, res->ai_socktype |
            SOCK_NONBLOCK, res->ai_protocol);


......  
if ((ret = connect(s, name, namelen)) != 0 && errno == EINPROGRESS) {
        pfd.fd = s;
        pfd.events = POLLOUT;
        ret = poll(&pfd, 1, timeout);
}
......

Create the socket with the SOCK_NONBLOCK flag set, then call connect(). If the return value is 0, the connection is established immediately; if connect() fails with errno set to EINPROGRESS, we poll() for POLLOUT and use the timeout to control how long to wait; otherwise the connection can’t be established.
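
Putting the pieces together, here is a minimal self-contained sketch (my own code, not netcat's) of a non-blocking connect with a timeout; note that after poll() reports POLLOUT, SO_ERROR tells whether the connection actually succeeded:

#include <cerrno>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Returns a connected socket, or -1 on failure. 'timeout' is in milliseconds.
int connect_nonblock(const struct sockaddr* name, socklen_t namelen,
                     int family, int socktype, int protocol, int timeout)
{
    int s = socket(family, socktype | SOCK_NONBLOCK, protocol);
    if (s == -1)
        return -1;

    if (connect(s, name, namelen) == 0)
        return s;                          // connected immediately

    if (errno != EINPROGRESS) {            // real failure
        close(s);
        return -1;
    }

    struct pollfd pfd = { s, POLLOUT, 0 };
    if (poll(&pfd, 1, timeout) == 1) {
        int err = 0;
        socklen_t len = sizeof(err);
        if (getsockopt(s, SOL_SOCKET, SO_ERROR, &err, &len) == 0 && err == 0)
            return s;                      // connect completed successfully
    }
    close(s);                              // timeout or pending error
    return -1;
}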

(2) The usage of poll():

......
/* stdin */
pfd[POLL_STDIN].fd = stdin_fd;
pfd[POLL_STDIN].events = POLLIN;

/* network out */
pfd[POLL_NETOUT].fd = net_fd;
pfd[POLL_NETOUT].events = 0;

/* network in */
pfd[POLL_NETIN].fd = net_fd;
pfd[POLL_NETIN].events = POLLIN;

/* stdout */
pfd[POLL_STDOUT].fd = stdout_fd;
pfd[POLL_STDOUT].events = 0;

......
/* poll */
num_fds = poll(pfd, 4, timeout);

/* treat poll errors */
if (num_fds == -1)
    err(1, "polling error");

/* timeout happened */
if (num_fds == 0)
    return;

/* treat socket error conditions */
for (n = 0; n < 4; n++) {
    if (pfd[n].revents & (POLLERR|POLLNVAL)) {
        pfd[n].fd = -1;
    }
}
/* reading is possible after HUP */
if (pfd[POLL_STDIN].events & POLLIN &&
    pfd[POLL_STDIN].revents & POLLHUP &&
    !(pfd[POLL_STDIN].revents & POLLIN))
    pfd[POLL_STDIN].fd = -1;

Usually, we just need to care about the file descriptors for reading:

pfd[POLL_STDIN].fd = stdin_fd;
pfd[POLL_STDIN].events = POLLIN;

and there is no need to monitor the file descriptors for writing yet:

/* network out */
pfd[POLL_NETOUT].fd = net_fd;
pfd[POLL_NETOUT].events = 0;

According to the poll() manual from OpenBSD, if there is no need for “high-priority” (e.g., out-of-band) data, POLLIN is enough; otherwise the monitored events should be POLLIN|POLLPRI. The same applies to POLLOUT and POLLWRBAND.

Three flags (POLLERR, POLLNVAL and POLLHUP) are only used in struct pollfd's revents. If POLLERR or POLLNVAL is detected, there is no need to poll this file descriptor any further:

if (pfd[n].revents & (POLLERR|POLLNVAL)) {
    pfd[n].fd = -1;
}

We should pay more attention to POLLHUP:
(a)

POLLHUP

The device or socket has been disconnected. This event and POLLOUT are mutually-exclusive; a descriptor can never be writable if a hangup has occurred. However, this event and POLLIN, POLLRDNORM, POLLRDBAND, or POLLPRI are not mutually-exclusive. This flag is only valid in the revents bitmask; it is ignored in the events member.

(b)

The second difference is that on EOF there is no guarantee that POLLIN will be set in revents, the caller must also check for POLLHUP.

So if POLLHUP and POLLIN are both set in revents, there is still data to read (perhaps just the EOF); if only POLLHUP is set without POLLIN, there is nothing left to read.
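
A minimal sketch of acting on that rule (my addition, not netcat's code): drain readable data first, and only stop polling the descriptor when POLLHUP comes without POLLIN:

#include <poll.h>
#include <unistd.h>

void handle_revents(struct pollfd* p, char* buf, size_t len)
{
    if (p->revents & POLLIN) {
        ssize_t n = read(p->fd, buf, len);   // data (or EOF when n == 0) is still available
        if (n == 0)
            p->fd = -1;                      // EOF reached: stop polling this descriptor
    } else if (p->revents & POLLHUP) {
        p->fd = -1;                          // hang-up with nothing left to read
    }
}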


Leverage comprehensive debugging tricks in one shot

Last Friday, a colleague told me that when connecting to an invalid address, a client using gRPC blocks forever. To verify it, I used the example code shipped with gRPC:

GreeterClient greeter(grpc::CreateChannel(
  "localhost:50051", grpc::InsecureChannelCredentials()));

Change the "localhost:50051" to "badhost:50051", then compile and execute the program. Sure enough, the client hang without any response. At the outset, I thought it should be a common issue, and there must be a solution already. So I just submitted a post in the discussion group, although there was some responses, but since they were not the satisfactory explanations, I knew I need to trouble-shooting myself.

(1) The first thing I wanted to confirm was whether the network card had sent any request to badhost, so I used tcpdump to capture the packets:

$ sudo tcpdump -A -s 0 'port 50051' -i enp7s0f0

But no data was captured. To double-check, I also used the tcpconnect program:

$ sudo tcpconnect -P 50051
PID    COMM         IP SADDR            DADDR            DPORT

Still no output.

(2) Although I couldn’t see any connect request to port 50051, every application on *NIX ultimately has to call the connect function. So I changed tactics and tried to find out who calls connect:

a) Build gRPC with debugging info (the reason for using “PKG_CONFIG_PATH=/usr/lib/openssl-1.0/pkgconfig” is explained here):

$ PKG_CONFIG_PATH=/usr/lib/openssl-1.0/pkgconfig CC=clang CXX=clang++ CFLAGS="-g -O0" CXXFLAGS="-g -O0" make

b) Modify the Makefile to build client program with debugging info:

CXXFLAGS += -g -std=c++11

c) Use gdb to debug the program; after starting it, set a breakpoint at the connect function:

$ gdb -q greeter_client
Reading symbols from greeter_client...done.
(gdb) start
Temporary breakpoint 1 at 0x146fe: file greeter_client.cc, line 74.
Starting program: /home/xiaonan/Project/grpc/examples/cpp/helloworld/greeter_client
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Temporary breakpoint 1, main (argc=1, argv=0x7fffffffea88) at greeter_client.cc:74
74      int main(int argc, char** argv) {
(gdb) b connect
Breakpoint 2 at 0x7ffff6619b80 (2 locations)

Then continue executing the program. When the breakpoint was hit, check the stack:

(gdb) c
Continuing.
[New Thread 0x7ffff4edc700 (LWP 28396)]
[New Thread 0x7ffff46db700 (LWP 28397)]
[Switching to Thread 0x7ffff4edc700 (LWP 28396)]

Thread 2 "greeter_client" hit Breakpoint 2, 0x00007ffff6619b80 in connect () from /usr/lib/libc.so.6

(gdb) bt
#0  0x00007ffff6619b80 in connect () from /usr/lib/libc.so.6
#1  0x00007ffff664e61e in open_socket () from /usr/lib/libc.so.6
#2  0x00007ffff664f156 in __nscd_open_socket () from /usr/lib/libc.so.6
#3  0x00007ffff664ccc6 in __nscd_getai () from /usr/lib/libc.so.6
#4  0x00007ffff66038bc in gaih_inet.constprop () from /usr/lib/libc.so.6
#5  0x00007ffff6604724 in getaddrinfo () from /usr/lib/libc.so.6
#6  0x00007ffff714ee1e in ?? () from /usr/lib/libgrpc.so.4
#7  0x00007ffff714f38c in ?? () from /usr/lib/libgrpc.so.4
#8  0x00007ffff714d020 in ?? () from /usr/lib/libgrpc.so.4
#9  0x00007ffff714cf12 in ?? () from /usr/lib/libgrpc.so.4
#10 0x00007ffff71fff57 in ?? () from /usr/lib/libgrpc.so.4
#11 0x00007ffff7755049 in start_thread () from /usr/lib/libpthread.so.0
#12 0x00007ffff6618f0f in clone () from /usr/lib/libc.so.6

Then continue running the program; the breakpoint was hit again:

(gdb) bt
#0  0x00007ffff6619b80 in connect () from /usr/lib/libc.so.6
#1  0x00007ffff664e61e in open_socket () from /usr/lib/libc.so.6
#2  0x00007ffff664f156 in __nscd_open_socket () from /usr/lib/libc.so.6
#3  0x00007ffff664ccc6 in __nscd_getai () from /usr/lib/libc.so.6
#4  0x00007ffff66038bc in gaih_inet.constprop () from /usr/lib/libc.so.6
#5  0x00007ffff6604724 in getaddrinfo () from /usr/lib/libc.so.6
#6  0x00007ffff714ee1e in ?? () from /usr/lib/libgrpc.so.4
#7  0x00007ffff714f38c in ?? () from /usr/lib/libgrpc.so.4
#8  0x00007ffff714d020 in ?? () from /usr/lib/libgrpc.so.4
#9  0x00007ffff714cf12 in ?? () from /usr/lib/libgrpc.so.4
#10 0x00007ffff71fff57 in ?? () from /usr/lib/libgrpc.so.4
#11 0x00007ffff7755049 in start_thread () from /usr/lib/libpthread.so.0
#12 0x00007ffff6618f0f in clone () from /usr/lib/libc.so.6
(gdb)

Oh, I see! The resolution of badhost must have failed, so there would definitely be no subsequent connection to port 50051. But why was the client trying to resolve the name again and again? Finding that out would explain why the client was blocking.
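
To double-check that hypothesis outside of gRPC, a tiny standalone test (my own sketch) can confirm whether badhost resolves at all; this matches the backtrace, which ends in getaddrinfo():

#include <cstdio>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>

int main() {
    struct addrinfo hints = {};
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    struct addrinfo* res = nullptr;
    int rc = getaddrinfo("badhost", "50051", &hints, &res);
    if (rc != 0) {
        std::printf("getaddrinfo failed: %s\n", gai_strerror(rc));
        return 1;
    }
    std::printf("badhost resolved successfully\n");
    freeaddrinfo(res);
    return 0;
}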

(3) Since there are ?? frames from /usr/lib/libgrpc.so.4, I couldn’t tell which function was the culprit. I could go over the code, but I wanted direct proof. Building gRPC with CC=clang CXX=clang++ CFLAGS="-g -O0" CXXFLAGS="-g -O0" seemed not to be enough. After some tweaking, I came up with the following solutions:

a) According to the Makefile:

# TODO(nnoble): the strip target is stripping in-place, instead
# of copying files in a temporary folder.
# This prevents proper debugging after running make install.  

“make install” will strip the debugging information, so instead of executing the “make install” command, I set the LD_LIBRARY_PATH environment variable to let the client link against the libraries in the specified directory:

$ export LD_LIBRARY_PATH=/home/xiaonan/Project/grpc/libs/opt

b) Hardcode -g in the Makefile:

CFLAGS += -g -std=c99 -Wsign-conversion -Wconversion $(W_SHADOW) $(W_EXTRA_SEMI)
CXXFLAGS += -g -std=c++11

Then the symbols can all be resolved:

(gdb) bt
#0  0x00007ffff6486b80 in connect () from /usr/lib/libc.so.6
#1  0x00007ffff64bb61e in open_socket () from /usr/lib/libc.so.6
#2  0x00007ffff64bbae2 in __nscd_get_mapping () from /usr/lib/libc.so.6
#3  0x00007ffff64bbed5 in __nscd_get_map_ref () from /usr/lib/libc.so.6
#4  0x00007ffff64b9ba3 in __nscd_getai () from /usr/lib/libc.so.6
#5  0x00007ffff64708bc in gaih_inet.constprop () from /usr/lib/libc.so.6
#6  0x00007ffff6471724 in getaddrinfo () from /usr/lib/libc.so.6
#7  0x00007ffff7473ec5 in blocking_resolve_address_impl (name=0x55555578edf0 "badhost:50051",
    default_port=0x555555790220 "https", addresses=0x55555578f1f0) at src/core/lib/iomgr/resolve_address_posix.c:83
#8  0x00007ffff74742e3 in do_request_thread (exec_ctx=0x7ffff5043c30, rp=0x55555578e630, error=<optimized out>)
    at src/core/lib/iomgr/resolve_address_posix.c:157
#9  0x00007ffff7472b86 in run_closures (exec_ctx=<optimized out>, list=...) at src/core/lib/iomgr/executor.c:64
#10 executor_thread (arg=0x555555789fc0) at src/core/lib/iomgr/executor.c:152
#11 0x00007ffff74e5286 in thread_body (v=<optimized out>) at src/core/lib/support/thd_posix.c:42
#12 0x00007ffff6181049 in start_thread () from /usr/lib/../lib64/libpthread.so.0
#13 0x00007ffff6485f0f in clone () from /usr/lib/libc.so.6

Now I just need to step into the code. The information about this issue can also be referred to here.
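
By the way, while the root cause lies in the repeated name resolution, a practical guard I would consider (my own sketch, assuming the generated helloworld classes from the gRPC examples) is to give the blocking RPC a deadline so the client can never hang forever:

#include <chrono>

#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"  // generated from the example proto

grpc::Status SayHelloWithDeadline(helloworld::Greeter::Stub* stub,
                                  const helloworld::HelloRequest& request,
                                  helloworld::HelloReply* reply,
                                  int timeout_seconds) {
  grpc::ClientContext context;
  context.set_deadline(std::chrono::system_clock::now() +
                       std::chrono::seconds(timeout_seconds));
  // Returns DEADLINE_EXCEEDED instead of blocking forever if the channel
  // never becomes ready.
  return stub->SayHello(&context, request, reply);
}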

During the whole process, I used a sniffer tool (tcpdump), a kernel tracing tool (tcpconnect, which belongs to bcc and utilizes eBPF), networking knowledge (setting a breakpoint on the connect function), a debugging tool (gdb), and a library-linking trick (setting LD_LIBRARY_PATH to bypass installing gRPC). That’s why I call the whole procedure “leveraging comprehensive debugging tricks”.


The analysis of gRPC communication mode

The route_guide program is a very good tutorial for learning gRPC, and I will use it as an example to analyze gRPC programming.

Prologue:
Compile the programs with debug information so that it is easy to manipulate them with gdb. Modify the Makefile:

CXXFLAGS += -std=c++11 -g

(1) Start the route_guide_server application, and use gdb to step through route_guide_client:

221     int main(int argc, char** argv) {
(gdb) n
223       std::string db = routeguide::GetDbFileContent(argc, argv);
(gdb)
226                               grpc::InsecureChannelCredentials()),
(gdb)
[New Thread 0x7ffff502f700 (LWP 25947)]
[New Thread 0x7ffff482e700 (LWP 25948)]
225           grpc::CreateChannel("localhost:50051",
(gdb)
227           db);
(gdb)
DB parsed, loaded 100 features.

Check current network status:

$ netstat -an | grep 50051
tcp6       0      0 :::50051                :::*                    LISTEN

From this, we can see that the grpc::CreateChannel() function doesn’t create a connection to the server; it just does some initialization work.
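
To see the connection being made eagerly, a small sketch (my addition): GetState(true) asks the channel to start connecting, and WaitForConnected() blocks until it is ready or the deadline expires:

#include <chrono>
#include <memory>

#include <grpc++/grpc++.h>

bool WaitForChannelReady(const std::shared_ptr<grpc::Channel>& channel,
                         int timeout_seconds) {
  channel->GetState(/*try_to_connect=*/true);
  auto deadline = std::chrono::system_clock::now() +
                  std::chrono::seconds(timeout_seconds);
  return channel->WaitForConnected(deadline);
}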

(2) Simple RPC is a one-request/one-response mode and is really easy, so I won’t discuss it at length here.
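
Just for reference, a simple RPC boils down to one blocking call (my sketch, assuming the generated route_guide classes):

#include <grpc++/grpc++.h>
#include "route_guide.grpc.pb.h"  // generated from the example proto

bool GetOneFeature(routeguide::RouteGuide::Stub* stub,
                   const routeguide::Point& point,
                   routeguide::Feature* feature) {
  grpc::ClientContext context;
  grpc::Status status = stub->GetFeature(&context, point, feature);
  return status.ok();
}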

(3) Server-side streaming RPC is a one-request/multiple-response mode. The client code skeleton is like this:

std::unique_ptr<ClientReader<Feature> > reader(
        stub_->ListFeatures(&context, rect));
while (reader->Read(&feature)) {
  ......
}
Status status = reader->Finish();
if (status.ok()) {
  ......
} else {
  ......
}

After the client sends the request (stub_->ListFeatures(&context, rect)), it reads all the responses from the server until reader->Read() returns false, then uses reader->Finish() to check the status. The server code is simple: just send all the responses:

for (......) {
    ......
    writer->Write(f);
}

But what if the client doesn’t want to continue processing the responses midway? Like this:

while (reader->Read(&feature)) {
  ......
  if (some error) {
    break;
  } 
}
Status status = reader->Finish();
if (status.ok()) {
  ......
} else {
  ......
}

After experimenting, I found the client will block in the reader->Finish() function. You can refer to Be careful of using grpc::ClientStreamingInterface::Finish() function. There are 2 methods I can figure out to handle this case:
a) Don’t call reader->Finish():

while (reader->Read(&feature)) {
  ......
  if (some error) {
    break;
  } 
}

b) Or, although there is an error, still consume the remaining messages:

while (reader->Read(&feature)) {
  ......
  if (some error) {
    break;
  } 
}
while (reader->Read(&feature)) {
    // do nothing, just consume the remaining messages.
}
Status status = reader->Finish();
if (status.ok()) {
  ......
} else {
  ......
}

Honestly, I haven’t dived into the gRPC code, but from testing, both methods above seem to work correctly.

(4) Client-side streaming RPC is a multiple-request/one-response mode. The client code framework is like this:

std::unique_ptr<ClientWriter<Point> > writer(
    stub_->RecordRoute(&context, &stats));
for (......) {
  ......
  if (!writer->Write(f.location())) {
    // Broken stream.
    break;
  }
  ......
}
writer->WritesDone();
Status status = writer->Finish();
if (status.ok()) {
  // Check stats from server
} else {
  ......
}

The client code is simple: it just tries its best to send all the requests. Once it finishes transmitting, it calls writer->WritesDone() and blocks in writer->Finish() to wait for the server’s response. The server code is also brief:

while (reader->Read(&point)) {
  ......
}

If the program breaks out of the while loop halfway:

while (reader->Read(&point)) {
  ......
  if (some error) {
    break;
  } 
}

That is OK too.

(5) The last flow is bidirectional streaming RPC, which is a multiple-request/multiple-response mode. The server code framework is neat: it just sends a response for each request:

while (stream->Read(&note)) {
  ......
  stream->Write(n);
  ......
}

But the client side is a little more complicated:

stream->Write(note);
stream->Read(&server_note);
stream->Write(note);
stream->Read(&server_note);
......
stream->WritesDone();
Status status = stream->Finish();
if (!status.ok()) {
  ......
} else {
  ......
}
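
The strict Write()/Read() alternation above is only one way to drive the stream. If I remember the official route_guide client correctly, it writes from a separate thread so that reads and writes can interleave freely; here is a sketch of that approach (my adaptation, assuming the generated route_guide classes):

#include <memory>
#include <thread>
#include <vector>

#include <grpc++/grpc++.h>
#include "route_guide.grpc.pb.h"  // generated from the example proto

void RouteChat(routeguide::RouteGuide::Stub* stub,
               const std::vector<routeguide::RouteNote>& notes) {
  grpc::ClientContext context;
  std::shared_ptr<grpc::ClientReaderWriter<routeguide::RouteNote,
                                           routeguide::RouteNote>>
      stream(stub->RouteChat(&context));

  // Writer thread: send all notes, then signal that writing is done.
  std::thread writer([stream, &notes] {
    for (const routeguide::RouteNote& note : notes)
      stream->Write(note);
    stream->WritesDone();
  });

  // Meanwhile, read responses on the current thread.
  routeguide::RouteNote server_note;
  while (stream->Read(&server_note)) {
    // process server_note ...
  }

  writer.join();
  grpc::Status status = stream->Finish();
  if (!status.ok()) {
    // handle the error ...
  }
}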

You must pay attention to the same problem as in server-side streaming RPC: if there are pending server responses before stream->Finish(), the client will block forever:

while (stream->Read(&server_note)) {
  ......
  if (some error) {
    break;
  }
}
Status status = stream->Finish();

Epilogue:
In summary, gRPC is a brilliant communication framework which hides many dirty details and lets you focus on message definitions. If you are considering doing some network programming, give it a try; I don’t think it will disappoint you.