From 9548f5109a608cc176dc0a022b072d8d9c1b38f5 Mon Sep 17 00:00:00 2001 From: Matthias Koefferlein Date: Thu, 29 Feb 2024 00:30:34 +0100 Subject: [PATCH] Implementing automatic .gz support for https and pipe URIs --- src/tl/tl/tlHttpStream.h | 16 ++- src/tl/tl/tlStream.cc | 146 ++++++++++++++++++++++++- src/tl/tl/tlStream.h | 83 +++++++++++++- src/tl/unit_tests/tlHttpStreamTests.cc | 28 +++++ src/tl/unit_tests/tlStreamTests.cc | 9 ++ 5 files changed, 272 insertions(+), 10 deletions(-) diff --git a/src/tl/tl/tlHttpStream.h b/src/tl/tl/tlHttpStream.h index a85dc3348..529e81d85 100644 --- a/src/tl/tl/tlHttpStream.h +++ b/src/tl/tl/tlHttpStream.h @@ -112,7 +112,6 @@ public: * @brief Polling: call this function regularly to explicitly establish polling * (in the Qt framework, this is done automatically within the event loop) * May throw a tl::CancelException to stop. - * Returns true if a message has arrived. */ void tick (); @@ -209,6 +208,21 @@ private: InputHttpStreamCallback *mp_callback; }; +/** + * @brief A HTTP stream with .gz support + */ +class TL_PUBLIC InflatingInputHttpStream + : public inflating_input_stream +{ +public: + /** + * @brief Open a stream with the given URL + */ + InflatingInputHttpStream (const std::string &url) + : inflating_input_stream (new InputHttpStream (url)) + { } +}; + } #endif diff --git a/src/tl/tl/tlStream.cc b/src/tl/tl/tlStream.cc index dc1425537..4ffc2d43e 100644 --- a/src/tl/tl/tlStream.cc +++ b/src/tl/tl/tlStream.cc @@ -140,6 +140,125 @@ public: gzFile zs; }; +// --------------------------------------------------------------- +// inflating_input_stream implementation + +/** + * @brief A wrapper that adds generic .gz support + */ +template +inflating_input_stream::inflating_input_stream (Base *delegate) + : m_inflating_stream (delegate), m_is_compressed (false), mp_delegate (delegate) +{ + enter_inflate (); +} + +template +size_t +inflating_input_stream::read (char *b, size_t n) +{ + // TODO: this is somewhat inefficient, but we only use it for pipe and HTTP streams + size_t i = 0; + while (i < n) { + + if (m_is_compressed || m_inflating_stream.blen () == 0) { + + const char *read = m_inflating_stream.get (1); + if (! read) { + break; + } + *b++ = *read; + i += 1; + + } else { + + size_t nn = std::min (n - i, m_inflating_stream.blen ()); + const char *read = m_inflating_stream.get (nn); + tl_assert (read != 0); + memcpy (b, read, nn); + b += nn; + i += nn; + + } + + } + return i; +} + +template +void +inflating_input_stream::enter_inflate () +{ + // identify and skip header for .gz file + if (auto_detect_gz ()) { + m_is_compressed = true; + m_inflating_stream.inflate (true /* stop after inflated block */); + } else { + m_inflating_stream.unget (m_inflating_stream.pos ()); + } +} + +template +bool +inflating_input_stream::auto_detect_gz () +{ + std::string header = m_inflating_stream.read_all (10); + if (header.size () < 10) { + return false; + } + + const unsigned char *header_data = (const unsigned char *) header.c_str (); + unsigned char flags = header_data[3]; + if (header_data[0] != 0x1f || header_data[1] != 0x8b || header_data[2] != 0x08 || (flags & 0xe0) != 0) { + return false; + } + + // .gz signature found + + bool has_fhcrc = (flags & 0x02) != 0; + bool has_extra = (flags & 0x04) != 0; + bool has_fname = (flags & 0x08) != 0; + bool has_comment = (flags & 0x10) != 0; + + if (has_extra) { + const unsigned char *xlen = (const unsigned char *) m_inflating_stream.get (2); + if (! xlen) { + throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing XLEN field"))); + } + const char *xdata = m_inflating_stream.get (size_t (xlen[0]) + (size_t (xlen[1]) << 8)); + if (! xdata) { + throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing EXTRA data"))); + } + } + + if (has_fname) { + const char *c; + while ((c = m_inflating_stream.get (1)) != 0 && *c) + ; + if (! c) { + throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing FNAME data trailing zero byte"))); + } + } + + if (has_comment) { + const char *c; + while ((c = m_inflating_stream.get (1)) != 0 && *c) + ; + if (! c) { + throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing COMMENT data trailing zero byte"))); + } + } + + if (has_fhcrc) { + const char *crc16 = m_inflating_stream.get (2); + if (! crc16) { + throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing CRC16 data"))); + } + } + + return true; +} + // --------------------------------------------------------------- // InputStream implementation @@ -175,7 +294,7 @@ public: } InputStream::InputStream (InputStreamBase &delegate) - : m_pos (0), mp_bptr (0), mp_delegate (&delegate), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false) + : m_pos (0), mp_bptr (0), mp_delegate (&delegate), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false) { m_bcap = 4096; // initial buffer capacity m_blen = 0; @@ -183,7 +302,7 @@ InputStream::InputStream (InputStreamBase &delegate) } InputStream::InputStream (InputStreamBase *delegate) - : m_pos (0), mp_bptr (0), mp_delegate (delegate), m_owns_delegate (true), mp_inflate (0), m_inflate_always (false) + : m_pos (0), mp_bptr (0), mp_delegate (delegate), m_owns_delegate (true), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false) { m_bcap = 4096; // initial buffer capacity m_blen = 0; @@ -191,7 +310,7 @@ InputStream::InputStream (InputStreamBase *delegate) } InputStream::InputStream (const std::string &abstract_path) - : m_pos (0), mp_bptr (0), mp_delegate (0), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false) + : m_pos (0), mp_bptr (0), mp_delegate (0), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false) { m_bcap = 4096; // initial buffer capacity m_blen = 0; @@ -252,7 +371,7 @@ InputStream::InputStream (const std::string &abstract_path) } else if (ex.test ("pipe:")) { - mp_delegate = new InputPipe (ex.get ()); + mp_delegate = new InflatingInputPipe (ex.get ()); } else { @@ -260,7 +379,7 @@ InputStream::InputStream (const std::string &abstract_path) if (uri.scheme () == "http" || uri.scheme () == "https") { #if defined(HAVE_CURL) || defined(HAVE_QT) - mp_delegate = new InputHttpStream (abstract_path); + mp_delegate = new InflatingInputHttpStream (abstract_path); #else throw tl::Exception (tl::to_string (tr ("HTTP support not enabled - HTTP/HTTPS paths are not available"))); #endif @@ -337,9 +456,16 @@ InputStream::get (size_t n, bool bypass_inflate) tl_assert (r != 0); // since deflate did not report at_end() return r; + } else if (m_stop_after_inflate) { + + // report EOF after the inflator has finished + return 0; + } else { + delete mp_inflate; mp_inflate = 0; + } } @@ -384,9 +510,16 @@ InputStream::get (size_t n, bool bypass_inflate) void InputStream::unget (size_t n) { + if (n == 0) { + return; + } + if (mp_inflate) { + // TODO: this will not work if mp_inflate just got destroyed + // (no unget into previous compressed block) mp_inflate->unget (n); } else { + tl_assert (mp_buffer + n <= mp_bptr); mp_bptr -= n; m_blen += n; m_pos -= n; @@ -476,10 +609,11 @@ void InputStream::copy_to (tl::OutputStream &os) } void -InputStream::inflate () +InputStream::inflate (bool stop_after) { tl_assert (mp_inflate == 0); mp_inflate = new tl::InflateFilter (*this); + m_stop_after_inflate = stop_after; } void diff --git a/src/tl/tl/tlStream.h b/src/tl/tl/tlStream.h index 087e54f45..4e7cfdcf6 100644 --- a/src/tl/tl/tlStream.h +++ b/src/tl/tl/tlStream.h @@ -310,8 +310,7 @@ public: * an error occurs - commonly if the command cannot be executed. * This implementation is based on popen (). * - * @param cmd The command to execute - * @param read True, if the file should be read, false on write. + * @param source The command to execute */ InputPipe (const std::string &source); @@ -473,8 +472,11 @@ public: * the uncompressed data rather than the raw data, until the * compressed block is finished. * The stream must not be in inflate state yet. + * + * If "stop_after" is true, the stream will stop after the inflated + * block has finished. */ - void inflate (); + void inflate (bool stop_after = false); /** * @brief Enables "inflate" right from the beginning @@ -577,12 +579,87 @@ private: // inflate support InflateFilter *mp_inflate; bool m_inflate_always; + bool m_stop_after_inflate; // No copying currently InputStream (const InputStream &); InputStream &operator= (const InputStream &); }; +/** + * @brief A wrapper that adds generic .gz support + */ +template +class TL_PUBLIC_TEMPLATE inflating_input_stream + : public InputStreamBase +{ +public: + inflating_input_stream (Base *delegate); + + Base *delegate () + { + return mp_delegate; + } + + virtual size_t read (char *b, size_t n); + + virtual void reset () + { + m_inflating_stream.reset (); + enter_inflate (); + } + + virtual void close () + { + m_inflating_stream.close (); + } + + virtual std::string source () const + { + return m_inflating_stream.source (); + } + + virtual std::string absolute_path () const + { + return m_inflating_stream.absolute_path (); + } + + virtual std::string filename () const + { + return m_inflating_stream.filename (); + } + +private: + tl::InputStream m_inflating_stream; + bool m_is_compressed; + Base *mp_delegate; + + void enter_inflate (); + bool auto_detect_gz (); +}; + +/** + * @brief A pipe stream with .gz support + */ +class TL_PUBLIC InflatingInputPipe + : public inflating_input_stream +{ +public: + /** + * @brief Open a stream by connecting with the stdout of a given command + * + * Opening a pipe is a prerequisite for reading from the + * object. open() will throw a FilePOpenErrorException if + * an error occurs - commonly if the command cannot be executed. + * This implementation is based on popen (). + * + * @param source The command to execute + */ + InflatingInputPipe (const std::string &source) + : inflating_input_stream (new InputPipe (source)) + { } +}; + // --------------------------------------------------------------------------------- /** diff --git a/src/tl/unit_tests/tlHttpStreamTests.cc b/src/tl/unit_tests/tlHttpStreamTests.cc index e05c7e5fe..4acf10f3c 100644 --- a/src/tl/unit_tests/tlHttpStreamTests.cc +++ b/src/tl/unit_tests/tlHttpStreamTests.cc @@ -24,8 +24,10 @@ #include "tlHttpStream.h" #include "tlUnitTest.h" #include "tlTimer.h" +#include "tlStream.h" static std::string test_url1 ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/text"); +static std::string test_url1_gz ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/text.gz"); static std::string test_url2 ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/dir1"); TEST(1) @@ -125,3 +127,29 @@ TEST(3) EXPECT_EQ (res, "hello, world.\n"); } +// tl::Stream embedding +TEST(4) +{ + if (! tl::InputHttpStream::is_available ()) { + throw tl::CancelException (); + } + + tl::InputStream stream (test_url1); + + std::string res = stream.read_all (); + EXPECT_EQ (res, "hello, world.\n"); +} + +// tl::Stream embedding with automatic unzip +TEST(5) +{ + if (! tl::InputHttpStream::is_available ()) { + throw tl::CancelException (); + } + + tl::InputStream stream (test_url1_gz); + + std::string res = stream.read_all (); + EXPECT_EQ (res, "hello, world.\n"); +} + diff --git a/src/tl/unit_tests/tlStreamTests.cc b/src/tl/unit_tests/tlStreamTests.cc index e51a7d4c4..517d71615 100644 --- a/src/tl/unit_tests/tlStreamTests.cc +++ b/src/tl/unit_tests/tlStreamTests.cc @@ -52,6 +52,13 @@ TEST(InputPipe2) EXPECT_NE (ret, 0); } +TEST(InputPipe3) +{ + tl::InputStream str ("pipe:echo HELLOWORLD"); + tl::TextInputStream tstr (str); + EXPECT_EQ (tstr.get_line (), "HELLOWORLD"); +} + TEST(OutputPipe1) { std::string tf = tmp_file ("pipe_out"); @@ -455,3 +462,5 @@ TEST(Backups) } } + +