1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-22 22:04:08 +03:00

tsocket: add tstream implementation for bsd sockets (inet and unix)

metze
This commit is contained in:
Stefan Metzmacher 2009-04-03 12:15:27 +02:00
parent 8a090c4b8b
commit ee6d796c19
2 changed files with 943 additions and 0 deletions

View File

@ -146,6 +146,49 @@ int _tdgram_unix_socket(const struct tsocket_address *local,
#define tdgram_unix_socket(local, remote, mem_ctx, dgram) \
_tdgram_unix_socket(local, remote, mem_ctx, dgram, __location__)
struct tevent_req * tstream_inet_tcp_connect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
const struct tsocket_address *local,
const struct tsocket_address *remote);
int _tstream_inet_tcp_connect_recv(struct tevent_req *req,
int *perrno,
TALLOC_CTX *mem_ctx,
struct tstream_context **stream,
const char *location);
#define tstream_inet_tcp_connect_recv(req, perrno, mem_ctx, stream) \
_tstream_inet_tcp_connect_recv(req, perrno, mem_ctx, stream, \
__location__)
struct tevent_req * tstream_unix_connect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
const struct tsocket_address *local,
const struct tsocket_address *remote);
int _tstream_unix_connect_recv(struct tevent_req *req,
int *perrno,
TALLOC_CTX *mem_ctx,
struct tstream_context **stream,
const char *location);
#define tstream_unix_connect_recv(req, perrno, mem_ctx, stream) \
_tstream_unix_connect_recv(req, perrno, mem_ctx, stream, \
__location__)
int _tstream_unix_socketpair(TALLOC_CTX *mem_ctx1,
struct tstream_context **_stream1,
TALLOC_CTX *mem_ctx2,
struct tstream_context **_stream2,
const char *location);
#define tstream_unix_socketpair(mem_ctx1, stream1, mem_ctx2, stream2) \
_tstream_unix_socketpair(mem_ctx1, stream1, mem_ctx2, stream2, \
__location__)
int _tstream_bsd_existing_socket(TALLOC_CTX *mem_ctx,
int fd,
struct tstream_context **_stream,
const char *location);
#define tstream_bsd_existing_socket(mem_ctx, fd, stream) \
_tstream_bsd_existing_socket(mem_ctx, fd, stream, \
__location__)
/*
* Queue helpers
*/

View File

@ -1258,3 +1258,903 @@ int _tdgram_unix_socket(const struct tsocket_address *local,
return ret;
}
struct tstream_bsd {
int fd;
void *event_ptr;
struct tevent_fd *fde;
void *readable_private;
void (*readable_handler)(void *private_data);
void *writeable_private;
void (*writeable_handler)(void *private_data);
};
static void tstream_bsd_fde_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags,
void *private_data)
{
struct tstream_bsd *bsds = talloc_get_type_abort(private_data,
struct tstream_bsd);
if (flags & TEVENT_FD_WRITE) {
bsds->writeable_handler(bsds->writeable_private);
return;
}
if (flags & TEVENT_FD_READ) {
if (!bsds->readable_handler) {
TEVENT_FD_NOT_READABLE(bsds->fde);
return;
}
bsds->readable_handler(bsds->readable_private);
return;
}
}
static int tstream_bsd_set_readable_handler(struct tstream_bsd *bsds,
struct tevent_context *ev,
void (*handler)(void *private_data),
void *private_data)
{
if (ev == NULL) {
if (handler) {
errno = EINVAL;
return -1;
}
if (!bsds->readable_handler) {
return 0;
}
bsds->readable_handler = NULL;
bsds->readable_private = NULL;
return 0;
}
/* read and write must use the same tevent_context */
if (bsds->event_ptr != ev) {
if (bsds->readable_handler || bsds->writeable_handler) {
errno = EINVAL;
return -1;
}
bsds->event_ptr = NULL;
TALLOC_FREE(bsds->fde);
}
if (bsds->fde == NULL) {
bsds->fde = tevent_add_fd(ev, bsds,
bsds->fd, TEVENT_FD_READ,
tstream_bsd_fde_handler,
bsds);
if (!bsds->fde) {
return -1;
}
/* cache the event context we're running on */
bsds->event_ptr = ev;
} else if (!bsds->readable_handler) {
TEVENT_FD_READABLE(bsds->fde);
}
bsds->readable_handler = handler;
bsds->readable_private = private_data;
return 0;
}
static int tstream_bsd_set_writeable_handler(struct tstream_bsd *bsds,
struct tevent_context *ev,
void (*handler)(void *private_data),
void *private_data)
{
if (ev == NULL) {
if (handler) {
errno = EINVAL;
return -1;
}
if (!bsds->writeable_handler) {
return 0;
}
bsds->writeable_handler = NULL;
bsds->writeable_private = NULL;
TEVENT_FD_NOT_WRITEABLE(bsds->fde);
return 0;
}
/* read and write must use the same tevent_context */
if (bsds->event_ptr != ev) {
if (bsds->readable_handler || bsds->writeable_handler) {
errno = EINVAL;
return -1;
}
bsds->event_ptr = NULL;
TALLOC_FREE(bsds->fde);
}
if (bsds->fde == NULL) {
bsds->fde = tevent_add_fd(ev, bsds,
bsds->fd, TEVENT_FD_WRITE,
tstream_bsd_fde_handler,
bsds);
if (!bsds->fde) {
return -1;
}
/* cache the event context we're running on */
bsds->event_ptr = ev;
} else if (!bsds->writeable_handler) {
TEVENT_FD_WRITEABLE(bsds->fde);
}
bsds->writeable_handler = handler;
bsds->writeable_private = private_data;
return 0;
}
static ssize_t tstream_bsd_pending_bytes(struct tstream_context *stream)
{
struct tstream_bsd *bsds = tstream_context_data(stream,
struct tstream_bsd);
ssize_t ret;
if (bsds->fd == -1) {
errno = ENOTCONN;
return -1;
}
ret = tsocket_bsd_pending(bsds->fd);
return ret;
}
struct tstream_bsd_readv_state {
struct tstream_context *stream;
struct iovec *vector;
size_t count;
int ret;
};
static int tstream_bsd_readv_destructor(struct tstream_bsd_readv_state *state)
{
struct tstream_bsd *bsds = tstream_context_data(state->stream,
struct tstream_bsd);
tstream_bsd_set_readable_handler(bsds, NULL, NULL, NULL);
return 0;
}
static void tstream_bsd_readv_handler(void *private_data);
static struct tevent_req *tstream_bsd_readv_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tstream_context *stream,
struct iovec *vector,
size_t count)
{
struct tevent_req *req;
struct tstream_bsd_readv_state *state;
struct tstream_bsd *bsds = tstream_context_data(stream, struct tstream_bsd);
int ret;
req = tevent_req_create(mem_ctx, &state,
struct tstream_bsd_readv_state);
if (!req) {
return NULL;
}
state->stream = stream;
/* we make a copy of the vector so that we can modify it */
state->vector = talloc_array(state, struct iovec, count);
if (tevent_req_nomem(state->vector, req)) {
goto post;
}
memcpy(state->vector, vector, sizeof(struct iovec)*count);
state->count = count;
state->ret = 0;
talloc_set_destructor(state, tstream_bsd_readv_destructor);
if (bsds->fd == -1) {
tevent_req_error(req, ENOTCONN);
goto post;
}
/*
* this is a fast path, not waiting for the
* socket to become explicit readable gains
* about 10%-20% performance in benchmark tests.
*/
tstream_bsd_readv_handler(req);
if (!tevent_req_is_in_progress(req)) {
goto post;
}
ret = tstream_bsd_set_readable_handler(bsds, ev,
tstream_bsd_readv_handler,
req);
if (ret == -1) {
tevent_req_error(req, errno);
goto post;
}
return req;
post:
tevent_req_post(req, ev);
return req;
}
static void tstream_bsd_readv_handler(void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(private_data,
struct tevent_req);
struct tstream_bsd_readv_state *state = tevent_req_data(req,
struct tstream_bsd_readv_state);
struct tstream_context *stream = state->stream;
struct tstream_bsd *bsds = tstream_context_data(stream, struct tstream_bsd);
int ret;
int err;
bool retry;
ret = readv(bsds->fd, state->vector, state->count);
if (ret == 0) {
/* propagate end of file */
tevent_req_error(req, EPIPE);
return;
}
err = tsocket_bsd_error_from_errno(ret, errno, &retry);
if (retry) {
/* retry later */
return;
}
if (tevent_req_error(req, err)) {
return;
}
state->ret += ret;
while (ret > 0) {
if (ret < state->vector[0].iov_len) {
uint8_t *base;
base = (uint8_t *)state->vector[0].iov_base;
base += ret;
state->vector[0].iov_base = base;
state->vector[0].iov_len -= ret;
break;
}
ret -= state->vector[0].iov_len;
state->vector += 1;
state->count -= 1;
}
if (state->count > 0) {
/* we have more to read */
return;
}
tevent_req_done(req);
}
static int tstream_bsd_readv_recv(struct tevent_req *req,
int *perrno)
{
struct tstream_bsd_readv_state *state = tevent_req_data(req,
struct tstream_bsd_readv_state);
int ret;
ret = tsocket_simple_int_recv(req, perrno);
if (ret == 0) {
ret = state->ret;
}
tevent_req_received(req);
return ret;
}
struct tstream_bsd_writev_state {
struct tstream_context *stream;
struct iovec *vector;
size_t count;
int ret;
};
static int tstream_bsd_writev_destructor(struct tstream_bsd_writev_state *state)
{
struct tstream_bsd *bsds = tstream_context_data(state->stream,
struct tstream_bsd);
tstream_bsd_set_writeable_handler(bsds, NULL, NULL, NULL);
return 0;
}
static void tstream_bsd_writev_handler(void *private_data);
static struct tevent_req *tstream_bsd_writev_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tstream_context *stream,
const struct iovec *vector,
size_t count)
{
struct tevent_req *req;
struct tstream_bsd_writev_state *state;
struct tstream_bsd *bsds = tstream_context_data(stream, struct tstream_bsd);
int ret;
req = tevent_req_create(mem_ctx, &state,
struct tstream_bsd_writev_state);
if (!req) {
return NULL;
}
state->stream = stream;
/* we make a copy of the vector so that we can modify it */
state->vector = talloc_array(state, struct iovec, count);
if (tevent_req_nomem(state->vector, req)) {
goto post;
}
memcpy(state->vector, vector, sizeof(struct iovec)*count);
state->count = count;
state->ret = 0;
talloc_set_destructor(state, tstream_bsd_writev_destructor);
if (bsds->fd == -1) {
tevent_req_error(req, ENOTCONN);
goto post;
}
/*
* this is a fast path, not waiting for the
* socket to become explicit writeable gains
* about 10%-20% performance in benchmark tests.
*/
tstream_bsd_writev_handler(req);
if (!tevent_req_is_in_progress(req)) {
goto post;
}
ret = tstream_bsd_set_writeable_handler(bsds, ev,
tstream_bsd_writev_handler,
req);
if (ret == -1) {
tevent_req_error(req, errno);
goto post;
}
return req;
post:
tevent_req_post(req, ev);
return req;
}
static void tstream_bsd_writev_handler(void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(private_data,
struct tevent_req);
struct tstream_bsd_writev_state *state = tevent_req_data(req,
struct tstream_bsd_writev_state);
struct tstream_context *stream = state->stream;
struct tstream_bsd *bsds = tstream_context_data(stream, struct tstream_bsd);
ssize_t ret;
int err;
bool retry;
ret = writev(bsds->fd, state->vector, state->count);
if (ret == 0) {
/* propagate end of file */
tevent_req_error(req, EPIPE);
return;
}
err = tsocket_bsd_error_from_errno(ret, errno, &retry);
if (retry) {
/* retry later */
return;
}
if (tevent_req_error(req, err)) {
return;
}
state->ret += ret;
while (ret > 0) {
if (ret < state->vector[0].iov_len) {
uint8_t *base;
base = (uint8_t *)state->vector[0].iov_base;
base += ret;
state->vector[0].iov_base = base;
state->vector[0].iov_len -= ret;
break;
}
ret -= state->vector[0].iov_len;
state->vector += 1;
state->count -= 1;
}
if (state->count > 0) {
/* we have more to read */
return;
}
tevent_req_done(req);
}
static int tstream_bsd_writev_recv(struct tevent_req *req, int *perrno)
{
struct tstream_bsd_writev_state *state = tevent_req_data(req,
struct tstream_bsd_writev_state);
int ret;
ret = tsocket_simple_int_recv(req, perrno);
if (ret == 0) {
ret = state->ret;
}
tevent_req_received(req);
return ret;
}
struct tstream_bsd_disconnect_state {
void *__dummy;
};
static struct tevent_req *tstream_bsd_disconnect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tstream_context *stream)
{
struct tstream_bsd *bsds = tstream_context_data(stream, struct tstream_bsd);
struct tevent_req *req;
struct tstream_bsd_disconnect_state *state;
int ret;
int err;
bool dummy;
req = tevent_req_create(mem_ctx, &state,
struct tstream_bsd_disconnect_state);
if (req == NULL) {
return NULL;
}
if (bsds->fd == -1) {
tevent_req_error(req, ENOTCONN);
goto post;
}
ret = close(bsds->fd);
bsds->fd = -1;
err = tsocket_bsd_error_from_errno(ret, errno, &dummy);
if (tevent_req_error(req, err)) {
goto post;
}
tevent_req_done(req);
post:
tevent_req_post(req, ev);
return req;
}
static int tstream_bsd_disconnect_recv(struct tevent_req *req,
int *perrno)
{
int ret;
ret = tsocket_simple_int_recv(req, perrno);
tevent_req_received(req);
return ret;
}
static const struct tstream_context_ops tstream_bsd_ops = {
.name = "bsd",
.pending_bytes = tstream_bsd_pending_bytes,
.readv_send = tstream_bsd_readv_send,
.readv_recv = tstream_bsd_readv_recv,
.writev_send = tstream_bsd_writev_send,
.writev_recv = tstream_bsd_writev_recv,
.disconnect_send = tstream_bsd_disconnect_send,
.disconnect_recv = tstream_bsd_disconnect_recv,
};
static int tstream_bsd_destructor(struct tstream_bsd *bsds)
{
TALLOC_FREE(bsds->fde);
if (bsds->fd != -1) {
close(bsds->fd);
bsds->fd = -1;
}
return 0;
}
int _tstream_bsd_existing_socket(TALLOC_CTX *mem_ctx,
int fd,
struct tstream_context **_stream,
const char *location)
{
struct tstream_context *stream;
struct tstream_bsd *bsds;
stream = tstream_context_create(mem_ctx,
&tstream_bsd_ops,
&bsds,
struct tstream_bsd,
location);
if (!stream) {
return -1;
}
ZERO_STRUCTP(bsds);
bsds->fd = fd;
talloc_set_destructor(bsds, tstream_bsd_destructor);
*_stream = stream;
return 0;
}
struct tstream_bsd_connect_state {
int fd;
struct tevent_fd *fde;
struct tstream_conext *stream;
};
static int tstream_bsd_connect_destructor(struct tstream_bsd_connect_state *state)
{
TALLOC_FREE(state->fde);
if (state->fd != -1) {
close(state->fd);
state->fd = -1;
}
return 0;
}
static void tstream_bsd_connect_fde_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags,
void *private_data);
static struct tevent_req * tstream_bsd_connect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int sys_errno,
const struct tsocket_address *local,
const struct tsocket_address *remote)
{
struct tevent_req *req;
struct tstream_bsd_connect_state *state;
struct tsocket_address_bsd *lbsda =
talloc_get_type_abort(local->private_data,
struct tsocket_address_bsd);
struct tsocket_address_bsd *rbsda =
talloc_get_type_abort(remote->private_data,
struct tsocket_address_bsd);
int ret;
int err;
bool retry;
bool do_bind = false;
bool do_reuseaddr = false;
socklen_t sa_len = sizeof(rbsda->u.ss);
req = tevent_req_create(mem_ctx, &state,
struct tstream_bsd_connect_state);
if (!req) {
return NULL;
}
state->fd = -1;
state->fde = NULL;
talloc_set_destructor(state, tstream_bsd_connect_destructor);
/* give the wrappers a chance to report an error */
if (sys_errno != 0) {
tevent_req_error(req, sys_errno);
goto post;
}
switch (lbsda->u.sa.sa_family) {
case AF_UNIX:
if (lbsda->u.un.sun_path[0] != 0) {
do_reuseaddr = true;
do_bind = true;
}
/*
* for unix sockets we can't use the size of sockaddr_storage
* we would get EINVAL
*/
sa_len = sizeof(rbsda->u.un);
break;
case AF_INET:
if (lbsda->u.in.sin_port != 0) {
do_reuseaddr = true;
do_bind = true;
}
if (lbsda->u.in.sin_addr.s_addr == INADDR_ANY) {
do_bind = true;
}
break;
#ifdef HAVE_IPV6
case AF_INET6:
if (lbsda->u.in6.sin6_port != 0) {
do_reuseaddr = true;
do_bind = true;
}
if (memcmp(&in6addr_any,
&lbsda->u.in6.sin6_addr,
sizeof(in6addr_any)) != 0) {
do_bind = true;
}
break;
#endif
default:
tevent_req_error(req, EINVAL);
goto post;
}
state->fd = socket(lbsda->u.sa.sa_family, SOCK_STREAM, 0);
if (state->fd == -1) {
tevent_req_error(req, errno);
goto post;
}
state->fd = tsocket_bsd_common_prepare_fd(state->fd, true);
if (state->fd == -1) {
tevent_req_error(req, errno);
goto post;
}
if (do_reuseaddr) {
int val = 1;
ret = setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR,
(const void *)&val, sizeof(val));
if (ret == -1) {
tevent_req_error(req, errno);
goto post;
}
}
if (do_bind) {
ret = bind(state->fd, &lbsda->u.sa, sizeof(lbsda->u.ss));
if (ret == -1) {
tevent_req_error(req, errno);
goto post;
}
}
ret = connect(state->fd, &rbsda->u.sa, sa_len);
err = tsocket_bsd_error_from_errno(ret, errno, &retry);
if (retry) {
/* retry later */
goto async;
}
if (tevent_req_error(req, err)) {
goto post;
}
tevent_req_done(req);
goto post;
async:
state->fde = tevent_add_fd(ev, state,
state->fd,
TEVENT_FD_READ | TEVENT_FD_WRITE,
tstream_bsd_connect_fde_handler,
req);
if (tevent_req_nomem(state->fde, req)) {
goto post;
}
return req;
post:
tevent_req_post(req, ev);
return req;
}
static void tstream_bsd_connect_fde_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags,
void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(private_data,
struct tevent_req);
struct tstream_bsd_connect_state *state = tevent_req_data(req,
struct tstream_bsd_connect_state);
int ret;
int error=0;
socklen_t len = sizeof(error);
int err;
bool retry;
ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (ret == 0) {
if (error != 0) {
errno = error;
ret = -1;
}
}
err = tsocket_bsd_error_from_errno(ret, errno, &retry);
if (retry) {
/* retry later */
return;
}
if (tevent_req_error(req, err)) {
return;
}
tevent_req_done(req);
}
static int tstream_bsd_connect_recv(struct tevent_req *req,
int *perrno,
TALLOC_CTX *mem_ctx,
struct tstream_context **stream,
const char *location)
{
struct tstream_bsd_connect_state *state = tevent_req_data(req,
struct tstream_bsd_connect_state);
int ret;
ret = tsocket_simple_int_recv(req, perrno);
if (ret == 0) {
ret = _tstream_bsd_existing_socket(mem_ctx,
state->fd,
stream,
location);
if (ret == -1) {
*perrno = errno;
goto done;
}
TALLOC_FREE(state->fde);
state->fd = -1;
}
done:
tevent_req_received(req);
return ret;
}
struct tevent_req * tstream_inet_tcp_connect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
const struct tsocket_address *local,
const struct tsocket_address *remote)
{
struct tsocket_address_bsd *lbsda =
talloc_get_type_abort(local->private_data,
struct tsocket_address_bsd);
struct tevent_req *req;
int sys_errno = 0;
switch (lbsda->u.sa.sa_family) {
case AF_INET:
break;
#ifdef HAVE_IPV6
case AF_INET6:
break;
#endif
default:
sys_errno = EINVAL;
break;
}
req = tstream_bsd_connect_send(mem_ctx, ev, sys_errno, local, remote);
return req;
}
int _tstream_inet_tcp_connect_recv(struct tevent_req *req,
int *perrno,
TALLOC_CTX *mem_ctx,
struct tstream_context **stream,
const char *location)
{
return tstream_bsd_connect_recv(req, perrno, mem_ctx, stream, location);
}
struct tevent_req * tstream_unix_connect_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
const struct tsocket_address *local,
const struct tsocket_address *remote)
{
struct tsocket_address_bsd *lbsda =
talloc_get_type_abort(local->private_data,
struct tsocket_address_bsd);
struct tevent_req *req;
int sys_errno = 0;
switch (lbsda->u.sa.sa_family) {
case AF_UNIX:
break;
default:
sys_errno = EINVAL;
break;
}
req = tstream_bsd_connect_send(mem_ctx, ev, sys_errno, local, remote);
return req;
}
int _tstream_unix_connect_recv(struct tevent_req *req,
int *perrno,
TALLOC_CTX *mem_ctx,
struct tstream_context **stream,
const char *location)
{
return tstream_bsd_connect_recv(req, perrno, mem_ctx, stream, location);
}
int _tstream_unix_socketpair(TALLOC_CTX *mem_ctx1,
struct tstream_context **_stream1,
TALLOC_CTX *mem_ctx2,
struct tstream_context **_stream2,
const char *location)
{
int ret;
int fds[2];
int fd1;
int fd2;
struct tstream_context *stream1 = NULL;
struct tstream_context *stream2 = NULL;
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
if (ret == -1) {
return -1;
}
fd1 = fds[0];
fd2 = fds[1];
fd1 = tsocket_bsd_common_prepare_fd(fd1, true);
if (fd1 == -1) {
int sys_errno = errno;
close(fd2);
errno = sys_errno;
return -1;
}
fd2 = tsocket_bsd_common_prepare_fd(fd2, true);
if (fd2 == -1) {
int sys_errno = errno;
close(fd1);
errno = sys_errno;
return -1;
}
ret = _tstream_bsd_existing_socket(mem_ctx1,
fd1,
&stream1,
location);
if (ret == -1) {
int sys_errno = errno;
close(fd1);
close(fd2);
errno = sys_errno;
return -1;
}
ret = _tstream_bsd_existing_socket(mem_ctx2,
fd2,
&stream2,
location);
if (ret == -1) {
int sys_errno = errno;
talloc_free(stream1);
close(fd2);
errno = sys_errno;
return -1;
}
*_stream1 = stream1;
*_stream2 = stream2;
return 0;
}