diff --git a/src/util/vircommand.c b/src/util/vircommand.c index db7dbca9bc..bf3f816fb5 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -42,6 +42,7 @@ #include "virpidfile.h" #include "virprocess.h" #include "virbuffer.h" +#include "virthread.h" #define VIR_FROM_THIS VIR_FROM_NONE @@ -80,15 +81,13 @@ struct _virCommand { char **errbuf; int infd; + int inpipe; int outfd; int errfd; int *outfdptr; int *errfdptr; - size_t inbufOffset; - int inWatch; - int outWatch; - int errWatch; + virThreadPtr asyncioThread; bool handshake; int handshakeWait[2]; @@ -784,8 +783,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[0] = -1; cmd->handshakeNotify[1] = -1; - cmd->infd = cmd->outfd = cmd->errfd = -1; - cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; + cmd->infd = cmd->inpipe = cmd->outfd = cmd->errfd = -1; cmd->pid = -1; virCommandAddArgSet(cmd, args); @@ -1703,19 +1701,17 @@ virCommandToString(virCommandPtr cmd) * Manage input and output to the child process. */ static int -virCommandProcessIO(virCommandPtr cmd, int *inpipe) +virCommandProcessIO(virCommandPtr cmd) { - int infd = -1, outfd = -1, errfd = -1; + int outfd = -1, errfd = -1; size_t inlen = 0, outlen = 0, errlen = 0; size_t inoff = 0; int ret = 0; /* With an input buffer, feed data to child * via pipe */ - if (cmd->inbuf) { + if (cmd->inbuf) inlen = strlen(cmd->inbuf); - infd = *inpipe; - } /* With out/err buffer, the outfd/errfd have been filled with an * FD for us. Guarantee an allocated string with partial results @@ -1744,8 +1740,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) struct pollfd fds[3]; int nfds = 0; - if (infd != -1) { - fds[nfds].fd = infd; + if (cmd->inpipe != -1) { + fds[nfds].fd = cmd->inpipe; fds[nfds].events = POLLOUT; fds[nfds].revents = 0; nfds++; @@ -1817,21 +1813,19 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) } if (fds[i].revents & (POLLOUT | POLLERR) && - fds[i].fd == infd) { + fds[i].fd == cmd->inpipe) { int done; /* Coverity 5.3.0 can't see that we only get here if * infd is in the set because it was non-negative. */ sa_assert(infd != -1); - done = write(infd, cmd->inbuf + inoff, + done = write(cmd->inpipe, cmd->inbuf + inoff, inlen - inoff); if (done < 0) { if (errno == EPIPE) { VIR_DEBUG("child closed stdin early, ignoring EPIPE " - "on fd %d", infd); - if (VIR_CLOSE(*inpipe) < 0) - VIR_DEBUG("ignoring failed close on fd %d", infd); - infd = -1; + "on fd %d", cmd->inpipe); + VIR_FORCE_CLOSE(cmd->inpipe); } else if (errno != EINTR && errno != EAGAIN) { virReportSystemError(errno, "%s", _("unable to write to child input")); @@ -1839,11 +1833,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) } } else { inoff += done; - if (inoff == inlen) { - if (VIR_CLOSE(*inpipe) < 0) - VIR_DEBUG("ignoring failed close on fd %d", infd); - infd = -1; - } + if (inoff == inlen) + VIR_FORCE_CLOSE(cmd->inpipe); } } } @@ -1914,7 +1905,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) int ret = 0; char *outbuf = NULL; char *errbuf = NULL; - int infd[2] = { -1, -1 }; struct stat st; bool string_io; bool async_io = false; @@ -1960,18 +1950,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) } } - /* If we have an input buffer, we need - * a pipe to feed the data to the child */ - if (cmd->inbuf) { - if (pipe2(infd, O_CLOEXEC) < 0) { - virReportSystemError(errno, "%s", - _("unable to open pipe")); - cmd->has_error = -1; - return -1; - } - cmd->infd = infd[0]; - } - /* If caller requested the same string for stdout and stderr, then * merge those into one string. */ if (cmd->outbuf && cmd->outbuf == cmd->errbuf) { @@ -1999,23 +1977,14 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) cmd->flags |= VIR_EXEC_RUN_SYNC; if (virCommandRunAsync(cmd, NULL) < 0) { - if (cmd->inbuf) { - tmpfd = infd[0]; - if (VIR_CLOSE(infd[0]) < 0) - VIR_DEBUG("ignoring failed close on fd %d", tmpfd); - tmpfd = infd[1]; - if (VIR_CLOSE(infd[1]) < 0) - VIR_DEBUG("ignoring failed close on fd %d", tmpfd); - } cmd->has_error = -1; return -1; } - tmpfd = infd[0]; - if (VIR_CLOSE(infd[0]) < 0) - VIR_DEBUG("ignoring failed close on fd %d", tmpfd); - if (string_io) - ret = virCommandProcessIO(cmd, &infd[1]); + if (string_io) { + VIR_FORCE_CLOSE(cmd->infd); + ret = virCommandProcessIO(cmd); + } if (virCommandWait(cmd, exitstatus) < 0) ret = -1; @@ -2031,11 +2000,7 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) /* Reset any capturing, in case caller runs * this identical command again */ - if (cmd->inbuf) { - tmpfd = infd[1]; - if (VIR_CLOSE(infd[1]) < 0) - VIR_DEBUG("ignoring failed close on fd %d", tmpfd); - } + VIR_FORCE_CLOSE(cmd->inpipe); if (cmd->outbuf == &outbuf) { tmpfd = cmd->outfd; if (VIR_CLOSE(cmd->outfd) < 0) @@ -2135,177 +2100,13 @@ virCommandHook(void *data) static void -virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +virCommandDoAsyncIOHelper(void *opaque) { - virCommandPtr cmd = (virCommandPtr) opaque; - char ***bufptr = NULL; - char buf[1024]; - ssize_t nread, nwritten; - size_t len = 0; - int *watchPtr = NULL; - bool eof = false; - int *fdptr = NULL, **fdptrptr = NULL; - - VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); - errno = 0; - - if (watch == cmd->inWatch) { - watchPtr = &cmd->inWatch; - fdptr = &cmd->infd; - - if (events & VIR_EVENT_HANDLE_WRITABLE) { - len = strlen(cmd->inbuf); - - while (true) { - nwritten = write(fd, cmd->inbuf + cmd->inbufOffset, - len - cmd->inbufOffset); - if (nwritten < 0) { - if (errno != EAGAIN && errno != EINTR) { - virReportSystemError(errno, - _("Unable to write command's " - "input to FD %d"), - fd); - eof = true; - } - break; - } - - if (nwritten == 0) { - eof = true; - break; - } - - cmd->inbufOffset += nwritten; - if (cmd->inbufOffset == len) { - VIR_FORCE_CLOSE(cmd->infd); - eof = true; - break; - } - } - - } - } else { - if (watch == cmd->outWatch) { - watchPtr = &cmd->outWatch; - bufptr = &cmd->outbuf; - fdptr = &cmd->outfd; - fdptrptr = &cmd->outfdptr; - } else { - watchPtr = &cmd->errWatch; - bufptr = &cmd->errbuf; - fdptr = &cmd->errfd; - fdptrptr = &cmd->errfdptr; - } - - if (events & VIR_EVENT_HANDLE_READABLE) { - if (**bufptr) - len = strlen(**bufptr); - - while (true) { - nread = read(fd, buf, sizeof(buf)); - if (nread < 0) { - if (errno != EAGAIN && errno != EINTR) { - virReportSystemError(errno, - _("unable to read command's " - "output from FD %d"), - fd); - eof = true; - } - break; - } - - if (nread == 0) { - eof = true; - break; - } - - if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { - virReportOOMError(); - break; - } - - memcpy(**bufptr + len, buf, nread); - (**bufptr)[len + nread] = '\0'; - } - - } + virCommandPtr cmd = opaque; + if (virCommandProcessIO(cmd) < 0) { + /* If something went wrong, save errno or -1*/ + cmd->has_error = errno ? errno : -1; } - - if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || - (events & VIR_EVENT_HANDLE_ERROR)) { - virEventRemoveHandle(watch); - - *watchPtr = -1; - VIR_FORCE_CLOSE(*fdptr); - if (bufptr) - *bufptr = NULL; - if (fdptrptr) - *fdptrptr = NULL; - } -} - - -static int -virCommandRegisterEventLoop(virCommandPtr cmd) -{ - int ret = -1; - - if (cmd->inbuf && - (cmd->inWatch = virEventAddHandle(cmd->infd, - VIR_EVENT_HANDLE_WRITABLE | - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR, - virCommandHandleReadWrite, - cmd, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unable to register infd %d in the event loop"), - cmd->infd); - goto cleanup; - } - - if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && - (cmd->outWatch = virEventAddHandle(cmd->outfd, - VIR_EVENT_HANDLE_READABLE | - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR, - virCommandHandleReadWrite, - cmd, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unable to register outfd %d in the event loop"), - cmd->outfd); - - if (cmd->inWatch != -1) { - virEventRemoveHandle(cmd->inWatch); - cmd->inWatch = -1; - } - goto cleanup; - } - - if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && - (cmd->errWatch = virEventAddHandle(cmd->errfd, - VIR_EVENT_HANDLE_READABLE | - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR, - virCommandHandleReadWrite, - cmd, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unable to register errfd %d in the event loop"), - cmd->errfd); - if (cmd->inWatch != -1) { - virEventRemoveHandle(cmd->inWatch); - cmd->inWatch = -1; - } - if (cmd->outWatch != -1) { - virEventRemoveHandle(cmd->outWatch); - cmd->outWatch = -1; - } - goto cleanup; - } - - ret = 0; - -cleanup: - return ret; } @@ -2332,7 +2133,7 @@ cleanup: int virCommandRunAsync(virCommandPtr cmd, pid_t *pid) { - int ret; + int ret = -1; char *str; int i; bool synchronous = false; @@ -2351,23 +2152,21 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC; - /* Buffer management can only be requested via virCommandRun, unless help - * from the event loop has been requested via virCommandDoAsyncIO. */ - if (cmd->flags & VIR_EXEC_ASYNC_IO) { - /* If we have an input buffer, we need - * a pipe to feed the data to the child */ - if (cmd->inbuf && cmd->infd == -1) { - if (pipe2(infd, O_CLOEXEC) < 0) { - virReportSystemError(errno, "%s", - _("unable to open pipe")); - cmd->has_error = -1; - return -1; - } - cmd->infd = infd[0]; + /* Buffer management can only be requested via virCommandRun or + * virCommandDoAsyncIO. */ + if (cmd->inbuf && cmd->infd == -1 && + (synchronous || cmd->flags & VIR_EXEC_ASYNC_IO)) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; } + cmd->infd = infd[0]; + cmd->inpipe = infd[1]; } else if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2377,24 +2176,24 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) virReportError(VIR_ERR_INTERNAL_ERROR, _("command is already running as pid %lld"), (long long) cmd->pid); - return -1; + goto cleanup; } if (!synchronous && (cmd->flags & VIR_EXEC_DAEMON)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("daemonized command cannot use virCommandRunAsync")); - return -1; + goto cleanup; } if (cmd->pwd && (cmd->flags & VIR_EXEC_DAEMON)) { virReportError(VIR_ERR_INTERNAL_ERROR, _("daemonized command cannot set working directory %s"), cmd->pwd); - return -1; + goto cleanup; } if (cmd->pidfile && !(cmd->flags & VIR_EXEC_DAEMON)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("creation of pid file requires daemonized command")); - return -1; + goto cleanup; } str = virCommandToString(cmd); @@ -2430,15 +2229,27 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) cmd->reap = true; if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { - cmd->flags &= ~VIR_EXEC_ASYNC_IO; - if (cmd->inbuf && cmd->infd != -1) { - /* close the read end of infd and replace it with the write end */ + if (cmd->inbuf) VIR_FORCE_CLOSE(cmd->infd); - cmd->infd = infd[1]; + /* clear any error so we can catch if the helper thread reports one */ + cmd->has_error = 0; + if (VIR_ALLOC(cmd->asyncioThread) < 0 || + virThreadCreate(cmd->asyncioThread, true, + virCommandDoAsyncIOHelper, cmd) < 0) { + virReportSystemError(errno, "%s", + _("Unable to create thread " + "to process command's IO")); + VIR_FREE(cmd->asyncioThread); + virCommandAbort(cmd); + ret = -1; } - ret = virCommandRegisterEventLoop(cmd); } +cleanup: + if (ret < 0) { + VIR_FORCE_CLOSE(cmd->infd); + VIR_FORCE_CLOSE(cmd->inpipe); + } return ret; } @@ -2459,7 +2270,6 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) { int ret; int status = 0; - const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP; if (!cmd ||cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2484,24 +2294,20 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) * guarantee that virProcessWait only fails due to failure to wait, * and repeat the exitstatus check code ourselves. */ ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); - - if (cmd->inWatch != -1) { - virEventRemoveHandle(cmd->inWatch); - cmd->inWatch = -1; + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + virThreadJoin(cmd->asyncioThread); + VIR_FREE(cmd->asyncioThread); + VIR_FORCE_CLOSE(cmd->inpipe); + if (cmd->has_error) { + const char *msg = _("Error while processing command's IO"); + if (cmd->has_error < 0) + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", msg); + else + virReportSystemError(cmd->has_error, "%s", msg); + ret = -1; + } } - - if (cmd->outWatch != -1) { - virEventRemoveHandle(cmd->outWatch); - virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); - cmd->outWatch = -1; - } - - if (cmd->errWatch != -1) { - virEventRemoveHandle(cmd->errWatch); - virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd); - cmd->errWatch = -1; - } - if (ret == 0) { cmd->pid = -1; cmd->reap = false; @@ -2719,6 +2525,10 @@ virCommandFree(virCommandPtr cmd) VIR_FORCE_CLOSE(cmd->transfer[i]); } + if (cmd->asyncioThread) { + virThreadJoin(cmd->asyncioThread); + VIR_FREE(cmd->asyncioThread); + } VIR_FREE(cmd->inbuf); VIR_FORCE_CLOSE(cmd->outfd); VIR_FORCE_CLOSE(cmd->errfd);