Skip to content

Commit 7f9c486

Browse files
committed
Read PBF files without an extra thread
Unlike XML or OPL files, PBF files are never packed into gzip or bzip2 as a whole. So we don't need the extra thread doing that unpacking. This speeds up reading of PBF files.
1 parent cd7d5fa commit 7f9c486

5 files changed

Lines changed: 124 additions & 22 deletions

File tree

include/osmium/io/compression.hpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ namespace osmium {
113113

114114
virtual void close() = 0;
115115

116+
virtual bool is_real() const noexcept {
117+
return true;
118+
}
119+
116120
std::size_t file_size() const noexcept {
117121
return m_file_size;
118122
}
@@ -274,6 +278,37 @@ namespace osmium {
274278

275279
}; // class NoCompressor
276280

281+
/**
282+
* The DummyDecompressor is used when reading PBF files. In that
283+
* case the PBFParser class is responsible for reading from the
284+
* file itself, and the DummyDecompressor does nothing.
285+
*/
286+
class DummyDecompressor final : public Decompressor {
287+
public:
288+
289+
DummyDecompressor() = default;
290+
291+
DummyDecompressor(const DummyDecompressor&) = delete;
292+
DummyDecompressor& operator=(const DummyDecompressor&) = delete;
293+
294+
DummyDecompressor(DummyDecompressor&&) = delete;
295+
DummyDecompressor& operator=(DummyDecompressor&&) = delete;
296+
297+
~DummyDecompressor() noexcept override = default;
298+
299+
std::string read() override {
300+
return {};
301+
}
302+
303+
void close() override {
304+
}
305+
306+
bool is_real() const noexcept override {
307+
return false;
308+
}
309+
310+
}; // class DummyDecompressor
311+
277312
class NoDecompressor final : public Decompressor {
278313

279314
int m_fd = -1;

include/osmium/io/detail/input_format.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ namespace osmium {
5858

5959
struct parser_arguments {
6060
osmium::thread::Pool& pool;
61+
int fd;
6162
future_string_queue_type& input_queue;
6263
future_buffer_queue_type& output_queue;
6364
std::promise<osmium::io::Header>& header_promise;

include/osmium/io/detail/pbf_input_format.hpp

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ DEALINGS IN THE SOFTWARE.
3737
#include <osmium/io/detail/pbf.hpp> // IWYU pragma: export
3838
#include <osmium/io/detail/pbf_decoder.hpp>
3939
#include <osmium/io/detail/protobuf_tags.hpp>
40+
#include <osmium/io/detail/read_write.hpp>
4041
#include <osmium/io/file_format.hpp>
4142
#include <osmium/io/header.hpp>
4243
#include <osmium/osm/entity_bits.hpp>
@@ -65,6 +66,7 @@ namespace osmium {
6566
class PBFParser final : public Parser {
6667

6768
std::string m_input_buffer{};
69+
int m_fd;
6870

6971
/**
7072
* Make sure the input data contains at least the specified
@@ -73,6 +75,7 @@ namespace osmium {
7375
* @param size Number of bytes to read
7476
*/
7577
void ensure_available_in_input_queue(size_t size) {
78+
assert(m_fd == -1);
7679
if (m_input_buffer.size() < size) {
7780
m_input_buffer.reserve(size);
7881
}
@@ -91,40 +94,62 @@ namespace osmium {
9194
* @param size Number of bytes to remove
9295
*/
9396
void pop_from_input_queue(size_t size) {
97+
assert(m_fd == -1);
9498
m_input_buffer.erase(0, size);
9599
}
96100

101+
static uint32_t get_size_in_network_byte_order(const char* d) noexcept {
102+
return (static_cast<uint32_t>(d[3])) |
103+
(static_cast<uint32_t>(d[2]) << 8U) |
104+
(static_cast<uint32_t>(d[1]) << 16U) |
105+
(static_cast<uint32_t>(d[0]) << 24U);
106+
}
107+
108+
static uint32_t check_size(uint32_t size) {
109+
if (size > static_cast<uint32_t>(max_blob_header_size)) {
110+
throw osmium::pbf_error{"invalid BlobHeader size (> max_blob_header_size)"};
111+
}
112+
return size;
113+
}
114+
97115
/**
98-
* Read the given number of bytes from the input queue.
116+
* Read exactly size bytes from fd into buffer.
99117
*
100-
* @param size Number of bytes to read
101-
* @returns String with the data
102-
* @throws osmium::pbf_error If size bytes can't be read
118+
* @returns true if size bytes could be read
119+
* false if EOF was encountered
103120
*/
104-
std::string read_from_input_queue(size_t size) {
105-
ensure_available_in_input_queue(size);
121+
static bool read_exactly(int fd, char* buffer, std::size_t size) {
122+
std::size_t to_read = size;
106123

107-
std::string output(m_input_buffer, 0, size);
108-
pop_from_input_queue(size);
124+
while (to_read > 0) {
125+
auto const read_size = osmium::io::detail::reliable_read(fd, buffer + (size - to_read), to_read);
126+
if (read_size == 0) { // EOF
127+
return false;
128+
}
129+
to_read -= read_size;
130+
}
109131

110-
return output;
132+
return true;
111133
}
112134

113135
/**
114136
* Read 4 bytes in network byte order from file. They contain
115137
* the length of the following BlobHeader.
116138
*/
117139
uint32_t read_blob_header_size_from_file() {
140+
if (m_fd != -1) {
141+
std::array<char, sizeof(uint32_t)> buffer;
142+
if (!read_exactly(m_fd, buffer.data(), buffer.size())) {
143+
return 0; // EOF
144+
}
145+
return check_size(get_size_in_network_byte_order(buffer.data()));
146+
}
147+
118148
uint32_t size = 0;
119149

120150
try {
121-
// size is encoded in network byte order
122151
ensure_available_in_input_queue(sizeof(size));
123-
const char* d = m_input_buffer.data();
124-
size = (static_cast<uint32_t>(d[3])) |
125-
(static_cast<uint32_t>(d[2]) << 8U) |
126-
(static_cast<uint32_t>(d[1]) << 16U) |
127-
(static_cast<uint32_t>(d[0]) << 24U);
152+
size = get_size_in_network_byte_order(m_input_buffer.data());
128153
pop_from_input_queue(sizeof(size));
129154
} catch (const osmium::pbf_error&) {
130155
return 0; // EOF
@@ -178,10 +203,15 @@ namespace osmium {
178203
return 0;
179204
}
180205

206+
if (m_fd != -1) {
207+
auto const buffer = read_from_input_queue_with_check(size);
208+
const auto blob_size = decode_blob_header(protozero::data_view{buffer.data(), size}, expected_type);
209+
return blob_size;
210+
}
211+
181212
ensure_available_in_input_queue(size);
182213
const auto blob_size = decode_blob_header(protozero::data_view{m_input_buffer.data(), size}, expected_type);
183214
pop_from_input_queue(size);
184-
185215
return blob_size;
186216
}
187217

@@ -190,7 +220,21 @@ namespace osmium {
190220
throw osmium::pbf_error{std::string{"invalid blob size: "} +
191221
std::to_string(size)};
192222
}
193-
return read_from_input_queue(size);
223+
224+
std::string buffer;
225+
if (m_fd != -1) {
226+
buffer.resize(size);
227+
228+
if (!read_exactly(m_fd, &*buffer.begin(), size)) {
229+
throw osmium::pbf_error{"unexpected EOF"};
230+
}
231+
} else {
232+
ensure_available_in_input_queue(size);
233+
buffer.append(m_input_buffer, 0, size);
234+
pop_from_input_queue(size);
235+
}
236+
237+
return buffer;
194238
}
195239

196240
// Parse the header in the PBF OSMHeader blob.
@@ -218,7 +262,8 @@ namespace osmium {
218262
public:
219263

220264
explicit PBFParser(parser_arguments& args) :
221-
Parser(args) {
265+
Parser(args),
266+
m_fd(args.fd) {
222267
}
223268

224269
PBFParser(const PBFParser&) = delete;
@@ -237,6 +282,8 @@ namespace osmium {
237282
if (read_types() != osmium::osm_entity_bits::nothing) {
238283
parse_data_blobs();
239284
}
285+
286+
osmium::io::detail::reliable_close(m_fd);
240287
}
241288

242289
}; // class PBFParser

include/osmium/io/reader.hpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ namespace osmium {
111111

112112
detail::future_string_queue_type m_input_queue;
113113

114+
int m_fd;
115+
114116
std::unique_ptr<osmium::io::Decompressor> m_decompressor;
115117

116118
osmium::io::detail::ReadThreadManager m_read_thread_manager;
@@ -152,6 +154,7 @@ namespace osmium {
152154

153155
// This function will run in a separate thread.
154156
static void parser_thread(osmium::thread::Pool& pool,
157+
int fd,
155158
const detail::ParserFactory::create_parser_type& creator,
156159
detail::future_string_queue_type& input_queue,
157160
detail::future_buffer_queue_type& osmdata_queue,
@@ -162,6 +165,7 @@ namespace osmium {
162165
std::promise<osmium::io::Header> promise{std::move(header_promise)};
163166
osmium::io::detail::parser_arguments args = {
164167
pool,
168+
fd,
165169
input_queue,
166170
osmdata_queue,
167171
promise,
@@ -248,6 +252,19 @@ namespace osmium {
248252
return fd;
249253
}
250254

255+
static std::unique_ptr<Decompressor> make_decompressor(const osmium::io::File& file, int fd) {
256+
const auto& factory = osmium::io::CompressionFactory::instance();
257+
if (file.buffer()) {
258+
return factory.create_decompressor(file.compression(), file.buffer(), file.buffer_size());
259+
}
260+
261+
if (file.format() == file_format::pbf) {
262+
return std::unique_ptr<Decompressor>{new DummyDecompressor{}};
263+
}
264+
265+
return factory.create_decompressor(file.compression(), fd);
266+
}
267+
251268
public:
252269

253270
/**
@@ -295,9 +312,8 @@ namespace osmium {
295312
m_file(file.check()),
296313
m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
297314
m_input_queue(detail::get_input_queue_size(), "raw_input"),
298-
m_decompressor(m_file.buffer() ?
299-
osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
300-
osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
315+
m_fd(m_file.buffer() ? -1 : open_input_file_or_url(m_file.filename(), &m_childpid)),
316+
m_decompressor(make_decompressor(m_file, m_fd)),
301317
m_read_thread_manager(*m_decompressor, m_input_queue),
302318
m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
303319
m_osmdata_queue_wrapper(m_osmdata_queue),
@@ -313,7 +329,9 @@ namespace osmium {
313329

314330
std::promise<osmium::io::Header> header_promise;
315331
m_header_future = header_promise.get_future();
316-
m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), std::ref(m_creator),
332+
333+
const int fd_for_parser = m_decompressor->is_real() ? -1 : m_fd;
334+
m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), fd_for_parser, std::ref(m_creator),
317335
std::ref(m_input_queue), std::ref(m_osmdata_queue),
318336
std::move(header_promise), m_read_which_entities,
319337
m_read_metadata, m_buffers_kind};

test/data-tests/testdata-xml.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ static header_buffer_type parse_xml(std::string input) {
8787

8888
osmium::io::detail::parser_arguments args = {
8989
pool,
90+
-1,
9091
input_queue,
9192
output_queue,
9293
header_promise,

0 commit comments

Comments
 (0)