HoraeDB is a timeseries database (TSDB). However, unlike a classic TSDB, which usually performs poorly on analytic workloads, HoraeDB aims to handle both timeseries and analytic workloads.
In a classic timeseries database, the `Tag` columns (InfluxDB calls them `Tag` and Prometheus calls them `Label`) are normally indexed by building an inverted index. However, the cardinality of `Tag` varies across scenarios, and in some scenarios it is very high (we call this case the analytic workload), so storing and retrieving the inverted index becomes very costly. On the other hand, the scanning-and-pruning approach often used by analytical databases handles such analytic workloads well.
The basic design idea of HoraeDB is to adopt a hybrid storage format and a corresponding query method to achieve better performance on both timeseries and analytic workloads.
[Figure: architecture of the HoraeDB stand-alone service]
The figure above shows the architecture of the HoraeDB stand-alone service; the following parts describe the details of its important modules.
module path: https://github.com/apache/incubator-horaedb/tree/main/server
The current RPC layer supports multiple protocols, including HTTP, gRPC and MySQL. Basically, HTTP and MySQL are used to debug HoraeDB, run manual queries and perform DDL operations (such as creating and dropping tables), while gRPC can be regarded as a customized high-performance protocol, suitable for massive read and write operations.
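For instance, here is a minimal sketch of running a statement over the HTTP protocol; the port (5440) and the `/sql` endpoint follow the default standalone configuration and may differ in your deployment:

```rust
// Post a SQL statement to a local HoraeDB instance over HTTP.
// Assumes the default standalone HTTP port (5440) and the /sql endpoint.
use reqwest::blocking::Client;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = Client::new()
        .post("http://127.0.0.1:5440/sql")
        .json(&serde_json::json!({ "query": "SELECT 1" }))
        .send()?;
    println!("{}", resp.text()?);
    Ok(())
}
```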
module path: https://github.com/apache/incubator-horaedb/tree/main/query_frontend
The SQL layer takes responsibility for parsing SQL and generating the query plan. Based on sqlparser, a SQL dialect is provided for processing timeseries data, which introduces some key concepts including `Tag` and `Timestamp`. And by utilizing DataFusion, the planner is able to generate both regular logical plans and tailored ones used to implement the special operators defined by timeseries queries, e.g. PromQL.
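To illustrate the dialect, here is a sketch of a DDL statement using its timeseries-specific constructs (the table and column names are made up): `TAG` marks a tag column, and `TIMESTAMP KEY` designates the timestamp column every table must have:

```rust
// A HoraeDB-dialect DDL statement embedded as a string; the parser in
// query_frontend understands the TAG and TIMESTAMP KEY extensions.
fn main() {
    let ddl = r#"
        CREATE TABLE host_metrics (
            ts timestamp NOT NULL,
            host string TAG,
            cpu double,
            TIMESTAMP KEY (ts)
        ) ENGINE = Analytic
    "#;
    println!("{ddl}");
}
```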
module path: https://github.com/apache/incubator-horaedb/tree/main/interpreters
The `Interpreter` module encapsulates the SQL `CRUD` operations. In the query procedure, a SQL received by HoraeDB is parsed, converted into a query plan and then executed in a specific interpreter, such as `SelectInterpreter`, `InsertInterpreter`, etc.
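A hedged sketch of this dispatch idea follows; the interpreter names mirror the real ones, but the trait and the types are simplified placeholders rather than the actual API:

```rust
// Each logical plan is routed to the interpreter that knows how to run it.
enum Plan {
    Select(String), // the String payload stands in for a real logical plan
    Insert(String),
}

struct Output(String);

trait Interpreter {
    fn execute(self: Box<Self>) -> Output;
}

struct SelectInterpreter(String);
impl Interpreter for SelectInterpreter {
    fn execute(self: Box<Self>) -> Output {
        Output(format!("executed select plan: {}", self.0))
    }
}

struct InsertInterpreter(String);
impl Interpreter for InsertInterpreter {
    fn execute(self: Box<Self>) -> Output {
        Output(format!("executed insert plan: {}", self.0))
    }
}

fn build_interpreter(plan: Plan) -> Box<dyn Interpreter> {
    match plan {
        Plan::Select(p) => Box::new(SelectInterpreter(p)),
        Plan::Insert(p) => Box::new(InsertInterpreter(p)),
    }
}

fn main() {
    println!("{}", build_interpreter(Plan::Select("SELECT 1".into())).execute().0);
    println!("{}", build_interpreter(Plan::Insert("INSERT ...".into())).execute().0);
}
```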
module path: https://github.com/apache/incubator-horaedb/tree/main/catalog_impls
`Catalog` is the module managing metadata, and the levels of metadata adopted by HoraeDB are similar to PostgreSQL: `Catalog > Schema > Table`, but they are only used as namespaces.
At present, `Catalog` and `Schema` each have two different implementations for standalone and distributed mode, because the strategies for generating ids and the ways of persisting metadata differ between the two modes.
module path: https://github.com/apache/incubator-horaedb/tree/main/query_engine
`Query Engine` is responsible for optimizing and executing a query plan given the basic SQL plan provided by the SQL layer; currently this work is mainly delegated to DataFusion.
In addition to the basic SQL functions, HoraeDB also defines customized query protocols and optimization rules for some specific query plans by utilizing the extensibility provided by DataFusion. For example, the support for PromQL is implemented in this way; read it if you are interested.
module path: https://github.com/apache/incubator-horaedb/tree/main/table_engine
`Table Engine` is the storage engine managing tables in HoraeDB, and its pluggability is a core design of HoraeDB that matters for our long-term targets, e.g. supporting log or tracing workloads by implementing new storage engines. HoraeDB will have multiple kinds of `Table Engine` for different workloads, and the most appropriate one should be chosen as the storage engine according to the workload pattern.
Now the requirements for a `Table Engine` are:

- Manage all the shared resources under the engine, such as memory, storage and CPU;
- Manage the metadata of tables, such as the table schema and the table options;
- Provide `Table` instances which provide `read` and `write` methods;
- Take responsibility for creating, opening, dropping and closing `Table` instances.

Actually the things a `Table Engine` needs to handle are a little complicated. Currently only one `Table Engine`, called `Analytic`, is provided in HoraeDB; it does a good job of processing analytic workloads, but it is not yet ready to handle timeseries workloads (we plan to enhance its performance by adding indexes that help handle timeseries workloads).
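Here is a hedged sketch of what such a pluggable interface could look like; the real trait in the `table_engine` crate is async and much richer, so treat the names and signatures below as illustrative only:

```rust
// Simplified placeholder types standing in for the real request/response types.
struct Schema;
struct WriteRequest;
struct ReadRequest;
struct Rows;

// A table exposes the read/write methods required above.
trait Table {
    fn write(&self, req: WriteRequest) -> Result<usize, String>;
    fn read(&self, req: ReadRequest) -> Result<Rows, String>;
}

// The engine owns the lifecycle of its tables and the shared resources behind them.
trait TableEngine {
    fn create_table(&self, name: &str, schema: Schema) -> Result<Box<dyn Table>, String>;
    fn open_table(&self, name: &str) -> Result<Option<Box<dyn Table>>, String>;
    fn drop_table(&self, name: &str) -> Result<bool, String>;
    fn close_table(&self, name: &str) -> Result<(), String>;
}
```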
The following parts describe the details of the `Analytic Table Engine`.
module path: https://github.com/apache/incubator-horaedb/tree/main/wal
The data model of HoraeDB is `WAL` + `MemTable`: newly written data is written to the `WAL` first, then to the `MemTable`, and after a certain amount of data has accumulated in the `MemTable`, the data is organized into a query-friendly form and persisted to durable storage.
Now three implementations of `WAL` are provided for standalone and distributed mode:

- For standalone mode, the `WAL` is based on `RocksDB` and the data is persisted on the local disk.
- For distributed mode, the `WAL` is required to be a distributed component responsible for the durability of newly written data, so we provide an implementation based on OceanBase.
- For distributed mode, a more lightweight implementation based on `Apache Kafka` is also provided.
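A minimal sketch of the abstraction these backends share might look as follows; the types are illustrative (the real `wal` crate interface is async and organized differently), with a toy in-memory backend standing in for RocksDB, OceanBase or Kafka:

```rust
type SequenceNumber = u64;
type TableId = u64;

trait WalManager {
    // Append an entry for a table and return its assigned sequence number.
    fn write(&mut self, table: TableId, payload: Vec<u8>) -> SequenceNumber;
    // Replay entries newer than `since`, used to rebuild a MemTable on recovery.
    fn read_after(&self, table: TableId, since: SequenceNumber) -> Vec<(SequenceNumber, Vec<u8>)>;
    // Let the backend reclaim entries made obsolete by a flush.
    fn mark_deletable_up_to(&mut self, table: TableId, seq: SequenceNumber);
}

#[derive(Default)]
struct MemWal {
    next_seq: SequenceNumber,
    entries: Vec<(TableId, SequenceNumber, Vec<u8>)>,
}

impl WalManager for MemWal {
    fn write(&mut self, table: TableId, payload: Vec<u8>) -> SequenceNumber {
        self.next_seq += 1;
        self.entries.push((table, self.next_seq, payload));
        self.next_seq
    }

    fn read_after(&self, table: TableId, since: SequenceNumber) -> Vec<(SequenceNumber, Vec<u8>)> {
        self.entries
            .iter()
            .filter(|(t, s, _)| *t == table && *s > since)
            .map(|(_, s, p)| (*s, p.clone()))
            .collect()
    }

    fn mark_deletable_up_to(&mut self, table: TableId, seq: SequenceNumber) {
        self.entries.retain(|(t, s, _)| *t != table || *s > seq);
    }
}

fn main() {
    let mut wal = MemWal::default();
    wal.write(1, b"row-a".to_vec());
    let flushed = wal.write(1, b"row-b".to_vec());
    wal.mark_deletable_up_to(1, flushed); // a flush made these entries obsolete
    assert!(wal.read_after(1, 0).is_empty());
}
```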
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/memtable
Since the `WAL` cannot provide efficient data retrieval, newly written data is also stored in the `MemTable`. After a certain amount of data has accumulated, HoraeDB organizes the data in the `MemTable` into a query-friendly storage format (`SST`) and stores it on the persistent device.
The current implementation of `MemTable` is based on agatedb's skiplist. It allows concurrent reads and writes and can control memory usage based on Arena.
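The following is a hedged sketch of the idea, using `crossbeam_skiplist` as a stand-in for agatedb's skiplist: entries are keyed by (row key, sequence number), so a later ordered scan yields sorted data for flush:

```rust
use crossbeam_skiplist::SkipMap;

fn main() {
    // Key = (encoded row key, WAL sequence number); newer versions of the same
    // row sort after older ones, which makes overwrites easy to resolve.
    let memtable: SkipMap<(Vec<u8>, u64), Vec<u8>> = SkipMap::new();
    memtable.insert((b"host=a:1000".to_vec(), 1), b"cpu=0.3".to_vec());
    memtable.insert((b"host=a:2000".to_vec(), 2), b"cpu=0.7".to_vec());

    // An ordered scan like this is what flush uses to build a sorted SST.
    for entry in memtable.iter() {
        let (key, seq) = entry.key();
        println!("{:?} @ seq {} -> {:?}", key, seq, entry.value());
    }
}
```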
module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/instance/flush_compaction.rs
What `Flush` does is: when the memory usage of the `MemTable` reaches a threshold, some `MemTable`s are selected and flushed into query-friendly `SST`s saved on the persistent device.
During the flushing procedure, the data is divided by a certain time range (configured by the table option `Segment Duration`), and every `SST` is guaranteed to contain only data whose timestamps fall within the same `Segment`. This is a common operation in most timeseries databases: organizing data along the time dimension speeds up subsequent time-related operations, such as querying data over a time range, and assists purging data outside the `TTL`.
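A minimal sketch of the segment assignment implied above, assuming millisecond timestamps: each timestamp is bucketed to the start of its segment, so each `SST` covers exactly one bucket:

```rust
// Bucket a timestamp to the start of its segment.
fn segment_start(ts_ms: i64, segment_duration_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(segment_duration_ms)
}

fn main() {
    let two_hours = 2 * 60 * 60 * 1000;
    // Two rows only 1 ms apart still land in different SSTs if they
    // cross a segment boundary.
    assert_eq!(segment_start(7_199_999, two_hours), 0);
    assert_eq!(segment_start(7_200_000, two_hours), 7_200_000);
}
```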
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/compaction
The data of the `MemTable` is flushed as `SST`s, but the file size of a recently flushed `SST` may be very small, and too small or too many `SST`s lead to poor query performance. Therefore, `Compaction` is introduced to rearrange the `SST`s so that multiple smaller `SST` files can be compacted into a larger `SST` file.
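A hedged sketch of a simple size-based picker illustrates the idea; the thresholds and selection logic below are made up for illustration (the real strategies in `analytic_engine` are also time-window aware):

```rust
struct SstFile {
    id: u64,
    size_bytes: u64,
}

// Collect small files until the merged output would exceed a target size.
fn pick_compaction_inputs(ssts: &[SstFile], small_limit: u64, target: u64) -> Vec<u64> {
    let mut picked = Vec::new();
    let mut total = 0;
    for f in ssts.iter().filter(|f| f.size_bytes < small_limit) {
        if total + f.size_bytes > target {
            break;
        }
        total += f.size_bytes;
        picked.push(f.id);
    }
    picked
}

fn main() {
    let ssts = vec![
        SstFile { id: 1, size_bytes: 4 << 20 },   // 4 MiB, small
        SstFile { id: 2, size_bytes: 6 << 20 },   // 6 MiB, small
        SstFile { id: 3, size_bytes: 512 << 20 }, // already large, left alone
    ];
    assert_eq!(pick_compaction_inputs(&ssts, 64 << 20, 256 << 20), vec![1, 2]);
}
```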
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/meta
`Manifest` takes responsibility for managing the tables' metadata of the `Analytic Engine`, including:

- the table schema and the table options;
- the sequence number up to which data has been flushed;
- the information of all the `SST`s belonging to the table.

Now the `Manifest` is based on `WAL` and `Object Storage`. Newly written updates on the `Manifest` are persisted as logs in the `WAL`, and in order to avoid infinite expansion of the `Manifest` (actually, every `Flush` leads to an update), `Snapshot` is also introduced to clean up the history of metadata updates; the generated `Snapshot` is saved to `Object Storage`.
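A minimal sketch of this recovery model, with illustrative types: the current metadata equals the latest `Snapshot` plus a replay of the edits logged after it:

```rust
#[derive(Default)]
struct TableMeta {
    flushed_sequence: u64,
    sst_ids: Vec<u64>,
}

// One logged update to the manifest; every flush appends a couple of these.
enum MetaEdit {
    AddSst(u64),
    UpdateFlushedSequence(u64),
}

fn recover(snapshot: TableMeta, edits_after_snapshot: Vec<MetaEdit>) -> TableMeta {
    let mut meta = snapshot;
    for edit in edits_after_snapshot {
        match edit {
            MetaEdit::AddSst(id) => meta.sst_ids.push(id),
            MetaEdit::UpdateFlushedSequence(seq) => meta.flushed_sequence = seq,
        }
    }
    meta
}

fn main() {
    let meta = recover(
        TableMeta::default(),
        vec![MetaEdit::AddSst(1), MetaEdit::UpdateFlushedSequence(42)],
    );
    println!("ssts={:?} flushed_sequence={}", meta.sst_ids, meta.flushed_sequence);
}
```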
module path: https://github.com/apache/incubator-horaedb/tree/main/components/object_store
The `SST`s generated by `Flush` need to be persisted, and the abstraction of the persistent storage device is `ObjectStore`, which has multiple implementations (e.g. based on the local file system, or on cloud object storage such as Alibaba Cloud OSS).
The distributed architecture of HoraeDB separates storage and compute, which requires the `Object Store` to be a highly available and reliable service independent of HoraeDB. Therefore, storage systems like Amazon S3 and Alibaba Cloud OSS are good choices, and implementations for the storage systems of other cloud service providers are planned.
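A hedged sketch of the abstraction, simplified and synchronous (the real component exposes an async put/get style interface over the various backends):

```rust
use std::collections::HashMap;

trait ObjectStore {
    fn put(&mut self, path: &str, bytes: Vec<u8>);
    fn get(&self, path: &str) -> Option<&[u8]>;
}

// An in-memory implementation, standing in for local-disk or S3/OSS backends.
#[derive(Default)]
struct MemStore(HashMap<String, Vec<u8>>);

impl ObjectStore for MemStore {
    fn put(&mut self, path: &str, bytes: Vec<u8>) {
        self.0.insert(path.to_string(), bytes);
    }

    fn get(&self, path: &str) -> Option<&[u8]> {
        self.0.get(path).map(|v| v.as_slice())
    }
}

fn main() {
    let mut store = MemStore::default();
    // The key layout here is illustrative only.
    store.put("space_1/table_7/42.sst", vec![0xDE, 0xAD]);
    assert!(store.get("space_1/table_7/42.sst").is_some());
}
```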
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/sst
`SST` is actually an abstraction that can have multiple specific implementations. The current implementation is based on Parquet, which is a column-oriented data file format designed for efficient data storage and retrieval.
The format of the `SST` is critical for retrieving data and is also the most important part for performing well on both timeseries and analytic workloads. At present, our Parquet-based implementation is good at processing analytic workloads but poor at processing timeseries workloads. In our roadmap, we will explore more storage formats in order to achieve good performance on both.
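As a sketch of what producing such a file involves, here is a minimal example writing a batch of rows with the arrow and parquet crates directly; the real `SST` writer layers engine-specific metadata and indexing on top of this:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A tiny timeseries schema: timestamp, one tag column, one field column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("host", DataType::Utf8, false),
        Field::new("cpu", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            Arc::new(Float64Array::from(vec![0.3, 0.7])) as ArrayRef,
        ],
    )?;

    let mut writer = ArrowWriter::try_new(File::create("demo.sst")?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?; // finalizes the Parquet footer
    Ok(())
}
```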
module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/space.rs
In the `Analytic Engine` there is a concept called `space`, and here is an explanation of it to resolve some ambiguities when reading the source code. Actually, the `Analytic Engine` does not have the concepts of `catalog` and `schema`; it only provides two levels of relationship: `space` and `table`. In the implementation, the `schema id` (which should be unique across all `catalog`s) on the upper layer is actually mapped to the `space id`.
The `space` in the `Analytic Engine` serves mainly to isolate the resources of different tenants, such as the usage of memory.
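A minimal sketch of the id mapping described above (the type names are illustrative): the engine simply reuses the globally unique `schema id` as its `space id`:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SchemaId(u32);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SpaceId(u32);

impl From<SchemaId> for SpaceId {
    fn from(id: SchemaId) -> Self {
        // The mapping is the identity on the numeric id.
        SpaceId(id.0)
    }
}

fn main() {
    let space: SpaceId = SchemaId(7).into();
    assert_eq!(space, SpaceId(7));
}
```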
After this brief introduction to some important modules of HoraeDB, we will describe some critical paths in the code, hoping to provide interested developers with a guide for reading it.
[Figure: query procedure across the modules]
Take a `SELECT` SQL as an example. The figure above shows the query procedure, and the numbers in it indicate the order of the calls between the modules.
Here are the details:

- After the SQL is parsed and converted into a logical plan, the corresponding `Interpreter` is created and the logical plan will be executed by it;
- For a normal `Select` SQL, it will be executed through `SelectInterpreter`;
- In `SelectInterpreter`, the specific query logic is executed by the `Query Engine`:
  - optimize the logical plan;
  - generate and optimize the physical plan;
  - execute the physical plan;
- The execution of the physical plan involves the `Analytic Engine`:
  - data is obtained through the `read` method of the `Table` instance provided by the `Analytic Engine`;
  - the sources of the table data are the `SST`s and the `Memtable`, and the data can be filtered by the pushed-down predicates;
- After the data is retrieved, the `Query Engine` completes the specific computation and generates the final results;
- `SelectInterpreter` gets the results and feeds them to the protocol module;
- After the protocol layer converts the results, the server module responds to the client with them.

The following is the flow of function calls in version v1.2.2:
┌───────────────────────◀─────────────┐ ┌───────────────────────┐
│ handle_sql │────────┐ │ │ parse_sql │
└───────────────────────┘ │ │ └────────────────┬──────┘
│ ▲ │ │ ▲ │
│ │ │ │ │ │
│ │ │ └36───┐ │ 11
1│ │ │ │ │ │
│ 8│ │ │ │ │
│ │ │ │ 10 │
│ │ │ │ │ │
▼ │ │ │ │ ▼
┌─────────────────┴─────┐ 9│ ┌┴─────┴────────────────┐───────12─────────▶┌───────────────────────┐
│maybe_forward_sql_query│ └────────▶│fetch_sql_query_output │ │ statement_to_plan │
└───┬───────────────────┘ └────┬──────────────────┘◀───────19─────────└───────────────────────┘
│ ▲ │ ▲ │ ▲
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ 35 13 18
2│ 7│ 20 │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ ▼ │
▼ │ ▼ │ ┌───────────────────────┐
┌───────────────────────┐───────────6───────▶┌─────────────────┴─────┐ ┌─────────────────┴─────┐ │Planner::statement_to_p│
│ forward_with_endpoint │ │ forward │ │execute_plan_involving_│ │ lan │
└───────────────────────┘◀────────5──────────└───┬───────────────────┘ ┌──│ partition_table │◀────────┐ └───┬───────────────────┘
│ ▲ │ └───────────────────────┘ │ │ ▲
│ │ │ │ ▲ │ │ │
│ │ │ │ │ │ 14 17
┌───────────────────────┐ │ 4│ │ │ │ │ │ │
┌─────│ PhysicalPlan::execute │ 3│ │ │ 21 │ │ │ │
│ └───────────────────────┘◀──┐ │ │ │ │ 22 │ │ │
│ │ │ │ │ │ │ │ ▼ │
│ │ │ │ │ │ │ │ ┌────────────────────────┐
│ │ ▼ │ │ ▼ │ 34 │sql_statement_to_datafus│
│ ┌───────────────────────┐ 30 ┌─────────────────┴─────┐ │ ┌─────────────────┴─────┐ │ │ ion_plan │
31 │ build_df_session_ctx │ │ │ route │ │ │ build_interpreter │ │ └────────────────────────┘
│ └────┬──────────────────┘ │ └───────────────────────┘ │ └───────────────────────┘ │ │ ▲
│ │ ▲ │ │ │ │ │
│ 27 26 │ 23 │ 15 16
│ ▼ │ │ │ │ │ │
└────▶┌────────────────┴──────┐ │ ┌───────────────────────┐ │ │ │ │
│ execute_logical_plan ├───┴────32────────▶│ execute │──────────┐ │ ┌───────────────────────┐ │ ▼ │
└────┬──────────────────┘◀────────────25────┴───────────────────────┘ 33 │ │interpreter_execute_pla│ │ ┌────────────────────────┐
│ ▲ ▲ └──────┴──▶│ n │────────┘ │SqlToRel::sql_statement_│
28 │ └──────────24────────────────┴───────────────────────┘ │ to_datafusion_plan │
│ 29 └────────────────────────┘
▼ │
┌────────────────┴──────┐
│ optimize_plan │
└───────────────────────┘
1. The request arrives at `handle_sql` after various protocol conversions; since the request may not be processed by this node, it may need to be forwarded, so `maybe_forward_sql_query` is called to handle the forwarding logic.
2. Construct a `ForwardRequest` in `maybe_forward_sql_query` and call `forward`.
3. Construct a `RouteRequest` in `forward` and call `route`.
4. Use `route` to get the destination node `endpoint` and return to `forward`.
5. Call `forward_with_endpoint` to forward the request.
6. Return to `forward`.
7. Return to `maybe_forward_sql_query`.
8. Return to `handle_sql`.
9. For a `Local` request, call `fetch_sql_query_output` to process it.
10. Call `parse_sql` to parse the `sql` into a `Statement`.
11. Return to `fetch_sql_query_output`.
12. Call `statement_to_plan` with the `Statement`.
13. Construct a `Planner` with the `ctx` and the `Statement`, and call the `statement_to_plan` method of the `Planner`.
14. The `planner` will call the corresponding planner method for the category of the request; at this point our `sql` is a query, so `sql_statement_to_plan` is called.
15. Call `sql_statement_to_datafusion_plan`, which generates the `datafusion` object and then calls `SqlToRel::sql_statement_to_plan`.
16. The logical plan is generated and returned by `SqlToRel::sql_statement_to_plan`.
17. Call `execute_plan_involving_partition_table` (in the default configuration) for the subsequent optimization and execution of this logical plan.
18. Call `build_interpreter` to generate the `Interpreter`.
19. Call the `Interpreter`'s `interpreter_execute_plan` method for the logical plan execution.
20. The corresponding `execute` function is called; at this time the `sql` is a query, so the `execute` of `SelectInterpreter` is called.
21. Call `execute_logical_plan`, which will call `build_df_session_ctx` to generate the optimizer.
22. `build_df_session_ctx` uses the `config` information to generate the corresponding context: it first uses datafusion and some custom optimization rules (in `logical_optimize_rules()`) to generate the logical plan optimizer, then uses `apply_adapters_for_physical_optimize_rules` to generate the physical plan optimizer.
23. Call `optimize_plan`, using the optimizers just generated to optimize first the logical plan and then the physical plan.
[Figure: write procedure across the modules]
Take an `INSERT` SQL as an example. The figure above shows the write procedure, and the numbers in it indicate the order of the calls between the modules.
Here are the details:

- After the SQL is parsed and converted into a logical plan, the corresponding `Interpreter` is created and the logical plan will be executed by it;
- For a normal `INSERT` SQL, it will be executed through `InsertInterpreter`;
- In `InsertInterpreter`, the `write` method of the `Table` provided by the `Analytic Engine` is called:
  - write the data into the `WAL` first;
  - write the data into the `MemTable` then;
- Before writing to the `MemTable`, the memory usage will be checked. If the memory usage is too high, the flush process will be triggered:
  - persist some old `MemTable`s as `SST`s;
  - store the updates about the new `SST`s and the flushed sequence number of the `WAL` to the `Manifest`;
  - delete the corresponding `WAL` entries;
- Finally, the server module responds to the client with the execution result.