Technical and Design
1 - Compaction Offload
Note: This feature is still in development.
This chapter discusses compaction offload, which is designed to separate the compaction workload from the local HoraeDB nodes and delegate it to external compaction nodes.
Overview
[Figure: architecture of the cluster for compaction offload]
The diagram above describes the architecture of the cluster for compaction offload, where some key concepts need to be explained:
Compaction Node
: Takes responsibility for handling offloaded compaction tasks. The compaction node receives a compaction task, performs the actual merging of SSTables, and then sends the task result back to HoraeDB.

HoraeMeta Cluster
: HoraeMeta acts as the compaction node manager in the compaction offload scenario. It monitors the compaction node cluster and schedules the compaction nodes.
The procedure of remote compaction, based on the architecture diagram above, is:
- HoraeDB servers fetch the information of suitable compaction nodes from HoraeMeta.
- HoraeDB submits the compaction task to the remote compaction node, according to the information fetched from HoraeMeta.
- The compaction node executes the task and writes the results to the temporary workspace.
- The compaction node sends the compaction results back to HoraeDB.
- HoraeDB receives the result, installs the data in the temporary workspace and purges the compaction input files.
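For illustration, the flow above (including the local fallback described later under High Availability) might look roughly like the following; all type and method names here are invented placeholders, not HoraeDB's real API:

```rust
// Illustrative sketch of the remote-compaction path with a local fallback.
// All types and methods are hypothetical placeholders, not HoraeDB's real API.
struct CompactionTask;
struct CompactionResult;

trait MetaClient {
    /// Ask HoraeMeta for a suitable compaction node, if any is available.
    fn pick_compaction_node(&self) -> Option<String>;
}

trait CompactionNodeClient {
    /// Submit the task to the remote node and wait for the result.
    fn execute(&self, endpoint: &str, task: &CompactionTask) -> Result<CompactionResult, String>;
}

fn run_compaction(
    meta: &dyn MetaClient,
    remote: &dyn CompactionNodeClient,
    task: CompactionTask,
) -> CompactionResult {
    // 1. Fetch a suitable compaction node from HoraeMeta.
    if let Some(endpoint) = meta.pick_compaction_node() {
        // 2-4. Submit the task; the node writes to its temporary workspace
        // and sends the result back.
        if let Ok(result) = remote.execute(&endpoint, &task) {
            // 5. Install the data and purge the compaction input files.
            return result;
        }
    }
    // Fallback: no node available or the remote execution failed,
    // so run the compaction task locally (see High Availability below).
    run_local_compaction(task)
}

fn run_local_compaction(_task: CompactionTask) -> CompactionResult {
    CompactionResult
}
```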
The architecture above makes it easy to implement useful features like load balancing and high availability. Let's dive into the key components in the architecture and talk about how these features are implemented.
Compaction Node
Compaction Node runs the main logic of compaction. It is implemented based on HoraeDB and distinguished by:

NodeType
: A config parameter used to distinguish HoraeDB and CompactionNode. This info is sent to HoraeMeta through heartbeats.

The compaction service is implemented as a gRPC service.
HoraeMeta
HoraeMeta manages the compaction node cluster with CompactionNodeManager, which takes responsibility for compaction node metadata management and scheduling.
The compaction nodes metadata includes:
- Compaction node information, such as node name, node state;
- A compaction node name list, used as the key to access compaction node info, for better scheduling with round-robin strategy;
- …
As for the compaction node scheduling work, it mainly includes:
- Receiving the heartbeats from the compaction nodes and determining the online status of these registered nodes.
- Performing load balancing according to the compaction node cluster info.
- Providing the info of a suitable compaction node to HoraeDB when remote compaction execution is needed.
Load Balancing
Load balancing is critical for the compaction node cluster to make its overall processing more efficient. The effect of load balancing mainly depends on the scheduling algorithm for compaction nodes implemented in CompactionNodeManager.
(Note: the current implementation of the scheduling algorithm is a round-robin strategy for simplicity.)
The main process for a scheduling algorithm based on real load is:
- HoraeMeta collects the compaction node load information through the heartbeats to create a load overview of the compaction node cluster.
- Pick a compaction node with low load according to the load overview.
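For reference, a round-robin picker over the compaction node name list could look like the sketch below; the real CompactionNodeManager lives in HoraeMeta and this is only a simplified illustration:

```rust
// Simplified sketch of a round-robin scheduler over the registered
// compaction node names; not the actual CompactionNodeManager code.
struct CompactionNodeManager {
    node_names: Vec<String>, // registered compaction nodes
    next: usize,             // cursor for the round-robin strategy
}

impl CompactionNodeManager {
    fn pick_node(&mut self) -> Option<String> {
        if self.node_names.is_empty() {
            return None;
        }
        let idx = self.next % self.node_names.len();
        self.next = (self.next + 1) % self.node_names.len();
        Some(self.node_names[idx].clone())
    }
}
```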
High Availability
The fault tolerance of the above architecture can be achieved by such a procedure:
- When detecting that the heartbeat is broken, HoraeMeta determines that the compaction node is offline.
- When HoraeMeta cannot provide a suitable compaction node for HoraeDB, or the compaction node doesn't return the task result successfully, HoraeDB switches to running the compaction task locally.
2 - Introduction to Architecture of HoraeDB Cluster
Note: Some of the features mentioned in the article have not yet been implemented.
Overview
[Figure: architecture of a HoraeDB cluster]
The diagram above describes the architecture of a HoraeDB cluster, where some key concepts need to be explained:
HoraeMeta Cluster
: Takes responsibility for managing the metadata and resource scheduling of the cluster;

Shard(L)/Shard(F)
: Leader shard and follower shard consisting of multiple tables;

HoraeDB
: One HoraeDB instance consisting of multiple shards;

WAL Service
: Write-ahead log service for storing newly written real-time data;

Object Storage
: Object storage service for storing SSTs converted from memtables;
From the architecture diagram above, it can be concluded that the compute and storage are separated in the HoraeDB cluster, which makes it easy to implement useful distributed features, such as elastic autoscaling of compute/storage resources, high availability, load balancing, and so on.
Let’s dive into some of the key components mentioned above before explaining how these features are implemented.
Shard
Shard is the basic scheduling unit in the cluster, which consists of a group of tables. The tables in a shard share the same region for better storage locality in the WAL Service, and because of this, it is efficient to recover the data of all tables in the shard by scanning the entire WAL region. For most implementations of the WAL Service, without the shard concept it costs a lot to recover the table data one by one due to massive random IO, and this case deteriorates sharply when the number of tables grows to a certain level.
A specific role, Leader or Follower, should be assigned to a shard. A pair of leader-follower shards share the same set of tables. The leader shard can serve write and query requests from the client, while the follower shard can only serve read-only requests and must synchronize the newly written data from the WAL service in order to provide the latest snapshot for data retrieval. Actually, the follower is not needed if high availability is not required, while with at least one follower, it takes only a short time to resume service by simply switching the Follower to Leader when the HoraeDB instance hosting the leader shard crashes.
The diagram below summarizes the relationship between HoraeDB instance, Shard and Table. As shown in the diagram, the leader and follower shards are interleaved across the HoraeDB instances.
[Figure: relationship between HoraeDB instances, shards and tables]
Since Shard is the basic scheduling unit, it is natural to introduce some basic shard operations:
- Create/Drop table to/from a shard;
- Open/Close a shard;
- Split one shard into two shards;
- Merge two shards into one shard;
- Switch the role of a shard;
With these basic shard operations, some complex scheduling logic can be implemented, e.g. performing an expansion by splitting one shard into two and migrating one of them to a new HoraeDB instance.
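The basic operations can be pictured as a small set of commands issued to HoraeDB instances; the enum below is only a conceptual sketch, not the actual protocol definition:

```rust
// Conceptual sketch of the basic shard operations; the real protocol
// definitions in HoraeMeta/HoraeDB differ in detail.
type ShardId = u32;
type TableName = String;

enum ShardRole {
    Leader,
    Follower,
}

enum ShardOperation {
    CreateTable { shard: ShardId, table: TableName },
    DropTable { shard: ShardId, table: TableName },
    OpenShard { shard: ShardId, role: ShardRole },
    CloseShard { shard: ShardId },
    Split { shard: ShardId, new_shard: ShardId },
    Merge { left: ShardId, right: ShardId },
    SwitchRole { shard: ShardId, role: ShardRole },
}

// An expansion can then be expressed as a sequence of basic operations:
// split an overloaded shard and move one half to a newly registered instance.
fn expand(overloaded: ShardId, new_shard: ShardId) -> Vec<ShardOperation> {
    vec![
        ShardOperation::Split { shard: overloaded, new_shard },
        ShardOperation::CloseShard { shard: new_shard },
        // ... then reopen `new_shard` on the new HoraeDB instance.
        ShardOperation::OpenShard { shard: new_shard, role: ShardRole::Leader },
    ]
}
```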
HoraeMeta
HoraeMeta is implemented by embedding an ETCD inside to ensure consistency, and takes responsibility for cluster metadata management and scheduling.
The cluster metadata includes:
- Table information, such as table name, table ID, and which cluster the table belongs to;
- The mapping between table and shard and between shard and HoraeDB instance;
- …
As for the cluster scheduling work, it mainly includes:
- Receiving the heartbeats from the HoraeDB instances and determining the online status of these registered instances;
- Assigning specific role shards to the registered HoraeDB instances;
- Participating in table creation by assigning a unique table ID and the most appropriate shard to the table;
- Performing load balancing through shard operations according to the load information sent with the heartbeats;
- Performing expansion through shard operations when new instances are registered;
- Initiating failover through shard operations when old instances go offline;
Route
In order to avoid the overhead of forwarding requests, the communication between clients and the HoraeDB instances is peer-to-peer, that is to say, the client should retrieve routing information from the server before sending any specific write/query requests.
Actually, the routing information is decided by HoraeMeta, but clients are only allowed to access it through the HoraeDB instances rather than HoraeMeta, to avoid potential performance issues on the HoraeMeta.
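From the client's point of view this is a simple two-step flow: fetch and cache the route from any HoraeDB instance, then talk to the owning instance directly. A hedged sketch, with all names invented for illustration:

```rust
// Illustrative client-side routing flow; types, methods and the endpoint
// string are invented for this sketch and are not the real SDK API.
use std::collections::HashMap;

struct RouteTable {
    // table name -> endpoint of the HoraeDB instance owning its leader shard
    routes: HashMap<String, String>,
}

impl RouteTable {
    /// Return the cached endpoint, or fetch it from a HoraeDB instance
    /// (which in turn consults HoraeMeta) and cache it.
    fn endpoint_for(&mut self, table: &str, any_horaedb_endpoint: &str) -> String {
        if let Some(ep) = self.routes.get(table) {
            return ep.clone();
        }
        let ep = fetch_route_from_server(any_horaedb_endpoint, table);
        self.routes.insert(table.to_string(), ep.clone());
        ep
    }
}

// Placeholder for the RPC that asks a HoraeDB instance for routing info.
fn fetch_route_from_server(_endpoint: &str, _table: &str) -> String {
    "horaedb-instance-1:8831".to_string()
}
```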
WAL Service & Object Storage
In the HoraeDB cluster, WAL Service and Object Storage exist as separate distributed systems featuring HA, data replication and scalability. Current distributed implementations for the WAL Service include Kafka and OBKV (accessing OceanBase through its table API), and the implementations for Object Storage include popular object storage services, such as AWS S3, Azure object storage and Aliyun OSS.
The two components are similar in that they are both introduced to serve as the underlying storage layer for separating compute and storage, while the difference between them is obvious: the WAL Service is used to store the newly written data from real-time write requests, whose individual size is small but quantity is large, while Object Storage is used to store the read-friendly data files (SSTs) organized in the background, whose individual size is large and whose aggregate size is much larger.
The two components make it much easier to implement the HoraeDB cluster, which features horizontal scalability, high availability and load balancing.
Scalability
Scalability is an important feature for a distributed system. Let's take a look at how the horizontal scalability of the HoraeDB cluster is achieved.
First, the two storage components (WAL Service and Object Storage) should be horizontally scalable when deciding on the actual implementations for them, so the two storage services can be expanded separately if the storage capacity is not sufficient.
It will be a little bit complex when discussing the scalability of the compute service. Basically, these cases will bring the capacity problem:
- Massive queries on massive tables;
- Massive queries on a single large table;
- Massive queries on a normal table;
For the first case, it is easy to achieve horizontal scalability just by assigning shards that are created or split from old shards to expanded HoraeDB instances.
For the second case, the table partitioning is proposed and after partitioning, massive queries are distributed across multiple HoraeDB instances.
And the last case is the most important and the most difficult. Actually, the follower shard can handle part of the queries, but the number of follower shards is limited by the throughput threshold of the synchronization from the WAL regions. As shown in the diagram below, a pure compute shard can be introduced if the followers are not enough to handle the massive queries. Such a shard is not required to synchronize data with the leader shard, and retrieves the newly written data from the leader/follower shard only when a query comes. As for the SSTs required by the query, they will be downloaded from Object Storage and cached afterwards. With the two parts of the data, the compute resources are fully utilized to execute the CPU-intensive query plan. As we can see, such a shard can be added with only a little overhead (retrieving some data from the leader/follower shard when needed), so to some extent, horizontal scalability is achieved.
[Figure: introducing pure compute shards for massive queries on a single table]
High Availability
Assuming that the WAL Service and Object Storage are highly available, the high availability of the HoraeDB cluster can be achieved by such a procedure:
- When detecting that the heartbeat is broken, HoraeMeta determines that the HoraeDB instance is offline;
- The follower shards whose paired leader shards exist on the offline instance are switched to leader shards for fast failover;
- A slow failover can be achieved by opening the crashed shards on another instance if such follower shards don't exist.
[Figure: failover by switching follower shards to leader shards]
Load Balancing
HoraeMeta collects the instance load information contained in the received heartbeats to create a load overview of the whole cluster, according to which the load balancing can be implemented as an automatic mechanism:
- Pick a shard on a low-load instance for the newly created table;
- Migrate a shard from a high-load instance to another low-load instance;
- Split the large shard on the high-load instance and migrate the split shards to other low-load instances;
3 - Introduction to HoraeDB's Architecture
Target
- Provide the overview of HoraeDB to the developers who want to know more about HoraeDB but have no idea where to start.
- Make a brief introduction to the important modules of HoraeDB and the connections between these modules, but details about their implementations are not involved.
Motivation
HoraeDB is a timeseries database (TSDB). However, HoraeDB's goal is to handle both timeseries and analytic workloads, unlike the classic TSDB, which usually has poor performance in handling analytic workloads.
In the classic timeseries database, the Tag columns (InfluxDB calls them Tag and Prometheus calls them Label) are normally indexed by generating an inverted index. However, it is found that the cardinality of Tag varies in different scenarios, and in some scenarios the cardinality of Tag is very high (we call this case an analytic workload), so it is very costly to store and retrieve the inverted index. On the other hand, it is observed that the scanning+pruning approach often used by analytical databases can do a good job of handling such analytic workloads.
The basic design idea of HoraeDB is to adopt a hybrid storage format and the corresponding query method for a better performance in processing both timeseries and analytic workloads.
Architecture
[Figure: architecture of the HoraeDB stand-alone service]
The figure above shows the architecture of HoraeDB stand-alone service and the details of some important modules will be described in the following part.
RPC Layer
module path: https://github.com/apache/incubator-horaedb/tree/main/server
The current RPC supports multiple protocols including HTTP, gRPC, MySQL.
Basically, HTTP and MySQL are used to debug HoraeDB, query manually and perform DDL operations (such as creating, deleting tables, etc.). And gRPC protocol can be regarded as a customized protocol for high-performance, which is suitable for massive reading and writing operations.
SQL Layer
module path: https://github.com/apache/incubator-horaedb/tree/main/query_frontend
The SQL layer takes responsibility for parsing SQL and generating the query plan.
Based on sqlparser, a SQL dialect is provided for processing timeseries data, which introduces some key concepts including Tag and Timestamp. And by utilizing DataFusion, the planner is able to generate both regular logical plans and tailored ones which are used to implement the special operators defined by timeseries queries, e.g. PromQL.
Interpreter
module path: https://github.com/apache/incubator-horaedb/tree/main/interpreters
The Interpreter module encapsulates the SQL CRUD operations. In the query procedure, a SQL statement received by HoraeDB is parsed, converted into a query plan and then executed in some specific interpreter, such as SelectInterpreter, InsertInterpreter, etc.
Catalog
module path: https://github.com/apache/incubator-horaedb/tree/main/catalog_impls
Catalog is actually the module managing metadata, and the levels of metadata adopted by HoraeDB are similar to PostgreSQL: Catalog > Schema > Table, but they are only used as namespaces.
At present, Catalog and Schema have two different kinds of implementation for standalone and distributed mode, because some strategies for generating IDs and ways to persist metadata differ between the modes.
Query Engine
module path: https://github.com/apache/incubator-horaedb/tree/main/query_engine
Query Engine is responsible for optimizing and executing the query plan given a basic SQL plan provided by the SQL layer, and now such work is mainly delegated to DataFusion.
In addition to the basic functions of SQL, HoraeDB also defines some customized query protocols and optimization rules for some specific query plans by utilizing the extensibility provided by DataFusion. For example, PromQL is implemented in this way; read the code if you are interested.
Pluggable Table Engine
module path: https://github.com/apache/incubator-horaedb/tree/main/table_engine
Table Engine is actually a storage engine for managing tables in HoraeDB, and the pluggability of the Table Engine is a core design of HoraeDB which matters in achieving our long-term targets, e.g. supporting log or tracing workloads by implementing new storage engines. HoraeDB will have multiple kinds of Table Engine for different workloads, and the most appropriate one should be chosen as the storage engine according to the workload pattern.
Now the requirements for a Table Engine are:
- Manage all the shared resources under the engine:
  - Memory
  - Storage
  - CPU
- Manage metadata of tables, such as table schema and table options;
- Provide Table instances which provide read and write methods;
- Take responsibility for creating, opening, dropping and closing Table instances;
- …
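A much-simplified sketch of these requirements as Rust traits is shown below; the real definitions in the table_engine module are async and carry many more options, so treat this only as a conceptual outline:

```rust
// Greatly simplified sketch of the Table Engine abstraction; the real
// traits in the `table_engine` module are async and far more detailed.
struct TableSchema;
struct TableOptions;
struct ReadRequest;
struct WriteRequest;
struct RecordBatch;
type Result<T> = std::result::Result<T, String>;

trait Table {
    fn read(&self, request: ReadRequest) -> Result<Vec<RecordBatch>>;
    fn write(&self, request: WriteRequest) -> Result<usize>;
}

trait TableEngine {
    /// Create a table with the given schema and options, managing the
    /// shared memory/storage/CPU resources under the engine.
    fn create_table(&self, schema: TableSchema, options: TableOptions) -> Result<Box<dyn Table>>;
    fn open_table(&self, name: &str) -> Result<Option<Box<dyn Table>>>;
    fn drop_table(&self, name: &str) -> Result<bool>;
    fn close_table(&self, name: &str) -> Result<()>;
}
```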
Actually the things that a Table Engine needs to handle are a little complicated. Now only one Table Engine called Analytic is provided in HoraeDB; it does a good job of processing analytical workloads, but it is not yet ready to handle timeseries workloads (we plan to enhance it for better performance by adding some indexes which help handle timeseries workloads).
The following part describes the details of the Analytic Table Engine.
WAL
module path: https://github.com/apache/incubator-horaedb/tree/main/wal
The data processing model of HoraeDB is WAL + MemTable: the recently written data is written to the WAL first and then to the MemTable, and after a certain amount of data has accumulated in the MemTable, the data will be organized in a query-friendly form and persisted to durable devices.
Now three implementations of the WAL are provided for standalone and distributed mode:
- For standalone mode, the WAL is based on RocksDB and data is persisted on the local disk.
- For distributed mode, the WAL is required to be a distributed component responsible for the durability of the newly written data, so we provide an implementation based on OceanBase.
- For distributed mode, in addition to OceanBase, we also provide a more lightweight implementation based on Apache Kafka.
MemTable
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/memtable
Since the WAL can't provide efficient data retrieval, the newly written data is also stored in the MemTable for efficient data retrieval. After a certain amount of data is reached, HoraeDB organizes the data in the MemTable into a query-friendly storage format (SST) and stores it to the persistent device.
The current implementation of MemTable is based on agatedb's skiplist. It allows concurrent reads and writes and can control memory usage based on Arena.
Flush
module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/instance/flush_compaction.rs
What Flush does is that when the memory usage of the MemTable reaches the threshold, some MemTables are selected for flushing into query-friendly SSTs saved on the persistent device.
During the flushing procedure, the data will be divided by a certain time range (which is configured by the table option Segment Duration), and any SST is guaranteed to contain only data whose timestamps fall in the same Segment. Actually this is a common operation in most timeseries databases, which organize data along the time dimension to speed up subsequent time-related operations, such as querying data over a time range and purging data outside the TTL.
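The time-based split can be illustrated with a small helper that aligns a row's timestamp to the start of its segment, assuming the Segment Duration is expressed in milliseconds (a sketch, not the engine's actual code):

```rust
// Sketch: align a timestamp (ms) to the start of its segment so that every
// flushed SST only contains rows from a single segment.
fn segment_start(timestamp_ms: i64, segment_duration_ms: i64) -> i64 {
    // Use the Euclidean remainder so negative timestamps also round down.
    timestamp_ms - timestamp_ms.rem_euclid(segment_duration_ms)
}

fn main() {
    let two_hours_ms = 2 * 60 * 60 * 1000;
    // Rows at 01:30 and 02:30 fall into different segments,
    // so they end up in different SSTs during flush.
    assert_eq!(segment_start(90 * 60 * 1000, two_hours_ms), 0);
    assert_eq!(segment_start(150 * 60 * 1000, two_hours_ms), two_hours_ms);
}
```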
Compaction
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/compaction
The data of the MemTable is flushed as SSTs, but the file size of a recently flushed SST may be very small, and too-small or too-many SSTs lead to poor query performance. Therefore, Compaction is introduced to rearrange the SSTs so that multiple smaller SST files can be compacted into a larger SST file.
Manifest
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/meta
Manifest takes responsibility for managing the tables' metadata of the Analytic Engine, including:
- Table schema and table options;
- The sequence number up to which the newest flush has finished;
- The information of all the SSTs belonging to the table.
Now the Manifest is based on WAL and Object Storage. The newly written updates on the Manifest are persisted as logs in the WAL, and in order to avoid infinite expansion of the Manifest (actually every Flush leads to an update), Snapshot is also introduced to clean up the history of metadata updates, and the generated Snapshot will be saved to Object Storage.
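Conceptually, the Manifest is an append-only log of metadata edits plus periodic snapshots; the sketch below is only illustrative and does not mirror the real types in analytic_engine/src/meta:

```rust
// Conceptual sketch of Manifest updates and snapshotting; the actual
// implementation in `analytic_engine/src/meta` differs in structure.
enum MetaUpdate {
    AlterSchema { table_id: u64 /*, new schema ... */ },
    AlterOptions { table_id: u64 /*, new options ... */ },
    // Recorded after each flush: the new SSTs plus the flushed sequence number.
    VersionEdit { table_id: u64, flushed_sequence: u64, files_to_add: Vec<u64> },
}

struct Manifest {
    pending_updates: Vec<MetaUpdate>, // persisted as logs in the WAL
    updates_since_snapshot: usize,
    snapshot_every_n_updates: usize,
}

impl Manifest {
    fn apply(&mut self, update: MetaUpdate) {
        // 1. Persist the update as a WAL log entry (omitted here).
        self.pending_updates.push(update);
        self.updates_since_snapshot += 1;

        // 2. Every flush produces an update, so periodically fold the history
        //    into a snapshot stored on Object Storage and truncate the log.
        if self.updates_since_snapshot >= self.snapshot_every_n_updates {
            self.write_snapshot_to_object_storage();
            self.updates_since_snapshot = 0;
            self.pending_updates.clear();
        }
    }

    fn write_snapshot_to_object_storage(&self) { /* omitted */ }
}
```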
Object Storage
module path: https://github.com/apache/incubator-horaedb/tree/main/components/object_store
The SSTs generated by Flush need to be persisted, and the abstraction of the persistent storage device is ObjectStore, which has multiple implementations:
- Based on local file system;
- Based on Alibaba Cloud OSS.
The distributed architecture of HoraeDB separates storage and computing, which requires the ObjectStore to be a highly available and reliable service independent of HoraeDB. Therefore, storage systems like Amazon S3 and Alibaba Cloud OSS are a good choice, and implementations on the storage systems of other cloud service providers are planned for the future.
SST
module path: https://github.com/apache/incubator-horaedb/tree/main/analytic_engine/src/sst
SST is actually an abstraction that can have multiple specific implementations. The current implementation is based on Parquet, which is a column-oriented data file format designed for efficient data storage and retrieval.
The format of the SST is critical for retrieving data and is also the most important factor in performing well for both timeseries and analytic workloads. At present, our Parquet-based implementation is good at processing analytic workloads but poor at processing timeseries workloads. In our roadmap, we will explore more storage formats in order to achieve good performance for both workloads.
Space
module path: https://github.com/apache/incubator-horaedb/blob/main/analytic_engine/src/space.rs
In the Analytic Engine, there is a concept called space, and here is an explanation of it to resolve some ambiguities when reading the source code. Actually the Analytic Engine does not have the concepts of catalog and schema and only provides two levels of relationship: space and table. In the implementation, the schema id (which should be unique across all catalogs) on the upper layer is actually mapped to the space id.
The space in the Analytic Engine serves mainly for isolating resources of different tenants, such as the usage of memory.
Critical Path
After a brief introduction to some important modules of HoraeDB, we will give a description for some critical paths in code, hoping to provide interested developers with a guide for reading the code.
Query
[Figure: query procedure across modules]
Take a SELECT SQL as an example. The figure above shows the query procedure, and the numbers in it indicate the calling order between the modules.
Here are the details:
- Server module chooses a proper RPC module (it may be HTTP, gRPC or MySQL) to process the request according to the protocol it uses;
- Parse the SQL in the request by the parser;
- With the parsed SQL and the information provided by the catalog/schema module, DataFusion can generate the logical plan;
- With the logical plan, the corresponding Interpreter is created and the logical plan will be executed by it;
- The logical plan of a normal SELECT SQL will be executed through SelectInterpreter;
- In the SelectInterpreter, the specific query logic is executed by the Query Engine:
  - Optimize the logical plan;
  - Generate the physical plan;
  - Optimize the physical plan;
  - Execute the physical plan;
- The execution of the physical plan involves the Analytic Engine:
  - Data is obtained through the read method of the Table instance provided by the Analytic Engine;
  - The sources of the table data are the SSTs and the Memtable, and the data can be filtered by the pushed-down predicates;
  - After retrieving the table data, the Query Engine completes the specific computation and generates the final results;
- SelectInterpreter gets the results and feeds them to the protocol module;
- After the protocol layer converts the results, the server module responds to the client with them.
The following is the flow of function calls in version v1.2.2:
┌───────────────────────◀─────────────┐ ┌───────────────────────┐
│ handle_sql │────────┐ │ │ parse_sql │
└───────────────────────┘ │ │ └────────────────┬──────┘
│ ▲ │ │ ▲ │
│ │ │ │ │ │
│ │ │ └36───┐ │ 11
1│ │ │ │ │ │
│ 8│ │ │ │ │
│ │ │ │ 10 │
│ │ │ │ │ │
▼ │ │ │ │ ▼
┌─────────────────┴─────┐ 9│ ┌┴─────┴────────────────┐───────12─────────▶┌───────────────────────┐
│maybe_forward_sql_query│ └────────▶│fetch_sql_query_output │ │ statement_to_plan │
└───┬───────────────────┘ └────┬──────────────────┘◀───────19─────────└───────────────────────┘
│ ▲ │ ▲ │ ▲
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ 35 13 18
2│ 7│ 20 │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ ▼ │
▼ │ ▼ │ ┌───────────────────────┐
┌───────────────────────┐───────────6───────▶┌─────────────────┴─────┐ ┌─────────────────┴─────┐ │Planner::statement_to_p│
│ forward_with_endpoint │ │ forward │ │execute_plan_involving_│ │ lan │
└───────────────────────┘◀────────5──────────└───┬───────────────────┘ ┌──│ partition_table │◀────────┐ └───┬───────────────────┘
│ ▲ │ └───────────────────────┘ │ │ ▲
│ │ │ │ ▲ │ │ │
│ │ │ │ │ │ 14 17
┌───────────────────────┐ │ 4│ │ │ │ │ │ │
┌─────│ PhysicalPlan::execute │ 3│ │ │ 21 │ │ │ │
│ └───────────────────────┘◀──┐ │ │ │ │ 22 │ │ │
│ │ │ │ │ │ │ │ ▼ │
│ │ │ │ │ │ │ │ ┌────────────────────────┐
│ │ ▼ │ │ ▼ │ 34 │sql_statement_to_datafus│
│ ┌───────────────────────┐ 30 ┌─────────────────┴─────┐ │ ┌─────────────────┴─────┐ │ │ ion_plan │
31 │ build_df_session_ctx │ │ │ route │ │ │ build_interpreter │ │ └────────────────────────┘
│ └────┬──────────────────┘ │ └───────────────────────┘ │ └───────────────────────┘ │ │ ▲
│ │ ▲ │ │ │ │ │
│ 27 26 │ 23 │ 15 16
│ ▼ │ │ │ │ │ │
└────▶┌────────────────┴──────┐ │ ┌───────────────────────┐ │ │ │ │
│ execute_logical_plan ├───┴────32────────▶│ execute │──────────┐ │ ┌───────────────────────┐ │ ▼ │
└────┬──────────────────┘◀────────────25────┴───────────────────────┘ 33 │ │interpreter_execute_pla│ │ ┌────────────────────────┐
│ ▲ ▲ └──────┴──▶│ n │────────┘ │SqlToRel::sql_statement_│
28 │ └──────────24────────────────┴───────────────────────┘ │ to_datafusion_plan │
│ 29 └────────────────────────┘
▼ │
┌────────────────┴──────┐
│ optimize_plan │
└───────────────────────┘
1. The received request will be forwarded to handle_sql after various protocol conversions, and since the request may not be processed by this node, it may need to be forwarded to maybe_forward_sql_query to handle the forwarding logic.
2. After constructing the ForwardRequest in maybe_forward_sql_query, call forward.
3. After constructing the RouteRequest in forward, call route.
4. Use route to get the endpoint of the destination node and return to forward.
5. Call forward_with_endpoint to forward the request.
6. Return to forward.
7. Return to maybe_forward_sql_query.
8. Return to handle_sql.
9. If this is a Local request, call fetch_sql_query_output to process it.
10. Call parse_sql to parse the sql into a Statement.
11. Return to fetch_sql_query_output.
12. Call statement_to_plan with the Statement.
13. Construct a Planner with the ctx and Statement, and call the statement_to_plan method of the Planner.
14. The planner will call the corresponding planner method for the requested category; at this point our sql is a query, so sql_statement_to_plan will be called.
15. Call sql_statement_to_datafusion_plan, which generates the datafusion object, and then call SqlToRel::sql_statement_to_plan.
16. The generated logical plan is returned from SqlToRel::sql_statement_to_plan.
17. Return.
18. Return.
19. Return.
20. Call execute_plan_involving_partition_table (in the default configuration) for the subsequent optimization and execution of this logical plan.
21. Call build_interpreter to generate the Interpreter.
22. Return.
23. Call the Interpreter's interpreter_execute_plan method for logical plan execution.
24. The corresponding execute function is called; since the sql is a query, the execute of the SelectInterpreter will be called.
25. Call execute_logical_plan, which will call build_df_session_ctx to generate the optimizer.
26. build_df_session_ctx uses the config information to generate the corresponding context: it first uses DataFusion and some custom optimization rules (in logical_optimize_rules()) to generate the logical plan optimizer, then uses apply_adapters_for_physical_optimize_rules to generate the physical plan optimizer.
27. Return the optimizer.
28. Call optimize_plan, using the optimizer just generated, to first optimize the logical plan and then the physical plan.
29. Return the optimized physical plan.
30. Execute the physical plan.
31. Return after execution.
32. After collecting the results of all slices, return.
33. Return.
34. Return.
35. Return.
36. Return to the upper layer for network protocol conversion, and finally return to the request sender.
Write
[Figure: write procedure across modules]
Take an INSERT SQL as an example. The figure above shows the write procedure, and the numbers in it indicate the calling order between the modules.
Here are the details:
- Server module chooses a proper RPC module (it may be HTTP, gRPC or MySQL) to process the request according to the protocol it uses;
- Parse the SQL in the request by the parser;
- With the parsed SQL and the catalog/schema module, DataFusion can generate the logical plan;
- With the logical plan, the corresponding Interpreter is created and the logical plan will be executed by it;
- The logical plan of a normal INSERT SQL will be executed through InsertInterpreter;
- In the InsertInterpreter, the write method of the Table provided by the Analytic Engine is called:
  - Write the data into the WAL first;
  - Then write the data into the MemTable;
- Before writing to the MemTable, the memory usage will be checked. If the memory usage is too high, the flush process will be triggered:
  - Persist some old MemTables as SSTs;
  - Store updates about the new SSTs and the flushed sequence number of the WAL to the Manifest;
  - Delete the corresponding WAL entries;
- Server module responds to the client with the execution result.
4 - Storage
The storage engine mainly provides the following two functions:
- Persistence of data
- Under the premise of ensuring the correctness of the data, organize the data in the most reasonable way to meet the query needs of different scenarios.
This document will introduce the internal implementation of the storage engine in HoraeDB. Readers can refer to the content here to explore how to use HoraeDB efficiently.
Overall Structure
HoraeDB is a distributed storage system based on the share-nothing architecture.
Data on different servers is isolated and does not affect each other. The storage engine in each stand-alone machine is a variant of the log-structured merge-tree, optimized for time-series scenarios. Its core components are described below:
Write Ahead Log (WAL)
A write request will be written to
- memtable in memory
- WAL in durable storage
Since the memtable is not persisted to the underlying storage system in real time, the WAL is required to ensure the reliability of the data in the memtable.
On the other hand, due to the design of the distributed architecture, the WAL itself is required to be highly available. Now there are the following implementations in HoraeDB:
- Local disk (based on RocksDB, no distributed high availability)
- OceanBase
- Kafka
Memtable
Memtable is an in-memory data structure used to hold recently written table data. Each table has its own memtable.
Memtable is read-write by default (aka active); when the written data reaches a certain threshold, it becomes read-only and is replaced by a new memtable.
The read-only memtable will be flushed to the underlying storage system in SST format by a background thread. After the flush is completed, the read-only memtable can be destroyed, and the corresponding data in the WAL can also be deleted.
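Putting the WAL and memtable together, the single-node write path roughly follows the steps below. This is a schematic sketch with invented names and in-memory stand-ins, not the engine's real code:

```rust
// Schematic single-node write path: WAL first, then the active memtable,
// freezing it and scheduling a flush once it grows past the threshold.
// All names are invented for illustration.
#[derive(Clone)]
struct Row;

struct TableData {
    wal: Vec<Row>,                      // stand-in for the write-ahead log
    active_memtable: Vec<Row>,          // read-write (active) memtable
    immutable_memtables: Vec<Vec<Row>>, // read-only memtables waiting for flush
    memtable_size_threshold: usize,
}

impl TableData {
    fn write(&mut self, row: Row) {
        // 1. Append to the WAL so the data survives a crash.
        self.wal.push(row.clone());
        // 2. Insert into the active memtable so it is readable immediately.
        self.active_memtable.push(row);
        // 3. Once the threshold is reached, freeze the memtable; a background
        //    thread flushes it as an SST and then deletes the WAL entries.
        if self.active_memtable.len() >= self.memtable_size_threshold {
            let frozen = std::mem::take(&mut self.active_memtable);
            self.immutable_memtables.push(frozen);
        }
    }
}
```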
Sorted String Table(SST)
SST is a persistent data format, stored in the order of the table's primary keys. Currently, HoraeDB uses the Parquet format for this.
For HoraeDB, SST has an important option: segment_duration. Only SSTs within the same segment can be merged, which is beneficial for time-series data, and it also makes it convenient to eliminate expired data.
In addition to the original data, statistical information about the data, such as the maximum and minimum values, is also stored in the SST to speed up queries.
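For example, the stored min/max timestamps allow whole SSTs to be skipped when a query carries a time range predicate; a small illustrative sketch:

```rust
// Illustrative sketch: prune SSTs using their stored min/max timestamps.
struct SstMeta {
    min_timestamp: i64,
    max_timestamp: i64,
}

/// Keep only the SSTs whose time range overlaps the queried range.
fn prune<'a>(ssts: &'a [SstMeta], query_start: i64, query_end: i64) -> Vec<&'a SstMeta> {
    ssts.iter()
        .filter(|sst| sst.max_timestamp >= query_start && sst.min_timestamp <= query_end)
        .collect()
}
```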
Compactor
Compactor merges multiple small SST files into one, which is used to solve the problem of too many small files. In addition, Compactor also deletes expired and duplicate data during compaction. In the future, compaction may take on more tasks, such as downsampling.
The current compaction strategies in HoraeDB are based on Cassandra's.
Manifest
Manifest records the metadata of tables and SST files, such as the minimum and maximum timestamps of the data in an SST.
Due to the design of the distributed architecture, the manifest itself is required to be highly available. Now in HoraeDB, there are mainly the following implementations:
- WAL
- ObjectStore
ObjectStore
ObjectStore is the place where data (i.e. SSTs) is persisted.
Generally speaking, major cloud vendors provide corresponding services, such as Alibaba Cloud OSS and AWS S3.
5 - Table Partitioning
Note: This feature is still in development, and the API may change in the future.
This chapter discusses PartitionTable.
The partition table syntax used by HoraeDB is similar to that of MySQL.
General partition tables include Range Partitioning, List Partitioning, Hash Partitioning, and Key Partitioning.
HoraeDB currently only supports Key Partitioning.
Architecture
Similar to MySQL, different portions of a partition table are stored as separate tables in different locations.
As currently designed, a partition table can be opened on multiple HoraeDB nodes, supports writing and querying at the same time, and can be expanded horizontally.
As shown in the figure below, PartitionTable is opened on node0 and node1, and the physical subtables where the actual data is stored are on node2 and node3.
┌───────────────────────┐ ┌───────────────────────┐
│Node0 │ │Node1 │
│ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ PartitionTable │ │ │ │ PartitionTable │ │
│ └────────────────┘ │ │ └────────────────┘ │
│ │ │ │ │ │
└────────────┼──────────┘ └───────────┼───────────┘
│ │
│ │
┌───────────────────────┼─────────────────────────────┼───────────────────────┐
│ │ │ │
┌────────────┼───────────────────────┼─────────────┐ ┌─────────────┼───────────────────────┼────────────┐
│Node2 │ │ │ │Node3 │ │ │
│ ▼ ▼ │ │ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ SubTable_0 │ │ SubTable_1 │ │ │ │ SubTable_2 │ │ SubTable_3 │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └─────────────────────┘ └─────────────────────┘ │ │ └─────────────────────┘ └─────────────────────┘ │
│ │ │ │
└──────────────────────────────────────────────────┘ └──────────────────────────────────────────────────┘
Key Partitioning
Key Partitioning supports computing over one or more columns, using the hash algorithm provided by HoraeDB.
Usage restrictions:
- Only tag columns are supported as partition keys.
- LINEAR KEY is not supported yet.
The table creation statement for key partitioning is as follows:
[Example: CREATE TABLE statement using PARTITION BY KEY]
Refer to MySQL key partitioning.
Query
Since the partition table data is actually stored in different physical tables, it is necessary to calculate the actual requested physical table according to the query request when querying.
The query will calculate the physical tables to be queried according to the query parameters, and then remotely request the nodes where those physical tables are located to obtain data through the HoraeDB internal remote engine service (which supports predicate pushdown).
The implementation of the partition table is in PartitionTableImpl.
- Step 1: Parse query sql and calculate the physical table to be queried according to the query parameters.
- Step 2: Query data of physical table.
- Step 3: Compute with the raw data.
│
1 │
│
▼
┌───────────────┐
│Node0 │
│ │
│ │
└───────────────┘
┬
2 │ 2
┌──────────────┴──────────────┐
│ ▲ │
│ 3 │ 3 │
▼ ─────────────┴───────────── ▼
┌───────────────┐ ┌───────────────┐
│Node1 │ │Node2 │
│ │ │ │
│ │ │ │
└───────────────┘ └───────────────┘
Key partitioning
- Filters like and, or, in, = will choose specific SubTables.
- Fuzzy matching filters like <, > are also supported, but may have poor performance since they scan all physical tables.
The key partitioning rule is implemented in KeyRule.
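A simplified picture of key-based routing: hash the partition key columns and take the result modulo the partition count to locate the subtable. The hash function below is only a stand-in, since HoraeDB uses its own hash algorithm:

```rust
// Simplified sketch of key-partition routing; HoraeDB's real KeyRule uses
// its own hash algorithm, and DefaultHasher here is only a stand-in.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map the values of the partition key columns (tag columns) to a subtable index.
fn locate_subtable(partition_key_values: &[&str], partition_num: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    for value in partition_key_values {
        value.hash(&mut hasher);
    }
    hasher.finish() % partition_num
}

fn main() {
    // A row whose partition key is the tag `name = "host-42"` is routed to
    // exactly one of the physical subtables, e.g. SubTable_2.
    let idx = locate_subtable(&["host-42"], 4);
    println!("route to SubTable_{idx}");
}
```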
Write
The write process is similar to the query process.
First, according to the partition rules, the write request is split into different partitioned physical tables, and then sent to different physical nodes through the remote engine for actual data writing.
6 - Wal
- WAL on RocksDB
- WAL on Kafka
- WAL on OceanBase will be introduced later.
7 - WAL on Disk
Architecture
This section introduces the implementation of a standalone Write-Ahead Log (WAL, hereinafter referred to as “the log”) based on a local disk. In this implementation, the log is managed at the region level.
┌────────────────────────────┐
│ HoraeDB │
│ │
│ ┌────────────────────────┐ │
│ │ WAL │ │ ┌────────────────────────┐
│ │ │ │ │ │
│ │ ...... │ │ │ File System │
│ │ │ │ │ │
│ │ ┌────────────────────┐ │ │ manage │ ┌────────────────────┐ │
Write ─────┼─┼─► Region ├─┼─┼─────────┼─► Region Dir │ │
│ │ │ │ │ │ │ │ │ │
Read ─────┼─┼─► ┌────────────┐ │ │ │ mmap │ │ ┌────────────────┐ │ │
│ │ │ │ Segment 0 ├───┼─┼─┼─────────┼─┼─► Segment File 0 │ │ │
│ │ │ └────────────┘ │ │ │ │ │ └────────────────┘ │ │
Delete ─────┼─┼─► ┌────────────┐ │ │ │ mmap │ │ ┌────────────────┐ │ │
│ │ │ │ Segment 1 ├───┼─┼─┼─────────┼─┼─► Segment File 1 │ │ │
│ │ │ └────────────┘ │ │ │ │ │ └────────────────┘ │ │
│ │ │ ┌────────────┐ │ │ │ mmap │ │ ┌────────────────┐ │ │
│ │ │ │ Segment 2 ├───┼─┼─┼─────────┼─┼─► Segment File 2 │ │ │
│ │ │ └────────────┘ │ │ │ │ │ └────────────────┘ │ │
│ │ │ ...... │ │ │ │ │ ...... │ │
│ │ └────────────────────┘ │ │ │ └────────────────────┘ │
│ │ ...... │ │ │ ...... │
│ └────────────────────────┘ │ └────────────────────────┘
└────────────────────────────┘
Data Model
File Paths
Each region has its own directory to manage all segments for that region. The directory is named after the region's ID. Each segment is named using the format seg_<id>, with IDs starting from 0 and incrementing.
Segment Format
Logs for all tables within a region are stored in segments, arranged in ascending order of sequence numbers. The structure of the segment files is as follows:
Segment0 Segment1
┌────────────┐ ┌────────────┐
│ Magic Num │ │ Magic Num │
├────────────┤ ├────────────┤
│ Record │ │ Record │
├────────────┤ ├────────────┤
│ Record │ │ Record │
├────────────┤ ├────────────┤ ....
│ Record │ │ Record │
├────────────┤ ├────────────┤
│ ... │ │ ... │
│ │ │ │
└────────────┘ └────────────┘
seg_0 seg_1
In memory, each segment stores additional information used for read, write, and delete operations:
[Segment's in-memory metadata]
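A possible shape of that bookkeeping, reconstructed from the fields described in this chapter (the segment ID, the per-table minimum/maximum sequence numbers in table_ranges, and the log offsets inside the pre-allocated file); the exact struct in the code may differ:

```rust
// Reconstructed sketch of the in-memory per-segment metadata; the exact
// definition in the code may differ.
use std::collections::HashMap;

type TableId = u64;
type SequenceNumber = u64;

struct Segment {
    id: u64, // segment file id, i.e. the N in `seg_N`
    // Minimum and maximum sequence number of each table stored in this segment,
    // updated on write and delete, and used to decide which segments a read
    // touches and when a segment file can be removed.
    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
    // Start/end offsets of every log record, recorded when the file is opened
    // or appended to, so records can be located without re-scanning the file.
    record_offsets: Vec<(u64, u64)>,
    // Next append position inside the pre-allocated (64MB) file.
    write_offset: u64,
}
```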
Log Format
The log format within a segment is as follows:
+---------+--------+------------+--------------+--------------+-------+
| version | crc | table id | sequence num | value length | value |
| (u8) | (u32) | (u64) | (u64) | (u32) |(bytes)|
+---------+--------+------------+--------------+--------------+-------+
Field Descriptions:
version
: Log version number.

crc
: Used to ensure data consistency. Computes the CRC checksum from the table id to the end of the record.

table id
: The unique identifier of the table.

sequence num
: The sequence number of the record.

value length
: The byte length of the value.

value
: The value in the general log format.
The region ID is not stored in the log because it can be obtained from the file path.
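A sketch of how one record could be encoded following this layout; the helper below assumes little-endian integers and leaves the CRC as a placeholder, so it only illustrates the field order, not the exact on-disk bytes:

```rust
// Illustrative encoder for the record layout above. Endianness and the CRC
// implementation are assumptions; only the field order follows the chapter.
fn encode_record(version: u8, table_id: u64, sequence_num: u64, value: &[u8]) -> Vec<u8> {
    // Bytes covered by the checksum: from the table id to the end of the record.
    let mut checked = Vec::new();
    checked.extend_from_slice(&table_id.to_le_bytes());
    checked.extend_from_slice(&sequence_num.to_le_bytes());
    checked.extend_from_slice(&(value.len() as u32).to_le_bytes());
    checked.extend_from_slice(value);

    let mut record = Vec::with_capacity(1 + 4 + checked.len());
    record.push(version);
    record.extend_from_slice(&crc32(&checked).to_le_bytes());
    record.extend_from_slice(&checked);
    record
}

// Placeholder: any CRC32 implementation (e.g. from a crate) can be plugged in.
fn crc32(_bytes: &[u8]) -> u32 {
    unimplemented!("use a real CRC32 implementation")
}
```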
Main Processes
Opening the WAL
Identify all region directories under the WAL directory.
In each region directory, identify all segment files.
Open each segment file, traverse all logs within it, record the start and end offsets of each log, and record the minimum and maximum sequence numbers of each TableId in the segment, then close the file.
If there is no region directory or there are no segment files under the directory, automatically create the corresponding directory and files.
Reading Logs
Based on the metadata of the segments, determine all segments involved in the current read operation.
Open these segments in order of their IDs from smallest to largest, and decode the raw bytes into logs.
Writing Logs
Serialize the logs to be written into byte data and append them to the segment file with the largest ID.
When a segment is created, it pre-allocates a fixed size of 64MB and will not change dynamically. When the pre-allocated space is used up, a new segment is created, and appending continues in the new segment.
After each append, flush is not called immediately; by default, flush is performed every ten writes or when the segment file is closed.
Update the segment's metadata table_ranges in memory.
Deleting Logs
Suppose logs in the table with ID table_id and sequence numbers less than seq_num need to be marked as deleted:
Update the table_ranges field of the relevant segments in memory, updating the minimum sequence number of the table to seq_num + 1.
If, after the modification, the minimum sequence number of the table in this segment is greater than the maximum sequence number, remove the table from table_ranges.
If a segment's table_ranges is empty and it is not the segment with the largest ID, delete the segment file.
8 - WAL on Kafka
Architecture
In this section we present a distributed WAL implementation (based on Kafka). Write-ahead logs (hereinafter referred to as logs) of tables are managed here by region, which can be simply understood as a shared log file of multiple tables.
As shown in the following figure, regions are mapped to topics (with only one partition) in Kafka. Usually two topics are needed by a region: one is used for storing logs and the other is used for storing metadata.
[Figure: mapping from regions to Kafka topics]
Data Model
Log Format
The common log format described in WAL on RocksDB is used here.
Metadata
Each region will maintain its metadata both in memory and in Kafka; we call it RegionMeta here. It can be thought of as a map, taking the table id as the key and TableMeta as the value.
We briefly introduce the variables in TableMeta here:
- next_seq_num, the sequence number allocated to the next log entry.
- latest_marked_deleted, the last flushed sequence number; all logs in the table with a lower sequence number than it can be removed.
- current_high_watermark, the high watermark in the Kafka partition after the last write of this table.
- seq_offset_mapping, the mapping from sequence numbers to Kafka offsets; it is updated on every write, and entries below the updated latest_marked_deleted are removed after flushing.
┌─────────────────────────────────────────┐
│ RegionMeta │
│ │
│ Map<TableId, TableMeta> table_metas │
└─────────────────┬───────────────────────┘
│
│
│
└─────┐
│
│
┌──────────────────────┴──────────────────────────────┐
│ TableMeta │
│ │
│ SequenceNumber next_seq_num │
│ │
│ SequenceNumber latest_mark_deleted │
│ │
│ KafkaOffset high_watermark │
│ │
│ Map<SequenceNumber, KafkaOffset> seq_offset_mapping │
└─────────────────────────────────────────────────────┘
Main Process
We focus on the main process in one region; the following processes will be introduced:
- Open or create region.
- Write and read logs.
- Delete logs.
Open or Create Region
Steps
- Search for the region in the opened namespace.
- If the region is found, the most important thing we need to do is to recover its metadata; we will introduce this later.
- If the region is not found and auto creation is enabled, just create the corresponding topic in Kafka.
- Add the found or created region to the cache and return it afterwards.
Recovery
As mentioned above, the RegionMeta is actually a map of TableMetas. So here we will focus on recovering a specific TableMeta, and examples will be given to better illustrate this process.
- First, recover the RegionMeta from the snapshot. We take a snapshot of the RegionMeta in some scenarios (e.g. marking logs deleted, cleaning logs) and put it into the meta topic. The snapshot is actually the RegionMeta at a particular point in time. When recovering a region, we can use it to avoid scanning all logs in the data topic. The following example recovers from the snapshot taken when the Kafka high watermark was 64:
[Example: RegionMeta recovered from the snapshot taken at Kafka high watermark 64]
- Recovering from logs. After recovering from the snapshot, we can continue to recover by scanning the logs in the data topic from the Kafka high watermark at which the snapshot was taken, which obviously avoids scanning the whole data topic. Let's see the example:
[Example: RegionMeta after replaying the logs written since the snapshot]
Write and Read Logs
The writing and reading process in a region is simple.
For writing:
- Open the specified region (auto create it if necessary).
- Put the logs to the specified Kafka partition by the client.
- Update next_seq_num, current_high_watermark and seq_offset_mapping in the corresponding TableMeta (sketched after the reading steps below).
For reading:
- Open the specified region.
- Just read all the logs of the region, and the split and replay work will be done by the caller.
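The write-side bookkeeping mentioned above can be sketched as follows, with simplified types and the Kafka client itself left out:

```rust
// Sketch of the TableMeta bookkeeping performed after a successful write;
// Kafka interaction and error handling are omitted.
use std::collections::BTreeMap;

type SequenceNumber = u64;
type KafkaOffset = i64;

struct TableMeta {
    next_seq_num: SequenceNumber,
    latest_marked_deleted: SequenceNumber,
    current_high_watermark: KafkaOffset,
    seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

impl TableMeta {
    /// `first_offset` is the Kafka offset of the first log entry just produced,
    /// `log_num` is how many log entries were written in this batch.
    fn on_write_success(&mut self, first_offset: KafkaOffset, log_num: u64) {
        for i in 0..log_num {
            // Each appended log gets the next sequence number and remembers
            // the Kafka offset it was stored at.
            self.seq_offset_mapping
                .insert(self.next_seq_num + i, first_offset + i as KafkaOffset);
        }
        self.next_seq_num += log_num;
        // High watermark in the partition after this write.
        self.current_high_watermark = first_offset + log_num as KafkaOffset;
    }
}
```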
Delete Logs
Log deletion can be divided into two steps:
- Mark the logs deleted.
- Do delayed cleaning work periodically in a background thread.
Mark
- Update latest_marked_deleted and seq_offset_mapping (just retain the entries whose sequence >= the updated latest_marked_deleted) in the TableMeta.
- We may also need to make and sync a RegionMeta snapshot to Kafka while dropping a table.
Clean
The cleaning logic is done in a background thread called the cleaner:
- Make a RegionMeta snapshot.
- Decide whether to clean the logs based on the snapshot.
- If so, sync the snapshot to Kafka first, then clean the logs.
9 - WAL on RocksDB
Architecture
In this section we present a standalone WAL implementation (based on RocksDB). Write-ahead logs (hereinafter referred to as logs) of tables are managed here by table, and we call the corresponding storage data structure TableUnit. All related data (logs and some metadata) is stored in a single column family for simplicity.
[Figure: TableUnit layout of the WAL on RocksDB]
Data Model
Common Log Format
We use the common key and value format here. Here is the defined key format, and the following is an introduction to its fields:

namespace
: Multiple instances of the WAL can exist for different purposes (e.g. the manifest also needs a WAL). The namespace is used to distinguish them.

region_id
: In some WAL implementations we may need to manage logs from multiple tables; region is the concept describing such a set of table logs. Obviously the region id is the identification of the region.

table_id
: Identification of the table the logs belong to.

sequence_num
: Each log entry in a table is assigned an identifier, called a sequence number here.

version
: For compatibility with old and new formats.
[Key format layout]
Here is the defined value format; version is the same as in the key format, and payload can be understood as the encoded log content.
[Value format layout]
Metadata
The metadata here is stored in the same key-value format as the logs. Actually only the last flushed sequence is stored in this implementation. Here is the defined metadata key format and its fields:

namespace, table_id, version
: The same as in the log format.

key_type
: Used to define the type of metadata. MaxSeq is currently the only type, and metadata of this type only records the most recently flushed sequence of the table. Because it is only used in the WAL on RocksDB, which manages logs at the table level, there is no region id in this key.
[Metadata key format layout]
Here is the defined metadata value format; as you can see, it just contains the version and max_seq (flushed sequence):
[Metadata value format layout]
Main Process
- Open TableUnit:
  - Read the latest log entry of all tables, mainly to recover the next sequence numbers of the tables.
  - Scan the metadata to recover the next sequence number as a supplement (because some tables may have just triggered a flush and have no newly written logs after that, so no logs exist for them now).
- Write and read logs. Just write and read key-values from RocksDB.
- Delete logs. For simplicity, it removes the corresponding logs synchronously.