diff --git a/ctdb/common/line.c b/ctdb/common/line.c
new file mode 100644
index 00000000000..c4c6726875b
--- /dev/null
+++ b/ctdb/common/line.c
@@ -0,0 +1,145 @@
+/*
+ Line based I/O over fds
+
+ Copyright (C) Amitay Isaacs 2018
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see .
+*/
+
+#include "replace.h"
+
+#include
+
+#include "lib/util/sys_rw.h"
+
+#include "common/line.h"
+
+struct line_read_state {
+ line_process_fn_t callback;
+ void *private_data;
+ char *buf;
+ size_t hint, len, offset;
+ int num_lines;
+};
+
+static bool line_read_one(char *buf, size_t start, size_t len, size_t *pos)
+{
+ size_t i;
+
+ for (i=start; ibuf, start, state->offset, &pos);
+ if (! ok) {
+ break;
+ }
+
+ state->buf[pos] = '\0';
+ state->num_lines += 1;
+
+ ret = state->callback(state->buf + start, state->private_data);
+ if (ret != 0) {
+ return ret;
+ }
+
+ start = pos+1;
+ }
+
+ if (pos > 0) {
+ if (pos+1 < state->offset) {
+ memmove(state->buf,
+ state->buf + pos+1,
+ state->offset - (pos+1));
+ }
+ state->offset -= (pos+1);
+ }
+
+ return 0;
+}
+
+int line_read(int fd,
+ size_t length,
+ TALLOC_CTX *mem_ctx,
+ line_process_fn_t callback,
+ void *private_data,
+ int *num_lines)
+{
+ struct line_read_state state;
+
+ if (length < 32) {
+ length = 32;
+ }
+
+ state = (struct line_read_state) {
+ .callback = callback,
+ .private_data = private_data,
+ .hint = length,
+ };
+
+ while (1) {
+ ssize_t n;
+ int ret;
+
+ if (state.offset == state.len) {
+ state.len += state.hint;
+ state.buf = talloc_realloc_size(mem_ctx,
+ state.buf,
+ state.len);
+ if (state.buf == NULL) {
+ return ENOMEM;
+ }
+ }
+
+ n = sys_read(fd,
+ state.buf + state.offset,
+ state.len - state.offset);
+ if (n < 0) {
+ return errno;
+ }
+ if (n == 0) {
+ break;
+ }
+
+ state.offset += n;
+
+ ret = line_read_process(&state);
+ if (ret != 0) {
+ if (num_lines != NULL) {
+ *num_lines = state.num_lines;
+ }
+ return ret;
+ }
+ }
+
+ if (num_lines != NULL) {
+ *num_lines = state.num_lines;
+ }
+ return 0;
+}
diff --git a/ctdb/common/line.h b/ctdb/common/line.h
new file mode 100644
index 00000000000..6b67f1e92e1
--- /dev/null
+++ b/ctdb/common/line.h
@@ -0,0 +1,62 @@
+/*
+ Line based I/O over fds
+
+ Copyright (C) Amitay Isaacs 2018
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see .
+*/
+
+#ifndef __CTDB_LINE_H__
+#define __CTDB_LINE_H__
+
+#include
+
+/**
+ * @file line.h
+ *
+ * @brief Line based I/O over pipes and sockets
+ */
+
+/**
+ * @brief The callback routine called to process a line
+ *
+ * @param[in] line The line read
+ * @param[in] private_data Private data for callback
+ * @return 0 to continue processing lines, non-zero to stop reading
+ */
+typedef int (*line_process_fn_t)(char *line, void *private_data);
+
+/**
+ * @brief Read a line (terminated by \n or \0)
+ *
+ * If there is any read error on fd, then errno will be returned.
+ * If callback function returns a non-zero value, then that value will be
+ * returned.
+ *
+ * @param[in] fd The file descriptor
+ * @param[in] length The expected length of a line (this is only a hint)
+ * @param[in] mem_ctx Talloc memory context
+ * @param[in] callback Callback function called when a line is read
+ * @param[in] private_data Private data for callback
+ * @param[out] num_lines Number of lines read so far
+ * @return 0 on on success, errno on failure
+ */
+int line_read(int fd,
+ size_t length,
+ TALLOC_CTX *mem_ctx,
+ line_process_fn_t callback,
+ void *private_data,
+ int *num_lines);
+
+#endif /* __CTDB_LINE_H__ */
diff --git a/ctdb/tests/cunit/line_test_001.sh b/ctdb/tests/cunit/line_test_001.sh
new file mode 100755
index 00000000000..991d01a24e7
--- /dev/null
+++ b/ctdb/tests/cunit/line_test_001.sh
@@ -0,0 +1,90 @@
+#!/bin/sh
+
+. "${TEST_SCRIPTS_DIR}/unit.sh"
+
+tfile="${TEST_VAR_DIR}/line.$$"
+
+remove_files ()
+{
+ rm -f "$tfile"
+}
+
+test_cleanup remove_files
+
+> "$tfile"
+
+ok_null
+unit_test line_test "$tfile"
+
+printf "\0" > "$tfile"
+
+required_result 1 < "$tfile"
+
+ok_null
+unit_test line_test "$tfile"
+
+cat < "$tfile"
+hello
+world
+EOF
+
+required_result 2 << EOF
+hello
+world
+EOF
+unit_test line_test "$tfile"
+
+required_result 2 << EOF
+hello
+world
+EOF
+unit_test line_test "$tfile"
+
+cat < "$tfile"
+This is a really long long line full of random words and hopefully it will be read properly by the line test program and identified as a single line
+EOF
+
+required_result 1 < "$tfile"
+line number one
+line number two
+line number one
+line number two
+line number one
+EOF
+
+required_result 5 < "$tfile"
+this is line number one
+this is line number two
+this is line number three
+this is line number four
+this is line number five
+EOF
+
+required_result 5 <.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+
+#include
+#include
+
+#include "common/line.c"
+
+static int line_print(char *line, void *private_data)
+{
+ printf("%s\n", line);
+ fflush(stdout);
+
+ return 0;
+}
+
+int main(int argc, const char **argv)
+{
+ TALLOC_CTX *mem_ctx;
+ size_t hint = 32;
+ pid_t pid;
+ int ret, lines = 0;
+ int pipefd[2];
+
+ if (argc < 2 || argc > 3) {
+ fprintf(stderr, "Usage: %s []\n", argv[0]);
+ exit(1);
+ }
+
+ if (argc == 3) {
+ long value;
+
+ value = atol(argv[2]);
+ assert(value > 0);
+ hint = value;
+ }
+
+ ret = pipe(pipefd);
+ assert(ret == 0);
+
+ pid = fork();
+ assert(pid != -1);
+
+ if (pid == 0) {
+ char buffer[16];
+ ssize_t n, n2;
+ int fd;
+
+ close(pipefd[0]);
+
+ fd = open(argv[1], O_RDONLY);
+ assert(fd != -1);
+
+ while (1) {
+ n = read(fd, buffer, sizeof(buffer));
+ assert(n >= 0 && n <= sizeof(buffer));
+
+ if (n == 0) {
+ break;
+ }
+
+ n2 = write(pipefd[1], buffer, n);
+ assert(n2 == n);
+ }
+
+ close(pipefd[1]);
+ close(fd);
+
+ exit(0);
+ }
+
+ close(pipefd[1]);
+
+ mem_ctx = talloc_new(NULL);
+ assert(mem_ctx != NULL);
+
+ ret = line_read(pipefd[0], hint, NULL, line_print, NULL, &lines);
+ assert(ret == 0);
+
+ talloc_free(mem_ctx);
+
+ return lines;
+}
diff --git a/ctdb/wscript b/ctdb/wscript
index c26bd8c0d9a..6d69545b6aa 100644
--- a/ctdb/wscript
+++ b/ctdb/wscript
@@ -404,7 +404,7 @@ def build(bld):
pidfile.c run_proc.c
hash_count.c run_event.c
sock_client.c version.c
- cmdline.c path.c conf.c
+ cmdline.c path.c conf.c line.c
'''),
deps='''samba-util sys_rw tevent-util
replace talloc tevent tdb popt''')
@@ -868,6 +868,7 @@ def build(bld):
'run_event_test',
'cmdline_test',
'conf_test',
+ 'line_test',
]
for target in ctdb_unit_tests: