Bug 17164 - socketConnection closes and reopens server sockets after each incoming connection, losing any concurrent arrivals
Status: UNCONFIRMED
Alias: None
Product: R
Classification: Unclassified
Component: I/O
Version: R 3.3.0
Hardware: All All
Importance: P5 minor
Assignee: R-core
 
Reported: 2016-10-12 04:49 UTC by Zafer Barutcuoglu
Modified: 2016-10-12 04:56 UTC

Description Zafer Barutcuoglu 2016-10-12 04:49:10 UTC
Implementing a socket server in R using

  con <- socketConnection(server = TRUE, blocking = TRUE, ...)

opens a server socket, (waits for and) accepts an incoming connection, and closes the server socket, all in one call.

Sequence of internal functions and the resulting socket interface calls:
  R_SockOpen: socket(), bind(), listen()
  R_SockListen: accept()
  R_SockClose: close()

To accept another connection, the whole call sequence must be repeated. Because the server socket is closed and reopened between connections, any other incoming connection already in the connection queue (one that arrived between listen() and accept()) is lost and left in limbo: the client believes it is connected. Other clients that try to connect between close() and the next listen() will also fail.
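
The queueing described above can be observed outside R. The following is a minimal sketch using the POSIX socket API on the loopback interface; the function name connect_succeeds_before_accept is hypothetical and is not part of R. It shows that a client's connect() completes while the server has never called accept(), which is why closing the listener afterwards strands that client.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical demo helper, not part of R.
   Returns 1 if a client's connect() completes although the server never
   calls accept(), 0 if it does not, and -1 if the setup fails. */
int connect_succeeds_before_accept(void)
{
    int srv = socket(AF_INET, SOCK_STREAM, 0);
    if (srv < 0) return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                      /* let the kernel pick a free port */

    socklen_t len = sizeof addr;
    if (bind(srv, (struct sockaddr *) &addr, sizeof addr) < 0 ||
        listen(srv, 4) < 0 ||
        getsockname(srv, (struct sockaddr *) &addr, &len) < 0) {
        close(srv);
        return -1;
    }

    /* The kernel completes the handshake and queues the connection;
       no accept() has been issued at this point. */
    int cli = socket(AF_INET, SOCK_STREAM, 0);
    if (cli < 0) {
        close(srv);
        return -1;
    }
    int connected = connect(cli, (struct sockaddr *) &addr, sizeof addr) == 0;

    /* Closing the listener now mirrors what happens after a single accept():
       the queued connection is never handed to the application. */
    close(srv);
    close(cli);
    return connected;
}
```

What the stranded client sees next (a reset, an end-of-file, or a timeout) depends on the operating system, so the sketch does not assert it.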

To avoid this, we need to be able to open the server socket once and accept any number of incoming connections on it as needed:
  R_SockOpen: socket(), bind(), listen()
  R_SockListen: accept()
  R_SockListen: accept()
  ...
  R_SockListen: accept()
  R_SockClose: close()
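
At the socket-API level, the desired sequence could look like the sketch below. It is only an illustration under stated assumptions: the helper names open_server_socket, server_port, connect_client and accept_n are hypothetical, the listener is bound to the loopback interface, and error handling is kept to a minimum. It is not the code behind R_SockOpen or R_SockListen.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Open a listening TCP socket on loopback: socket(), bind(), listen().
   port 0 asks the kernel for a free port. Returns the fd or -1. */
int open_server_socket(unsigned short port, int backlog)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    if (bind(fd, (struct sockaddr *) &addr, sizeof addr) < 0 ||
        listen(fd, backlog) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Port the listener is bound to, or -1 on failure. */
int server_port(int fd)
{
    struct sockaddr_in addr;
    socklen_t len = sizeof addr;
    if (getsockname(fd, (struct sockaddr *) &addr, &len) < 0) return -1;
    return ntohs(addr.sin_port);
}

/* Connect a client to the loopback listener. Returns the fd or -1. */
int connect_client(int port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons((unsigned short) port);

    if (connect(fd, (struct sockaddr *) &addr, sizeof addr) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Call accept() up to n times on the SAME listening socket.
   Stores the accepted fds in out and returns how many were accepted. */
int accept_n(int srv, int n, int *out)
{
    int count = 0;
    while (count < n) {
        int fd = accept(srv, NULL, NULL);
        if (fd < 0) break;
        out[count++] = fd;
    }
    return count;
}
```

Because the listener stays open, clients that connect while an earlier connection is being served simply wait in the queue until the next accept().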

For that, we need individual access to the R_Sock{Open/Listen/Close} functions, while still returning R "connection" objects for the accepted connections, allowing an interface like:

  srvsock <- openServerSocket(port)
  tryCatch({
    con1 <- acceptSocketConnection(srvsock)
    con2 <- acceptSocketConnection(srvsock)
    # ...
  }, finally={
    closeServerSocket(srvsock)
  })
Comment 1 Zafer Barutcuoglu 2016-10-12 04:56:13 UTC
In case it helps, here is a hack that works around the problem for now. (If nothing else, its ugliness should motivate a proper base-level fix.)

Package utils has (unexported) C interfaces to the individual functions (although utils::make.socket also calls them all at once), which can be recycled to provide an "opened server socket" class:

openServerSocket <- function(port) {
    if (length(port <- as.integer(port)) != 1L) stop("'port' must be integer of length 1")

    fd <- .Call(utils:::C_sockopen, port)
    if (fd <= 0) stop("Opening server socket failed")
    
    r <- list(socket = fd, port = port)
    class(r) <- "serversocket"
    r
}

closeServerSocket <- function(srvsock) {
  stopifnot(inherits(srvsock, "serversocket"))
  invisible(.Call(utils:::C_sockclose, srvsock$socket))
}


Now to accept an incoming connection and return an R "connection" object:

acceptSocketConnection <- function(srvsock, blocking = TRUE, mode = "a+", ...) {
  stopifnot(inherits(srvsock, "serversocket"))
  stopifnot(length(mode) == 1 && nchar(mode) <= 3)
  
  socket <- .Call(utils:::C_socklisten, srvsock$socket)
  if (socket <= 0) stop("Accepting incoming socket connection failed")
  
  host <- attr(socket, "host")
  
  con <- socketConnection(host, port = srvsock$port, blocking = blocking, open = "", ...)
  openConnectionFromSocket(con, socket, host, mode=mode, blocking = blocking)
  
  con
}
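
For context, the "host" attribute used above corresponds to the peer address that accept() can report. How R derives it internally is not shown in this report; the sketch below is an assumption-laden illustration that uses inet_ntop to return the numeric IPv4 address, and the function name accept_with_host is hypothetical.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper, not R's implementation.
   Accepts one connection on the listening socket srv and writes the peer's
   numeric IPv4 address into host (at least INET_ADDRSTRLEN bytes).
   Returns the connected fd, or -1 on failure. */
int accept_with_host(int srv, char *host, size_t hostlen)
{
    struct sockaddr_in peer;
    socklen_t len = sizeof peer;

    memset(&peer, 0, sizeof peer);
    int fd = accept(srv, (struct sockaddr *) &peer, &len);
    if (fd < 0) return -1;

    if (inet_ntop(AF_INET, &peer.sin_addr, host, (socklen_t) hostlen) == NULL) {
        close(fd);
        return -1;
    }
    return fd;
}
```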


This is the ugly part: accessing a couple of non-API entry points (likely to break with any new R version) and overwriting an unopened connection's relevant parts with the accepted client socket's data:

//
// sock2conn.c
//

#include <R.h>
#include <Rinternals.h>
#include <R_ext/Connections.h>

// Hijacked from connections.c.
Rconnection getConnection(int n);
#define set_iconv Rf_set_iconv
void set_iconv(Rconnection con);

// Copied from Rconnections.h.
typedef struct sockconn {
    int port;
    int server;
    int fd;
    int timeout;
    char *host;
    char inbuf[4096], *pstart, *pend;
} *Rsockconn;

// "Opens" an existing (unopened) R connection object using an open socket.
int openConnectionFromSocket(int conindex, int sock, const char * host, const char * mode, int block) {
  Rconnection con = getConnection(conindex);
  
  // Copied from modules/internet/sockconn.c:sock_open.
  //
  Rsockconn this = (Rsockconn)con->private;
  int timeout = this->timeout;

  if(timeout == NA_INTEGER || timeout <= 0) timeout = 60;
  this->pend = this->pstart = this->inbuf;

  // 'buf' renamed to 'host'.
  free(con->description);
  con->description = (char *) malloc(strlen(host) + 10);
  sprintf(con->description, "<-%s:%d", host, this->port);

  // Copied from connections.c:do_open.
  //
  if(strlen(mode) > 0) strcpy(con->mode, mode);
  con->blocking = block;
  
  this->fd = sock;
  int mlen = (int) strlen(con->mode);
  con->isopen = TRUE;
  if(mlen >= 2 && con->mode[mlen - 1] == 'b') con->text = FALSE;
  else con->text = TRUE;
  set_iconv(con); /* OK for output, at least */
  con->save = -1000;

  return 0;
}
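
Two details of the function above can be tried in isolation: the rule that decides between text and binary mode, and the "<-host:port" description string. The sketch below is a standalone illustration with hypothetical helper names (mode_is_text, make_description). It sizes the description buffer with snprintf instead of the fixed strlen(host) + 10, which is a swapped-in technique and not what the code above does.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Same rule as above: the connection is binary only if the mode string has
   at least two characters and ends in 'b'; otherwise it is text. */
int mode_is_text(const char *mode)
{
    size_t mlen = strlen(mode);
    return !(mlen >= 2 && mode[mlen - 1] == 'b');
}

/* Build "<-host:port" in a freshly allocated buffer of exactly the needed
   size. The caller frees the result. Returns NULL on failure. */
char *make_description(const char *host, int port)
{
    int n = snprintf(NULL, 0, "<-%s:%d", host, port);  /* measure first */
    if (n < 0) return NULL;

    char *buf = malloc((size_t) n + 1);
    if (buf == NULL) return NULL;

    snprintf(buf, (size_t) n + 1, "<-%s:%d", host, port);
    return buf;
}
```

Measuring the length first avoids relying on the port having at most a handful of digits.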


(Using Rcpp to export it, for convenience.)

//
// sock2conn_export.cpp
//

#include <Rcpp.h>
#include <R_ext/Boolean.h>

using namespace Rcpp;

extern "C" int openConnectionFromSocket(int conindex, int sock, const char * host, const char * mode, int block);
// [[Rcpp::export("openConnectionFromSocket")]]
// The C++ wrapper avoids a leading underscore, which is reserved at global scope.
void openConnectionFromSocket_export(int conindex, int sock, const char * host, const char * mode, bool blocking) {
  openConnectionFromSocket(conindex, sock, host, mode, blocking);
}