MEDIUM: threads/fd: Initialize the process mask during the call to fd_insert
Listeners will allow any threads to process the corresponding fd. But for other FDs, we limit the processing to the current thread.
This commit is contained in:
parent
a7c5d43085
commit
36716a7fec
@ -110,7 +110,7 @@ static inline void conn_ctrl_init(struct connection *conn)
|
||||
|
||||
fdtab[fd].owner = conn;
|
||||
fdtab[fd].iocb = conn_fd_handler;
|
||||
fd_insert(fd);
|
||||
fd_insert(fd, tid_bit);
|
||||
/* mark the fd as ready so as not to needlessly poll at the beginning */
|
||||
fd_may_recv(fd);
|
||||
fd_may_send(fd);
|
||||
|
@ -395,7 +395,7 @@ static inline void fd_update_events(int fd, int evts)
|
||||
}
|
||||
|
||||
/* Prepares <fd> for being polled */
|
||||
static inline void fd_insert(int fd)
|
||||
static inline void fd_insert(int fd, unsigned long thread_mask)
|
||||
{
|
||||
SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
|
||||
fdtab[fd].ev = 0;
|
||||
@ -404,7 +404,7 @@ static inline void fd_insert(int fd)
|
||||
fdtab[fd].linger_risk = 0;
|
||||
fdtab[fd].cloned = 0;
|
||||
fdtab[fd].cache = 0;
|
||||
fdtab[fd].process_mask = 0; // unused for now
|
||||
fdtab[fd].process_mask = thread_mask;
|
||||
SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
|
||||
|
||||
SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
|
||||
|
@ -193,7 +193,7 @@ static int dns_connect_namesaver(struct dns_nameserver *ns)
|
||||
dgram->t.sock.fd = fd;
|
||||
fdtab[fd].owner = dgram;
|
||||
fdtab[fd].iocb = dgram_fd_handler;
|
||||
fd_insert(fd);
|
||||
fd_insert(fd, (unsigned long)-1);
|
||||
fd_want_recv(fd);
|
||||
return 0;
|
||||
}
|
||||
|
@ -2228,7 +2228,7 @@ void mworker_pipe_register(int pipefd[2])
|
||||
fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
|
||||
fdtab[mworker_pipe[0]].owner = mworker_pipe;
|
||||
fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
|
||||
fd_insert(mworker_pipe[0]);
|
||||
fd_insert(mworker_pipe[0], MAX_THREADS_MASK);
|
||||
fd_want_recv(mworker_pipe[0]);
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ int thread_sync_init(unsigned long mask)
|
||||
|
||||
fdtab[rfd].owner = NULL;
|
||||
fdtab[rfd].iocb = thread_sync_io_handler;
|
||||
fd_insert(rfd);
|
||||
fd_insert(rfd, MAX_THREADS_MASK);
|
||||
|
||||
all_threads_mask = mask;
|
||||
return 0;
|
||||
|
@ -1107,7 +1107,7 @@ int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen)
|
||||
|
||||
fdtab[fd].owner = listener; /* reference the listener instead of a task */
|
||||
fdtab[fd].iocb = listener->proto->accept;
|
||||
fd_insert(fd);
|
||||
fd_insert(fd, MAX_THREADS_MASK);
|
||||
|
||||
tcp_return:
|
||||
if (msg && errlen) {
|
||||
|
@ -332,9 +332,9 @@ static int uxst_bind_listener(struct listener *listener, char *errmsg, int errle
|
||||
listener->state = LI_LISTEN;
|
||||
|
||||
/* the function for the accept() event */
|
||||
fd_insert(fd);
|
||||
fdtab[fd].iocb = listener->proto->accept;
|
||||
fdtab[fd].owner = listener; /* reference the listener instead of a task */
|
||||
fd_insert(fd, MAX_THREADS_MASK);
|
||||
return err;
|
||||
|
||||
err_rename:
|
||||
|
@ -459,7 +459,7 @@ static void inline ssl_async_process_fds(struct connection *conn, SSL *ssl)
|
||||
afd = add_fd[i];
|
||||
fdtab[afd].owner = conn;
|
||||
fdtab[afd].iocb = ssl_async_fd_handler;
|
||||
fd_insert(afd);
|
||||
fd_insert(afd, tid_bit);
|
||||
}
|
||||
|
||||
num_add_fds = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user