Implementing automatic .gz support for https and pipe URIs

This commit is contained in:
Matthias Koefferlein 2024-02-29 00:30:34 +01:00
parent 012447c31b
commit 9548f5109a
5 changed files with 272 additions and 10 deletions

View File

@ -112,7 +112,6 @@ public:
* @brief Polling: call this function regularly to explicitly establish polling
* (in the Qt framework, this is done automatically within the event loop)
* May throw a tl::CancelException to stop.
* Returns true if a message has arrived.
*/
void tick ();
@ -209,6 +208,21 @@ private:
InputHttpStreamCallback *mp_callback;
};
/**
* @brief A HTTP stream with .gz support
*/
class TL_PUBLIC InflatingInputHttpStream
: public inflating_input_stream<InputHttpStream>
{
public:
/**
* @brief Open a stream with the given URL
*/
InflatingInputHttpStream (const std::string &url)
: inflating_input_stream<InputHttpStream> (new InputHttpStream (url))
{ }
};
}
#endif

View File

@ -140,6 +140,125 @@ public:
gzFile zs;
};
// ---------------------------------------------------------------
// inflating_input_stream implementation
/**
* @brief A wrapper that adds generic .gz support
*/
template <class Base>
inflating_input_stream<Base>::inflating_input_stream (Base *delegate)
: m_inflating_stream (delegate), m_is_compressed (false), mp_delegate (delegate)
{
enter_inflate ();
}
template <class Base>
size_t
inflating_input_stream<Base>::read (char *b, size_t n)
{
// TODO: this is somewhat inefficient, but we only use it for pipe and HTTP streams
size_t i = 0;
while (i < n) {
if (m_is_compressed || m_inflating_stream.blen () == 0) {
const char *read = m_inflating_stream.get (1);
if (! read) {
break;
}
*b++ = *read;
i += 1;
} else {
size_t nn = std::min (n - i, m_inflating_stream.blen ());
const char *read = m_inflating_stream.get (nn);
tl_assert (read != 0);
memcpy (b, read, nn);
b += nn;
i += nn;
}
}
return i;
}
template <class Base>
void
inflating_input_stream<Base>::enter_inflate ()
{
// identify and skip header for .gz file
if (auto_detect_gz ()) {
m_is_compressed = true;
m_inflating_stream.inflate (true /* stop after inflated block */);
} else {
m_inflating_stream.unget (m_inflating_stream.pos ());
}
}
template <class Base>
bool
inflating_input_stream<Base>::auto_detect_gz ()
{
std::string header = m_inflating_stream.read_all (10);
if (header.size () < 10) {
return false;
}
const unsigned char *header_data = (const unsigned char *) header.c_str ();
unsigned char flags = header_data[3];
if (header_data[0] != 0x1f || header_data[1] != 0x8b || header_data[2] != 0x08 || (flags & 0xe0) != 0) {
return false;
}
// .gz signature found
bool has_fhcrc = (flags & 0x02) != 0;
bool has_extra = (flags & 0x04) != 0;
bool has_fname = (flags & 0x08) != 0;
bool has_comment = (flags & 0x10) != 0;
if (has_extra) {
const unsigned char *xlen = (const unsigned char *) m_inflating_stream.get (2);
if (! xlen) {
throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing XLEN field")));
}
const char *xdata = m_inflating_stream.get (size_t (xlen[0]) + (size_t (xlen[1]) << 8));
if (! xdata) {
throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing EXTRA data")));
}
}
if (has_fname) {
const char *c;
while ((c = m_inflating_stream.get (1)) != 0 && *c)
;
if (! c) {
throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing FNAME data trailing zero byte")));
}
}
if (has_comment) {
const char *c;
while ((c = m_inflating_stream.get (1)) != 0 && *c)
;
if (! c) {
throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing COMMENT data trailing zero byte")));
}
}
if (has_fhcrc) {
const char *crc16 = m_inflating_stream.get (2);
if (! crc16) {
throw tl::Exception (tl::to_string (tr ("Corrupt .gz header - missing CRC16 data")));
}
}
return true;
}
// ---------------------------------------------------------------
// InputStream implementation
@ -175,7 +294,7 @@ public:
}
InputStream::InputStream (InputStreamBase &delegate)
: m_pos (0), mp_bptr (0), mp_delegate (&delegate), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false)
: m_pos (0), mp_bptr (0), mp_delegate (&delegate), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false)
{
m_bcap = 4096; // initial buffer capacity
m_blen = 0;
@ -183,7 +302,7 @@ InputStream::InputStream (InputStreamBase &delegate)
}
InputStream::InputStream (InputStreamBase *delegate)
: m_pos (0), mp_bptr (0), mp_delegate (delegate), m_owns_delegate (true), mp_inflate (0), m_inflate_always (false)
: m_pos (0), mp_bptr (0), mp_delegate (delegate), m_owns_delegate (true), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false)
{
m_bcap = 4096; // initial buffer capacity
m_blen = 0;
@ -191,7 +310,7 @@ InputStream::InputStream (InputStreamBase *delegate)
}
InputStream::InputStream (const std::string &abstract_path)
: m_pos (0), mp_bptr (0), mp_delegate (0), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false)
: m_pos (0), mp_bptr (0), mp_delegate (0), m_owns_delegate (false), mp_inflate (0), m_inflate_always (false), m_stop_after_inflate (false)
{
m_bcap = 4096; // initial buffer capacity
m_blen = 0;
@ -252,7 +371,7 @@ InputStream::InputStream (const std::string &abstract_path)
} else if (ex.test ("pipe:")) {
mp_delegate = new InputPipe (ex.get ());
mp_delegate = new InflatingInputPipe (ex.get ());
} else {
@ -260,7 +379,7 @@ InputStream::InputStream (const std::string &abstract_path)
if (uri.scheme () == "http" || uri.scheme () == "https") {
#if defined(HAVE_CURL) || defined(HAVE_QT)
mp_delegate = new InputHttpStream (abstract_path);
mp_delegate = new InflatingInputHttpStream (abstract_path);
#else
throw tl::Exception (tl::to_string (tr ("HTTP support not enabled - HTTP/HTTPS paths are not available")));
#endif
@ -337,9 +456,16 @@ InputStream::get (size_t n, bool bypass_inflate)
tl_assert (r != 0); // since deflate did not report at_end()
return r;
} else if (m_stop_after_inflate) {
// report EOF after the inflator has finished
return 0;
} else {
delete mp_inflate;
mp_inflate = 0;
}
}
@ -384,9 +510,16 @@ InputStream::get (size_t n, bool bypass_inflate)
void
InputStream::unget (size_t n)
{
if (n == 0) {
return;
}
if (mp_inflate) {
// TODO: this will not work if mp_inflate just got destroyed
// (no unget into previous compressed block)
mp_inflate->unget (n);
} else {
tl_assert (mp_buffer + n <= mp_bptr);
mp_bptr -= n;
m_blen += n;
m_pos -= n;
@ -476,10 +609,11 @@ void InputStream::copy_to (tl::OutputStream &os)
}
void
InputStream::inflate ()
InputStream::inflate (bool stop_after)
{
tl_assert (mp_inflate == 0);
mp_inflate = new tl::InflateFilter (*this);
m_stop_after_inflate = stop_after;
}
void

View File

@ -310,8 +310,7 @@ public:
* an error occurs - commonly if the command cannot be executed.
* This implementation is based on popen ().
*
* @param cmd The command to execute
* @param read True, if the file should be read, false on write.
* @param source The command to execute
*/
InputPipe (const std::string &source);
@ -473,8 +472,11 @@ public:
* the uncompressed data rather than the raw data, until the
* compressed block is finished.
* The stream must not be in inflate state yet.
*
* If "stop_after" is true, the stream will stop after the inflated
* block has finished.
*/
void inflate ();
void inflate (bool stop_after = false);
/**
* @brief Enables "inflate" right from the beginning
@ -577,12 +579,87 @@ private:
// inflate support
InflateFilter *mp_inflate;
bool m_inflate_always;
bool m_stop_after_inflate;
// No copying currently
InputStream (const InputStream &);
InputStream &operator= (const InputStream &);
};
/**
* @brief A wrapper that adds generic .gz support
*/
template <class Base>
class TL_PUBLIC_TEMPLATE inflating_input_stream
: public InputStreamBase
{
public:
inflating_input_stream (Base *delegate);
Base *delegate ()
{
return mp_delegate;
}
virtual size_t read (char *b, size_t n);
virtual void reset ()
{
m_inflating_stream.reset ();
enter_inflate ();
}
virtual void close ()
{
m_inflating_stream.close ();
}
virtual std::string source () const
{
return m_inflating_stream.source ();
}
virtual std::string absolute_path () const
{
return m_inflating_stream.absolute_path ();
}
virtual std::string filename () const
{
return m_inflating_stream.filename ();
}
private:
tl::InputStream m_inflating_stream;
bool m_is_compressed;
Base *mp_delegate;
void enter_inflate ();
bool auto_detect_gz ();
};
/**
* @brief A pipe stream with .gz support
*/
class TL_PUBLIC InflatingInputPipe
: public inflating_input_stream<InputPipe>
{
public:
/**
* @brief Open a stream by connecting with the stdout of a given command
*
* Opening a pipe is a prerequisite for reading from the
* object. open() will throw a FilePOpenErrorException if
* an error occurs - commonly if the command cannot be executed.
* This implementation is based on popen ().
*
* @param source The command to execute
*/
InflatingInputPipe (const std::string &source)
: inflating_input_stream<InputPipe> (new InputPipe (source))
{ }
};
// ---------------------------------------------------------------------------------
/**

View File

@ -24,8 +24,10 @@
#include "tlHttpStream.h"
#include "tlUnitTest.h"
#include "tlTimer.h"
#include "tlStream.h"
static std::string test_url1 ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/text");
static std::string test_url1_gz ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/text.gz");
static std::string test_url2 ("http://www.klayout.org/svn-public/klayout-resources/trunk/testdata/dir1");
TEST(1)
@ -125,3 +127,29 @@ TEST(3)
EXPECT_EQ (res, "hello, world.\n");
}
// tl::Stream embedding
TEST(4)
{
if (! tl::InputHttpStream::is_available ()) {
throw tl::CancelException ();
}
tl::InputStream stream (test_url1);
std::string res = stream.read_all ();
EXPECT_EQ (res, "hello, world.\n");
}
// tl::Stream embedding with automatic unzip
TEST(5)
{
if (! tl::InputHttpStream::is_available ()) {
throw tl::CancelException ();
}
tl::InputStream stream (test_url1_gz);
std::string res = stream.read_all ();
EXPECT_EQ (res, "hello, world.\n");
}

View File

@ -52,6 +52,13 @@ TEST(InputPipe2)
EXPECT_NE (ret, 0);
}
TEST(InputPipe3)
{
tl::InputStream str ("pipe:echo HELLOWORLD");
tl::TextInputStream tstr (str);
EXPECT_EQ (tstr.get_line (), "HELLOWORLD");
}
TEST(OutputPipe1)
{
std::string tf = tmp_file ("pipe_out");
@ -455,3 +462,5 @@ TEST(Backups)
}
}