diff --git a/lib/compression/pycompression.c b/lib/compression/pycompression.c
new file mode 100644
index 00000000000..00a207008fb
--- /dev/null
+++ b/lib/compression/pycompression.c
@@ -0,0 +1,304 @@
+/*
+ Samba Unix SMB/CIFS implementation.
+
+ Python bindings for compression functions.
+
+ Copyright (C) Petr Viktorin 2015
+ Copyright (C) Douglas Bagnall 2022
+
+ ** NOTE! The following LGPL license applies to the talloc
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see .
+*/
+
+#include "includes.h"
+#include
+#include
+#include "lzxpress.h"
+#include "lzxpress_huffman.h"
+
+/* CompressionError is filled out in module init */
+static PyObject *CompressionError = NULL;
+
+static PyObject *plain_compress(PyObject *mod, PyObject *args)
+{
+ uint8_t *src = NULL;
+ Py_ssize_t src_len;
+ char *dest = NULL;
+ Py_ssize_t dest_len;
+ PyObject *dest_obj = NULL;
+ size_t alloc_len;
+ int ret;
+
+ if (!PyArg_ParseTuple(args, "s#", &src, &src_len)) {
+ return NULL;
+ }
+
+ /*
+ * 9/8 + 4 is the worst case growth, but we add room.
+ *
+ * alloc_len can't overflow as src_len is ssize_t while alloc_len is
+ * size_t.
+ */
+ alloc_len = src_len + src_len / 8 + 500;
+
+ dest_obj = PyBytes_FromStringAndSize(NULL, alloc_len);
+ if (dest_obj == NULL) {
+ return NULL;
+ }
+ dest = PyBytes_AS_STRING(dest_obj);
+
+ dest_len = lzxpress_compress(src,
+ src_len,
+ (uint8_t *)dest,
+ alloc_len);
+ if (dest_len < 0) {
+ PyErr_SetString(CompressionError, "unable to compress data");
+ Py_DECREF(dest_obj);
+ return NULL;
+ }
+
+ ret = _PyBytes_Resize(&dest_obj, dest_len);
+ if (ret != 0) {
+ /*
+ * Don't try to free dest_obj, as we're in deep MemoryError
+ * territory here.
+ */
+ return NULL;
+ }
+ return dest_obj;
+}
+
+
+static PyObject *plain_decompress(PyObject *mod, PyObject *args)
+{
+ uint8_t *src = NULL;
+ Py_ssize_t src_len;
+ char *dest = NULL;
+ Py_ssize_t dest_len;
+ PyObject *dest_obj = NULL;
+ Py_ssize_t alloc_len = 0;
+ Py_ssize_t given_len = 0;
+ int ret;
+
+ if (!PyArg_ParseTuple(args, "s#|n", &src, &src_len, &given_len)) {
+ return NULL;
+ }
+ if (given_len != 0) {
+ /*
+ * With plain decompression, we don't *need* the exact output
+ * size (as we do with LZ77+Huffman), but it certainly helps
+ * when guessing the size.
+ */
+ alloc_len = given_len;
+ } else if (src_len > UINT32_MAX) {
+ /*
+ * The underlying decompress function will reject this, but by
+ * checking here we can give a better message and be clearer
+ * about overflow risks.
+ *
+ * Note, the limit is actually the smallest of UINT32_MAX and
+ * SSIZE_MAX, but src_len is ssize_t so it already can't
+ * exceed that.
+ */
+ PyErr_Format(CompressionError,
+ "The maximum size for compressed data is 4GB "
+ "cannot decompress %zu bytes.", src_len);
+ } else {
+ /*
+ * The data can expand massively (though not beyond the
+ * 4GB limit) so we guess a big number for small inputs
+ * (we expect small inputs), and a relatively conservative
+ * number for big inputs.
+ */
+ if (src_len <= 3333333) {
+ alloc_len = 10000000;
+ } else if (src_len / 3 >= UINT32_MAX) {
+ alloc_len = UINT32_MAX;
+ } else {
+ alloc_len = src_len * 3;
+ }
+ }
+
+ dest_obj = PyBytes_FromStringAndSize(NULL, alloc_len);
+ if (dest_obj == NULL) {
+ return NULL;
+ }
+ dest = PyBytes_AS_STRING(dest_obj);
+
+ dest_len = lzxpress_decompress(src,
+ src_len,
+ (uint8_t *)dest,
+ alloc_len);
+ if (dest_len < 0) {
+ if (alloc_len == given_len) {
+ PyErr_Format(CompressionError,
+ "unable to decompress data into a buffer "
+ "of %zd bytes.", alloc_len);
+ } else {
+ PyErr_Format(CompressionError,
+ "unable to decompress data into a buffer "
+ "of %zd bytes. If you know the length, "
+ "supply it as the second argument.",
+ alloc_len);
+ }
+ Py_DECREF(dest_obj);
+ return NULL;
+ }
+
+ ret = _PyBytes_Resize(&dest_obj, dest_len);
+ if (ret != 0) {
+ /*
+ * Don't try to free dest_obj, as we're in deep MemoryError
+ * territory here.
+ */
+ return NULL;
+ }
+ return dest_obj;
+}
+
+
+
+static PyObject *huffman_compress(PyObject *mod, PyObject *args)
+{
+ uint8_t *src = NULL;
+ Py_ssize_t src_len;
+ char *dest = NULL;
+ Py_ssize_t dest_len;
+ PyObject *dest_obj = NULL;
+ size_t alloc_len;
+ int ret;
+ struct lzxhuff_compressor_mem cmp_mem;
+
+ if (!PyArg_ParseTuple(args, "s#", &src, &src_len)) {
+ return NULL;
+ }
+ /*
+ * worst case is roughly 256 per 64k or less.
+ *
+ * alloc_len won't overflow as src_len is ssize_t while alloc_len is
+ * size_t.
+ */
+ alloc_len = src_len + src_len / 8 + 500;
+
+ dest_obj = PyBytes_FromStringAndSize(NULL, alloc_len);
+ if (dest_obj == NULL) {
+ return NULL;
+ }
+ dest = PyBytes_AS_STRING(dest_obj);
+
+ dest_len = lzxpress_huffman_compress(&cmp_mem,
+ src,
+ src_len,
+ (uint8_t *)dest,
+ alloc_len);
+ if (dest_len < 0) {
+ PyErr_SetString(CompressionError, "unable to compress data");
+ Py_DECREF(dest_obj);
+ return NULL;
+ }
+
+ ret = _PyBytes_Resize(&dest_obj, dest_len);
+ if (ret != 0) {
+ return NULL;
+ }
+ return dest_obj;
+}
+
+
+static PyObject *huffman_decompress(PyObject *mod, PyObject *args)
+{
+ uint8_t *src = NULL;
+ Py_ssize_t src_len;
+ char *dest = NULL;
+ Py_ssize_t dest_len;
+ PyObject *dest_obj = NULL;
+ Py_ssize_t given_len = 0;
+ /*
+ * Here it is always necessary to supply the exact length.
+ */
+
+ if (!PyArg_ParseTuple(args, "s#n", &src, &src_len, &given_len)) {
+ return NULL;
+ }
+
+ dest_obj = PyBytes_FromStringAndSize(NULL, given_len);
+ if (dest_obj == NULL) {
+ return NULL;
+ }
+ dest = PyBytes_AS_STRING(dest_obj);
+
+ dest_len = lzxpress_huffman_decompress(src,
+ src_len,
+ (uint8_t *)dest,
+ given_len);
+ if (dest_len != given_len) {
+ PyErr_Format(CompressionError,
+ "unable to decompress data into a %zd bytes.",
+ given_len);
+ Py_DECREF(dest_obj);
+ return NULL;
+ }
+ /* no resize here */
+ return dest_obj;
+}
+
+
+static PyMethodDef mod_methods[] = {
+ { "plain_compress", (PyCFunction)plain_compress, METH_VARARGS,
+ "compress bytes using lzxpress plain compression"},
+ { "plain_decompress", (PyCFunction)plain_decompress, METH_VARARGS,
+ "decompress lzxpress plain compressed bytes"},
+ { "huffman_compress", (PyCFunction)huffman_compress, METH_VARARGS,
+ "compress bytes using lzxpress plain compression"},
+ { "huffman_decompress", (PyCFunction)huffman_decompress, METH_VARARGS,
+ "decompress lzxpress plain compressed bytes"},
+ {0}
+};
+
+
+#define MODULE_DOC PyDoc_STR("LZXpress compresssion/decompression bindings")
+
+static struct PyModuleDef moduledef = {
+ PyModuleDef_HEAD_INIT,
+ .m_name = "compression",
+ .m_doc = MODULE_DOC,
+ .m_size = -1,
+ .m_methods = mod_methods,
+};
+
+
+static PyObject *module_init(void)
+{
+ PyObject *m = PyModule_Create(&moduledef);
+ if (m == NULL) {
+ return NULL;
+ }
+
+ CompressionError = PyErr_NewException(
+ "compression.CompressionError",
+ PyExc_Exception,
+ NULL);
+ PyModule_AddObject(m, "CompressionError", CompressionError);
+
+ return m;
+}
+
+PyMODINIT_FUNC PyInit_compression(void);
+PyMODINIT_FUNC PyInit_compression(void)
+{
+ return module_init();
+}
diff --git a/lib/compression/wscript_build b/lib/compression/wscript_build
index 1ab208cf18d..61fe4a9808e 100644
--- a/lib/compression/wscript_build
+++ b/lib/compression/wscript_build
@@ -18,3 +18,8 @@ bld.SAMBA_BINARY('test_lzxpress_plain',
' samba-util'),
local_include=False,
for_selftest=True)
+
+bld.SAMBA_PYTHON('pycompression',
+ 'pycompression.c',
+ deps='LZXPRESS',
+ realname='samba/compression.so')
diff --git a/python/samba/tests/compression.py b/python/samba/tests/compression.py
new file mode 100644
index 00000000000..48f8c874cba
--- /dev/null
+++ b/python/samba/tests/compression.py
@@ -0,0 +1,212 @@
+# Unix SMB/CIFS implementation.
+# Copyright © Catalyst
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from unittest import TestSuite
+import os
+import random
+
+from samba.tests import TestCase
+from samba import compression
+
+
+TEST_DIR = "testdata/compression"
+
+
+class BaseCompressionTest(TestCase):
+ def round_trip(self, data, size_delta=0):
+ """Compress, decompress, assert equality with original.
+
+ If size_delta is None, no size is given to decompress. This
+ should fail with the Huffman varient and succeed with plain.
+ Otherwise size_delta is added to the gven size; if negative,
+ we'd expect a failure, with plain compression a positive delta
+ will succeed.
+ """
+
+ compressed = self.compress(data)
+ if size_delta is None:
+ decompressed = self.decompress(compressed)
+ else:
+ decomp_size = len(data) + size_delta
+ decompressed = self.decompress(compressed, decomp_size)
+
+ if isinstance(data, str):
+ data = data.encode()
+
+ self.assertEqual(data, decompressed)
+ return compressed
+
+ def decompress_file(self, fn):
+ decomp_fn = os.path.join(TEST_DIR,
+ "decompressed",
+ fn + ".decomp")
+ comp_fn = os.path.join(TEST_DIR,
+ self.compressed_dir,
+ fn + self.compressed_suffix)
+
+ with open(decomp_fn, 'rb') as f:
+ decomp_expected = f.read()
+ with open(comp_fn, 'rb') as f:
+ comp = f.read()
+
+ decompressed = self.decompress(comp, len(decomp_expected))
+
+ self.assertEqual(decomp_expected, decompressed)
+
+
+class LzxpressPlainCompressionTest(BaseCompressionTest):
+ compress = compression.plain_compress
+ decompress = compression.plain_decompress
+ compressed_dir = "compressed-plain"
+ compressed_suffix = ".lzplain"
+
+ def test_round_trip_aaa_str(self):
+ s = 'a' * 150000
+ self.round_trip(s)
+
+ def test_round_trip_aaa_bytes(self):
+ s = b'a' * 150000
+ self.round_trip(s)
+
+ def test_round_trip_aaa_short(self):
+ s = b'a' * 150000
+
+ # this'll fail because the match for 'aaa...' will run
+ # past the end of the buffer
+ self.assertRaises(compression.CompressionError,
+ self.round_trip, s, -1)
+
+ def test_round_trip_aaa_long(self):
+ s = b'a' * 150000
+ # this *wont* fail because although the data will run out
+ # before the buffer is full, LZXpress plain does not care
+ # about that.
+ try:
+ self.round_trip(s, 1)
+ except compression.CompressionError as e:
+ self.fail(f"failed to decompress with {e}")
+
+ def test_round_trip_aaab_short(self):
+ s = b'a' * 150000 + b'b'
+
+ # this will *partially* succeed, because the buffer will fill
+ # up vat a break in the decompression (not mid-match), and
+ # lzxpress plain does not mind that. However self.round_trip
+ # also makes an assertion that the original data equals the
+ # decompressed result, and it won't because the decompressed
+ # result is one byte shorter.
+ self.assertRaises(AssertionError,
+ self.round_trip, s, -1)
+
+ def test_round_trip_aaab_unstated(self):
+ s = b'a' * 150000 + b'b'
+
+ # this will succeed, because with no target size given, we
+ # guess a large buffer in the python bindings.
+ try:
+ self.round_trip(s)
+ except compression.CompressionError as e:
+ self.fail(f"failed to decompress with {e}")
+
+ def test_round_trip_30mb(self):
+ s = b'abc' * 10000000
+ # This will decompress into a string bigger than the python
+ # bindings are willing to speculatively allocate, so will fail
+ # to decompress.
+ with self.assertRaises(compression.CompressionError):
+ self.round_trip(s, None)
+
+ # but it will be fine if we use the length
+ try:
+ self.round_trip(s, 0)
+ except compression.CompressionError as e:
+ self.fail(f"failed to decompress with {e}")
+
+ def test_files(self):
+ # We don't go through the whole set, which are already tested
+ # by lib/compression/tests/test_lzxpress_plain.c
+ for fn in ("slow-33d90a24e70515b14cd0",
+ "midsummer-nights-dream.txt"):
+ self.decompress_file(fn)
+
+ def test_empty_round_trip(self):
+ # not symmetrical with Huffman, this doesn't fail
+ self.round_trip('')
+
+
+class LzxpressHuffmanCompressionTest(BaseCompressionTest):
+ compress = compression.huffman_compress
+ decompress = compression.huffman_decompress
+ compressed_dir = "compressed-huffman"
+ compressed_suffix = ".lzhuff"
+
+ def test_round_trip_aaa_str(self):
+ s = 'a' * 150000
+ self.round_trip(s)
+
+ def test_round_trip_aaa_bytes(self):
+ s = b'a' * 150000
+ self.round_trip(s)
+
+ def test_round_trip_aaa_short(self):
+ s = b'a' * 150000
+
+ # this'll fail because the match for 'aaa...' will run
+ # past the end of the buffer
+ self.assertRaises(compression.CompressionError,
+ self.round_trip, s, -1)
+
+ def test_round_trip_aaa_long(self):
+ s = b'a' * 150000
+
+ # this'll fail because the data will run out before the buffer
+ # is full.
+ self.assertRaises(compression.CompressionError,
+ self.round_trip, s, 1)
+
+ def test_round_trip_aaab_short(self):
+ s = b'a' * 150000 + b'b'
+
+ # this *could* be allowed to succeed, because even though we
+ # give it the wrong size, we know the decompression will not
+ # flow over the end of the buffer, The behaviour here appears
+ # to be implementation dependent -- the decompressor has the
+ # option of saying 'whatever' and continuing. We are probably
+ # stricter than Windows.
+ self.assertRaises(compression.CompressionError,
+ self.round_trip, s, -1)
+
+ def test_round_trip_aaab_unstated(self):
+ s = b'a' * 150000 + b'b'
+
+ # For the Huffman algorithm, the length is really an essential
+ # part of the compression data, and the bindings will reject a
+ # call with out it. This happens at the arument parsing stage,
+ # so is a TypeError (i.e. wrong type of function), not a
+ # CompressionError.
+ self.assertRaises(TypeError,
+ self.round_trip, s, None)
+
+ def test_files(self):
+ # We don't go through the whole set, which are already tested
+ # by lib/compression/tests/test_lzx_huffman.c
+ for fn in ("slow-33d90a24e70515b14cd0",
+ "midsummer-nights-dream.txt"):
+ self.decompress_file(fn)
+
+ def test_empty_round_trip(self):
+ with self.assertRaises(compression.CompressionError):
+ self.round_trip('')
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 3212e296ddd..774b874edbd 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -1902,3 +1902,4 @@ planoldpythontestsuite("proclimitdc",
planoldpythontestsuite("none", "samba.tests.usage")
planpythontestsuite("fileserver", "samba.tests.dcerpc.mdssvc")
+planoldpythontestsuite("none", "samba.tests.compression")