1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-24 02:04:21 +03:00

tsocket: add tstream_readv_pdu_send/recv()

metze
This commit is contained in:
Stefan Metzmacher 2009-04-03 17:17:15 +02:00
parent ee6d796c19
commit a0830f4cb9
2 changed files with 178 additions and 1 deletions

View File

@ -190,7 +190,7 @@ int _tstream_bsd_existing_socket(TALLOC_CTX *mem_ctx,
__location__)
/*
* Queue helpers
* Queue and PDU helpers
*/
struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx,
@ -202,5 +202,17 @@ struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx,
struct tsocket_address *dst);
ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno);
typedef int (*tstream_readv_pdu_next_vector_t)(struct tstream_context *stream,
void *private_data,
TALLOC_CTX *mem_ctx,
struct iovec **vector,
size_t *count);
struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tstream_context *stream,
tstream_readv_pdu_next_vector_t next_vector_fn,
void *next_vector_private);
int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno);
#endif /* _TSOCKET_H */

View File

@ -22,6 +22,7 @@
*/
#include "replace.h"
#include "system/filesys.h"
#include "tsocket.h"
#include "tsocket_internal.h"
@ -155,3 +156,167 @@ ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno)
return ret;
}
struct tstream_readv_pdu_state {
/* this structs are owned by the caller */
struct {
struct tevent_context *ev;
struct tstream_context *stream;
tstream_readv_pdu_next_vector_t next_vector_fn;
void *next_vector_private;
} caller;
/*
* Each call to the callback resets iov and count
* the callback allocated the iov as child of our state,
* that means we are allowed to modify and free it.
*
* we should call the callback every time we filled the given
* vector and ask for a new vector. We return if the callback
* ask for 0 bytes.
*/
struct iovec *vector;
size_t count;
/*
* the total number of bytes we read,
* the return value of the _recv function
*/
int total_read;
};
static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req);
static void tstream_readv_pdu_readv_done(struct tevent_req *subreq);
struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tstream_context *stream,
tstream_readv_pdu_next_vector_t next_vector_fn,
void *next_vector_private)
{
struct tevent_req *req;
struct tstream_readv_pdu_state *state;
req = tevent_req_create(mem_ctx, &state,
struct tstream_readv_pdu_state);
if (!req) {
return NULL;
}
state->caller.ev = ev;
state->caller.stream = stream;
state->caller.next_vector_fn = next_vector_fn;
state->caller.next_vector_private = next_vector_private;
state->vector = NULL;
state->count = 0;
state->total_read = 0;
tstream_readv_pdu_ask_for_next_vector(req);
if (!tevent_req_is_in_progress(req)) {
goto post;
}
return req;
post:
return tevent_req_post(req, ev);
}
static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req)
{
struct tstream_readv_pdu_state *state = tevent_req_data(req,
struct tstream_readv_pdu_state);
int ret;
size_t to_read = 0;
size_t i;
struct tevent_req *subreq;
TALLOC_FREE(state->vector);
state->count = 0;
ret = state->caller.next_vector_fn(state->caller.stream,
state->caller.next_vector_private,
state, &state->vector, &state->count);
if (ret == -1) {
tevent_req_error(req, errno);
return;
}
if (state->count == 0) {
tevent_req_done(req);
return;
}
for (i=0; i < state->count; i++) {
size_t tmp = to_read;
tmp += state->vector[i].iov_len;
if (tmp < to_read) {
tevent_req_error(req, EMSGSIZE);
return;
}
to_read = tmp;
}
/*
* this is invalid the next vector function should have
* reported count == 0.
*/
if (to_read == 0) {
tevent_req_error(req, EINVAL);
return;
}
if (state->total_read + to_read < state->total_read) {
tevent_req_error(req, EMSGSIZE);
return;
}
subreq = tstream_readv_send(state,
state->caller.ev,
state->caller.stream,
state->vector,
state->count);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, tstream_readv_pdu_readv_done, req);
}
static void tstream_readv_pdu_readv_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(subreq,
struct tevent_req);
struct tstream_readv_pdu_state *state = tevent_req_data(req,
struct tstream_readv_pdu_state);
int ret;
int sys_errno;
ret = tstream_readv_recv(subreq, &sys_errno);
if (ret == -1) {
tevent_req_error(req, sys_errno);
return;
}
state->total_read += ret;
/* ask the callback for a new vector we should fill */
tstream_readv_pdu_ask_for_next_vector(req);
}
int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno)
{
struct tstream_readv_pdu_state *state = tevent_req_data(req,
struct tstream_readv_pdu_state);
int ret;
ret = tsocket_simple_int_recv(req, perrno);
if (ret == 0) {
ret = state->total_read;
}
tevent_req_received(req);
return ret;
}