Apache HoraeDB™ (incubating) is a high-performance, distributed, cloud native time-series database.
Motivation
In a classic time-series database, the Tag columns (InfluxDB calls them Tags and Prometheus calls them Labels) are normally indexed by building an inverted index. However, the cardinality of Tags varies across scenarios, and when it is very high, storing and retrieving the inverted index becomes very expensive. On the other hand, the scan+prune approach commonly used by analytical databases handles such scenarios well.
The basic design idea of HoraeDB is to adopt a hybrid storage format and the corresponding query method for a better performance in processing both timeseries and analytic workloads.
1 - Getting Started
This page shows you how to get started with HoraeDB quickly. You’ll start a standalone HoraeDB server, and then insert and read some sample data using SQL.
Start server
The HoraeDB Docker image is the easiest way to get started; if you haven't installed Docker, install it first.
Note: please choose tag version >= v1.0.0, others are mainly for testing.
You can use the command below to start a standalone server.
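A minimal sketch of such a command, assuming the server image is published as ghcr.io/apache/horaedb-server (pick a tag >= v1.0.0 as noted above) and the default HTTP/gRPC ports used throughout this guide:

docker run -d --name horaedb-server \
  -p 8831:8831 \
  -p 5440:5440 \
  ghcr.io/apache/horaedb-server:<tag>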
The easiest interface to use is HTTP, so the sections below use it for the demo. For production environments, gRPC/MySQL are recommended.
Customize docker configuration
Referring to the command below, you can customize the configuration of horaedb-server in Docker and mount the data directory /data to a disk on the Docker host machine.
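A sketch of such a command; the in-container config path /etc/horaedb/horaedb.toml and the image name are assumptions, so adjust them to the image you actually use:

docker run -d --name horaedb-server \
  -p 8831:8831 \
  -p 5440:5440 \
  -v /path/on/host/config.toml:/etc/horaedb/horaedb.toml \
  -v /path/on/host/horaedb-data:/data \
  ghcr.io/apache/horaedb-server:<tag>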
This chapter introduces the data model of HoraeDB.
2.1.1.1 - Data Types
HoraeDB implements a table model, and the supported data types are similar to MySQL's.
The following table lists the mapping relationship between MySQL and HoraeDB.
Supported data types (case-insensitive):

| SQL | HoraeDB |
| --- | ------- |
| null | Null |
| timestamp | Timestamp |
| double | Double |
| float | Float |
| string | String |
| varbinary | Varbinary |
| uint64 | UInt64 |
| uint32 | UInt32 |
| uint16 | UInt16 |
| uint8 | UInt8 |
| int64/bigint | Int64 |
| int32/int | Int32 |
| int16/smallint | Int16 |
| int8/tinyint | Int8 |
| boolean | Boolean |
| date | Date |
| time | Time |
2.1.1.2 - Special Columns
Tables in HoraeDB have the following constraints:
Primary key is required
The primary key must contain a time column, and can only contain one time column
The primary key must be non-null, so all columns in primary key must be non-null.
Timestamp Column
Tables in HoraeDB must have one timestamp column that maps to the timestamp in time-series data, such as the timestamp in OpenTSDB/Prometheus.
The timestamp column can be set with the timestamp key keyword, like TIMESTAMP KEY(ts).
Tag Column
The TAG keyword is used to define a column as a tag column, similar to a tag in time-series data, such as a tag in OpenTSDB or a label in Prometheus.
Primary Key
The primary key is used for data deduplication and sorting. The primary key is composed of some columns and one time column.
The primary key can be set in one of the following ways (see the sketch after the notes below):
use the primary key keyword
use tag columns to auto-generate a TSID; HoraeDB will use (TSID, timestamp) as the primary key
only set the timestamp column; HoraeDB will use (timestamp) as the primary key
Notice: if the primary key and tags are specified at the same time, the tag columns are just additional identifying information and will not affect the logic.
If the primary key is not set and tag columns are provided, a TSID will be auto-generated from the hash of the tag columns.
In essence, this is also a mechanism for automatically generating an id.
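A sketch of two of these options (table and column names are illustrative):

-- Explicit primary key: (ts, a) is used for deduplication and sorting.
CREATE TABLE with_pk (
    a string,
    b double,
    ts timestamp NOT NULL,
    TIMESTAMP KEY(ts),
    PRIMARY KEY(ts, a)
) ENGINE = Analytic;

-- No explicit primary key, but a tag column: a tsid column is generated
-- automatically and (tsid, ts) serves as the primary key.
CREATE TABLE with_tag (
    a string TAG,
    b double,
    ts timestamp NOT NULL,
    TIMESTAMP KEY(ts)
) ENGINE = Analytic;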
2.1.2 - Identifier
An identifier in HoraeDB can be used as a table name, column name, etc. It cannot be a reserved keyword or start with a number or punctuation symbol. HoraeDB allows quoting identifiers with backquotes (`). In that case it can be any string, like 00_table or select.
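For example (a minimal sketch):

-- `00_table` starts with a digit and `select` is a reserved keyword,
-- so both must be quoted with backquotes to be used as identifiers.
CREATE TABLE `00_table` (
    `select` string TAG,
    value double,
    t timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE = Analytic;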
2.1.3 - Data Definition Statements
This chapter introduces the data definition statements.
2.1.3.1 - ALTER TABLE
ALTER TABLE can change the schema or options of a table.
ALTER TABLE SCHEMA
HoraeDB currently supports ADD COLUMN to alter the table schema.
-- create a table and add a column to it
CREATE TABLE `t` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
ALTER TABLE `t` ADD COLUMN (b string);
It now becomes:
-- DESCRIBE TABLE `t`;

| name | type      | is_primary | is_nullable | is_tag |
| ---- | --------- | ---------- | ----------- | ------ |
| t    | timestamp | true       | false       | false  |
| tsid | uint64    | true       | false       | false  |
| a    | int       | false      | true        | false  |
| b    | string    | false      | true        | false  |
ALTER TABLE OPTIONS
HoraeDB currently supports MODIFY SETTING to alter table options.
-- create a table and modify its write_buffer_size option
CREATE TABLE `t` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
ALTER TABLE `t` MODIFY SETTING write_buffer_size = '300M';
The SQL above tries to modify the write_buffer_size of the table, and the table’s option becomes:
Drop Table removes a specific table. This statement should be used with caution, because it removes both the table definition and table data, and this removal is not recoverable.
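For example, reusing the table `t` created above:

-- Removes both the definition and the data of `t`; this cannot be undone.
DROP TABLE `t`;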
2.1.4 - Data Manipulation Statements
This chapter introduces the data manipulation statements.
There are several utility SQL statements in HoraeDB that can help with table manipulation or query inspection.
SHOW CREATE TABLE
SHOW CREATE TABLE table_name;
SHOW CREATE TABLE returns a CREATE TABLE DDL that would create an identical table, including columns, table engine and options. The schema and options shown in the CREATE TABLE are based on the current version of the table. An example:
-- create one table
CREATE TABLE `t` (a bigint, b int default 3, c string default 'x', d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
-- Result: affected_rows: 0

-- show how one table should be created.
SHOW CREATE TABLE `t`;

-- Result DDL:
CREATE TABLE `t` (
    `t` timestamp NOT NULL,
    `tsid` uint64 NOT NULL,
    `a` bigint,
    `b` int,
    `c` string,
    `d` smallint,
    PRIMARY KEY(t, tsid),
    TIMESTAMP KEY(t)
) ENGINE = Analytic WITH (
    arena_block_size = '2097152',
    compaction_strategy = 'default',
    compression = 'ZSTD',
    enable_ttl = 'true',
    num_rows_per_row_group = '8192',
    segment_duration = '',
    ttl = '7d',
    update_mode = 'OVERWRITE',
    write_buffer_size = '33554432'
)
DESCRIBE
DESCRIBE table_name;
DESCRIBE shows the detailed schema of a table. The attributes include column name and type, whether it is a tag or part of the primary key (todo: ref), and whether it is nullable. The automatically created column tsid will also be included (todo: ref).
logical_plan
Projection: #MAX(07_optimizer_t.value) AS c1, #AVG(07_optimizer_t.value) AS c2
Aggregate: groupBy=[[#07_optimizer_t.name]], aggr=[[MAX(#07_optimizer_t.value), AVG(#07_optimizer_t.value)]]
TableScan: 07_optimizer_t projection=Some([name, value])
physical_plan
ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 6)
AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
ScanTable: table=07_optimizer_t, parallelism=8, order=None
2.1.6 - Options
The options below can be used when creating a table for the analytic engine:
enable_ttl, bool. When TTL is enabled on a table, rows older than ttl will be deleted and can't be queried. Default: true.
ttl, duration. The lifetime of a row, only used when enable_ttl is true. Default: 7d.
storage_format, string. The underlying columns' format. Available values:
columnar, the default
hybrid. Note: this feature is still in development and may change in the future.
The meaning of these two values is explained in the Storage Format section.
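As an illustration, the options are passed through the WITH clause shown in the DDL examples above; the values here are only an example:

CREATE TABLE demo (
    name string TAG,
    value double,
    t timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE = Analytic WITH (
    enable_ttl = 'true',
    ttl = '30d',
    storage_format = 'columnar'
);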
Storage Format
There are mainly two formats supported in analytic engine. One is columnar, which is the traditional columnar format, with one table column in one physical column:
| Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
| --------- |---------- | ----------- | ----- | ----- |
| 12:01 | A | 0 | v1 | v1 |
| 12:01 | B | 0 | v2 | v2 |
| 12:02 | A | 0 | v1 | v1 |
| 12:02 | B | 1 | v2 | v2 |
| 12:03 | A | 0 | v1 | v1 |
| 12:03 | B | 0 | v2 | v2 |
| ..... | | | | |
The other one is hybrid, an experimental format used to simulate row-oriented storage on top of columnar storage to accelerate classic time-series queries.
In classic time-series use cases like IoT or DevOps, queries typically first group their results by series id (or device id), then by timestamp. To achieve good performance in those scenarios, the physical data layout should match this access pattern, so the hybrid format is proposed like this:
| Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
|-----------|---------------------|-------------|-------|-------|---------|---------|
| A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 |
| B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 |
| ... | | | | | | |
Within one file, rows belonging to the same primary key (e.g. series/device id) are collapsed into one row.
The columns besides the primary key are divided into two categories:
collapsible, those columns will be collapsed into a list. Used to encode fields in a time-series table.
Note: only fixed-length types are supported now.
non-collapsible, those columns should only contain one distinct value. Used to encode tags in a time-series table.
Note: only the string type is supported now.
Two more columns, minTime and maxTime, are added. They are used to prune unnecessary rows in queries.
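A table using the hybrid format can be declared through the storage_format option; the sketch below uses column names matching the parquet schema shown afterwards, but is otherwise illustrative:

CREATE TABLE hybrid_table (
    ts timestamp NOT NULL,
    tag1 string TAG,
    tag2 string TAG,
    value1 double,
    value2 int32,
    TIMESTAMP KEY(ts)
) ENGINE = Analytic WITH (
    storage_format = 'hybrid'
);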
This will create a table with the hybrid format. Users can inspect the data format with parquet-tools. The table above should have the following parquet schema:
message arrow_schema {
optional group ts (LIST) {
repeated group list {
optional int64 item (TIMESTAMP(MILLIS,false));
}
}
required int64 tsid (INTEGER(64,false));
optional binary tag1 (STRING);
optional binary tag2 (STRING);
optional group value1 (LIST) {
repeated group list {
optional double item;
}
}
optional group value2 (LIST) {
repeated group list {
optional int32 item;
}
}
}
2.1.7 - Scalar Functions
HoraeDB SQL is implemented with DataFusion. Here is the list of scalar functions. For more detail, refer to DataFusion.
Math Functions
| Function | Description |
| -------- | ----------- |
| abs(x) | absolute value |
| acos(x) | inverse cosine |
| asin(x) | inverse sine |
| atan(x) | inverse tangent |
| atan2(y, x) | inverse tangent of y / x |
| ceil(x) | nearest integer greater than or equal to argument |
| cos(x) | cosine |
| exp(x) | exponential |
| floor(x) | nearest integer less than or equal to argument |
| ln(x) | natural logarithm |
| log10(x) | base 10 logarithm |
| log2(x) | base 2 logarithm |
| power(base, exponent) | base raised to the power of exponent |
| round(x) | round to nearest integer |
| signum(x) | sign of the argument (-1, 0, +1) |
| sin(x) | sine |
| sqrt(x) | square root |
| tan(x) | tangent |
| trunc(x) | truncate toward zero |
Conditional Functions
| Function | Description |
| -------- | ----------- |
| coalesce | Returns the first of its arguments that is not null. Null is returned only if all arguments are null. It is often used to substitute a default value for null values when data is retrieved for display. |
| nullif | Returns a null value if value1 equals value2; otherwise it returns value1. This can be used to perform the inverse operation of the coalesce expression. |
String Functions
| Function | Description |
| -------- | ----------- |
| ascii | Returns the numeric code of the first character of the argument. In UTF8 encoding, returns the Unicode code point of the character. In other multibyte encodings, the argument must be an ASCII character. |
| bit_length | Returns the number of bits in a character string expression. |
| btrim | Removes the longest string containing any of the specified characters from the start and end of string. |
| char_length | Equivalent to length. |
| character_length | Equivalent to length. |
| concat | Concatenates two or more strings into one string. |
| concat_ws | Combines two values with a given separator. |
| chr | Returns the character based on the number code. |
| initcap | Capitalizes the first letter of each word in a string. |
| left | Returns the specified leftmost characters of a string. |
| length | Returns the number of characters in a string. |
| lower | Converts all characters in a string to their lower case equivalent. |
| lpad | Left-pads a string to a given length with a specific set of characters. |
| ltrim | Removes the longest string containing any of the characters in characters from the start of string. |
| md5 | Calculates the MD5 hash of a given string. |
| octet_length | Equivalent to length. |
| repeat | Returns a string consisting of the input string repeated a specified number of times. |
| replace | Replaces all occurrences in a string of a substring with a new substring. |
| reverse | Reverses a string. |
| right | Returns the specified rightmost characters of a string. |
| rpad | Right-pads a string to a given length with a specific set of characters. |
| rtrim | Removes the longest string containing any of the characters in characters from the end of string. |
| digest | Calculates the hash of a given string. |
| split_part | Splits a string on a specified delimiter and returns the specified field from the resulting array. |
| starts_with | Checks whether a string starts with a particular substring. |
| strpos | Searches a string for a specific substring and returns its position. |
| substr | Extracts a substring of a string. |
| translate | Translates one set of characters into another. |
| trim | Removes the longest string containing any of the characters in characters from either the start or end of string. |
| upper | Converts all characters in a string to their upper case equivalent. |
Regular Expression Functions
| Function | Description |
| -------- | ----------- |
| regexp_match | Determines whether a string matches a regular expression pattern. |
| regexp_replace | Replaces all occurrences in a string of a substring that matches a regular expression pattern with a new substring. |
Temporal Functions
| Function | Description |
| -------- | ----------- |
| to_timestamp | Converts a string to type Timestamp(Nanoseconds, None). |
| to_timestamp_millis | Converts a string to type Timestamp(Milliseconds, None). |
| to_timestamp_micros | Converts a string to type Timestamp(Microseconds, None). |
| to_timestamp_seconds | Converts a string to type Timestamp(Seconds, None). |
| extract | Retrieves subfields such as year or hour from date/time values. |
| date_part | Retrieves subfield from date/time values. |
| date_trunc | Truncates date/time values to specified precision. |
| date_bin | Bin date/time values to specified precision. |
| from_unixtime | Converts Unix epoch to type Timestamp(Nanoseconds, None). |
| now | Returns current time as Timestamp(Nanoseconds, UTC). |
Other Functions
| Function | Description |
| -------- | ----------- |
| array | Create an array. |
| arrow_typeof | Returns underlying type. |
| in_list | Check if value in list. |
| random | Generate random value. |
| sha224 | sha224 |
| sha256 | sha256 |
| sha384 | sha384 |
| sha512 | sha512 |
| to_hex | Convert to hex. |
2.1.8 - Aggregate Functions
HoraeDB SQL is implemented with DataFusion. Here is the list of aggregate functions. For more detail, refer to DataFusion.
General
| Function | Description |
| -------- | ----------- |
| min | Returns the minimum value in a numerical column |
| max | Returns the maximum value in a numerical column |
| count | Returns the number of rows |
| avg | Returns the average of a numerical column |
| sum | Sums a numerical column |
| array_agg | Puts values into an array |
Statistical
| Function | Description |
| -------- | ----------- |
| var / var_samp | Returns the variance of a given column |
| var_pop | Returns the population variance of a given column |
| stddev / stddev_samp | Returns the standard deviation of a given column |
| stddev_pop | Returns the population standard deviation of a given column |
| covar / covar_samp | Returns the covariance of a given column |
| covar_pop | Returns the population covariance of a given column |
| corr | Returns the correlation coefficient of a given column |
Approximate
| Function | Description |
| -------- | ----------- |
| approx_distinct | Returns the approximate number (HyperLogLog) of distinct input values |
| approx_median | Returns the approximate median of input values. It is an alias of approx_percentile_cont(x, 0.5). |
| approx_percentile_cont | Returns the approximate percentile (TDigest) of input values, where p is a float64 between 0 and 1 (inclusive). It supports raw data as input and builds TDigest sketches during query time, and is approximately equal to approx_percentile_cont_with_weight(x, 1, p). |
| approx_percentile_cont_with_weight | Returns the approximate percentile (TDigest) of input values with weight, where w is the weight column expression and p is a float64 between 0 and 1 (inclusive). It supports raw data as input or pre-aggregated TDigest sketches, then builds or merges TDigest sketches during query time. TDigest sketches are a list of centroids (x, w), where x stands for mean and w stands for weight. |
2.2 - Cluster Deployment
In the Getting Started section, we introduced the deployment of a single HoraeDB instance.
Besides, as a distributed time-series database, multiple HoraeDB instances can be deployed as a cluster to provide high availability and scalability.
Currently, the integration with Kubernetes is still in progress, so a HoraeDB cluster can only be deployed manually. There are two modes of cluster deployment:
2.2.1 - NoMeta
Note: This feature is for testing use only, not recommended for production use, related features may change in the future.
This guide shows how to deploy a HoraeDB cluster without HoraeMeta, but with static, rule-based routing.
The crucial point here is that the HoraeDB server provides a configurable routing function on table names, so all we need is a valid config containing routing rules, which will be shipped to every HoraeDB instance in the cluster.
Target
First, let’s assume that our target is to deploy a cluster consisting of two HoraeDB instances on the same machine. And a large cluster of more HoraeDB instances can be deployed according to the two-instance example.
Prepare Config
Basic
Suppose the basic config of HoraeDB is:
[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb"
In order to deploy two HoraeDB instances on the same machine, the two configs must use different service ports and data directories.
Say the HoraeDB_0’s config is:
[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_0"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_0"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_0"
Then the HoraeDB_1’s config is:
[server]
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_1"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_1"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_1"
Schema&Shard Declaration
Then we should define the common part – schema&shard declaration and routing rules.
This rule means that all tables with the prod_ prefix belonging to public_0 will be routed to shard_0 of public_0, that is to say, HoraeDB_0. The other tables, whose names are not prefixed by prod_, will be routed by hash to both shard_0 and shard_1 of public_0.
Besides prefix rule, we can also define a hash rule:
This rule tells HoraeDB to route public_1's tables to both shard_0 and shard_1 of public_1, that is to say, HoraeDB_0 and HoraeDB_1. This is actually the default routing behavior if no such rule is provided for schema public_1.
For now, we can provide the full example config for HoraeDB_0 and HoraeDB_1:
Let's name the two different config files config_0.toml and config_1.toml. Note that in a real environment the HoraeDB instances can be deployed across different machines, in which case there is no need to choose different ports and data directories for different instances, so all HoraeDB instances can share exactly the same config file.
Start HoraeDBs
After the configs are prepared, what we need to do is start a HoraeDB container with each specific config.
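A sketch of the two commands, assuming the image name ghcr.io/apache/horaedb-server and an in-container config path of /etc/horaedb/horaedb.toml (adjust both to your actual image); host networking is used so that each instance serves the ports declared in its own config:

docker run -d --name horaedb_0 \
  --network host \
  -v $(pwd)/config_0.toml:/etc/horaedb/horaedb.toml \
  ghcr.io/apache/horaedb-server:<tag>

docker run -d --name horaedb_1 \
  --network host \
  -v $(pwd)/config_1.toml:/etc/horaedb/horaedb.toml \
  ghcr.io/apache/horaedb-server:<tag>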
After the two containers are created and start running, read and write requests can be served by the two-instance HoraeDB cluster.
2.2.2 - WithMeta
This guide shows how to deploy a HoraeDB cluster with HoraeMeta. With HoraeMeta, the whole HoraeDB cluster features high availability, load balancing and horizontal scalability, provided that the underlying storage used by HoraeDB is a separate service.
Deploy HoraeMeta
Introduce
HoraeMeta is one of the core services of HoraeDB's distributed mode; it is used to manage and schedule the HoraeDB cluster. The high availability of HoraeMeta is ensured by an embedded ETCD. The ETCD service is also provided to HoraeDB servers to manage the distributed shard locks.
At present, HoraeMeta supports specifying the service startup configuration in two ways: configuration file and environment variables. We provide an example of starting from a configuration file. For details, please refer to config. Environment variables have a higher configuration priority than configuration files; when both exist, the environment variables prevail.
Dynamic or Static
Even with HoraeMeta, the HoraeDB cluster can be deployed with a static or a dynamic topology. With a static topology, the table distribution is fixed after the cluster is initialized, while with a dynamic topology, tables can migrate between different HoraeDB nodes to achieve load balancing or failover. However, the dynamic topology can be enabled only if the storage used by the HoraeDB nodes is remote; otherwise the data may be corrupted when tables are transferred to a different HoraeDB node while the data is persisted locally.
Currently, dynamic scheduling of the cluster topology is disabled by default in HoraeMeta, and in this guide we won't enable it because local storage is adopted here. If you want to enable dynamic scheduling, set TOPOLOGY_TYPE to dynamic (static by default); after that, load balancing and failover will work. However, don't enable it if the underlying storage is a local disk.
With the static topology, the param DEFAULT_CLUSTER_NODE_COUNT, which denotes the number of HoraeDB nodes in the deployed cluster, should be set to the real number of machines running HoraeDB server. It matters a lot because after cluster initialization the set of HoraeDB nodes can't be changed any more.
Start HoraeMeta Instances
HoraeMeta is based on etcd to achieve high availability. In production environments we usually deploy multiple nodes, but for local environments and testing we can deploy a single node directly to simplify the entire deployment process.
Standalone
docker run -d --name horaemeta-server \
-p 2379:2379 \
ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0
And if the storage used by the HoraeDB is remote and you want to enable the dynamic schedule features of the HoraeDB cluster, the -e TOPOLOGY_TYPE=dynamic can be added to the docker run command.
Deploy HoraeDB
In the NoMeta mode, HoraeDB only requires the local disk as the underlying storage because the topology of the HoraeDB cluster is static. However, with HoraeMeta, the cluster topology can be dynamic, that is to say, HoraeDB can be configured to use a non-local storage service to gain the features of a distributed system: HA, load balancing, scalability and so on. HoraeDB can still be configured to use local storage with HoraeMeta, which certainly leads to a static cluster topology.
The relevant storage configurations include two parts:
Object Storage
WAL Storage
Note: If you are deploying HoraeDB over multiple nodes in a production environment, please set the environment variable for the server address as follows:
export HORAEDB_SERVER_ADDR="{server_address}:8831"
This address is used for communication between HoraeMeta and HoraeDB, please ensure it is valid.
Object Storage
Local Storage
Similarly, we can configure HoraeDB to use a local disk as the underlying storage:
[analytic.storage.object_store]
type = "Local"
data_dir = "/home/admin/data/horaedb"
OSS
Aliyun OSS can also be used as the underlying storage for HoraeDB, with which the data is replicated for disaster recovery. Here is an example config, and you have to replace the templates with the real OSS parameters:
Amazon S3 can also be used as the underlying storage for HoraeDB. Here is an example config, and you have to replace the templates with the real S3 parameters:
The WAL based on RocksDB is also a kind of local storage for HoraeDB, which is easy for a quick start:
[analytic.wal]
type = "RocksDB"
data_dir = "/home/admin/data/horaedb"
OceanBase
If you have deployed an OceanBase cluster, HoraeDB can use it as the WAL storage for data disaster recovery. Here is an example config for such a WAL, and you have to replace the templates with real OceanBase parameters:
If you have deployed a Kafka cluster, HoraeDB can also use it as the WAL storage. Here is an example config for it, and you have to replace the templates with real parameters of the Kafka cluster:
[analytic.wal]
type = "Kafka"

[analytic.wal.kafka.client]
boost_brokers = [ "{boost_broker1}", "{boost_broker2}" ]
Meta Client Config
Besides the storage configurations, HoraeDB must be configured to start in WithMeta mode and connect to the deployed HoraeMeta:
Compaction offload is also supported. To enable compaction offload, the corresponding compaction mode with node picker and endpoint should be configured.
node_picker: There are two types of node picker – Local and Remote (WIP).
When Local is set, the local compaction task will be offloaded to the specific remote compaction server decided by endpoint.
endpoint: The endpoint, in the form addr:port, indicating the gRPC port of the remote compaction server.
A compaction server, responsible for executing the compaction task, is also needed. Currently horaedb-server acts in this role; in the future it may be moved to an independent service.
Complete Config of HoraeDB
With all the parts of the configuration mentioned above, a complete runnable config for HoraeDB can be assembled. To make the HoraeDB cluster runnable, we adopt the RocksDB-based WAL and local-disk-based Object Storage without compaction offload:
Let's name this config file config.toml. Example configs for remote storages, in which the templates must be replaced with real parameters before use, are also provided:
using database; the database can be overwritten by the ReqContext in a single Write or SQLRequest
RPCMaxRecvMsgSize
configuration for gRPC MaxCallRecvMsgSize, default 1024 * 1024 * 1024
RouteMaxCacheSize
the maximum size of the route cache; the router client will evict the oldest entries when the limit is exceeded, default is 10000
Notice:
HoraeDB currently only supports the default database public; multiple databases will be supported in the future
Manage Table
HoraeDB uses SQL to manage tables, such as creating tables, deleting tables, or adding columns, etc., which is not much different from when you use SQL to manage other databases.
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use create table statement to manage the table more finely (such as adding indexes).
Example for creating table
createTableSQL :=`
CREATE TABLE IF NOT EXISTS demo (
name string TAG,
value double,
t timestamp NOT NULL,
TIMESTAMP KEY(t)
) ENGINE=Analytic with (enable_ttl=false)`

req := horaedb.SQLQueryRequest{
Tables: []string{"demo"},
SQL: createTableSQL,
}
resp, err := client.SQLQuery(context.Background(), req)
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default grpc port 8831, use DIRECT RouteMode
        .database("public") // use database for client, can be overridden by the RequestContext in request
        // maximum retry times when write fails
        // (only some error codes will be retried, such as the routing table failure)
        .writeMaxRetries(1)
        // maximum retry times when read fails
        // (only some error codes will be retried, such as the routing table failure)
        .readMaxRetries(1)
        .build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}
The initialization requires at least three parameters:
Endpoint: 127.0.0.1
Port: 8831
RouteMode: DIRECT/PROXY
Here is the explanation of RouteMode. There are two kinds of RouteMode: the Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client.
However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.
For more configuration options, see configuration
Notice: HoraeDB currently only supports the default database public; multiple databases will be supported in the future.
Create Table Example
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use create table statement to manage the table more finely (such as adding indexes).
The following table creation statement (using the SQL API included in the SDK) shows all field types supported by HoraeDB:
// Create table manually, creating table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
    throw new IllegalStateException("Fail to create table");
}
Drop Table Example
Here is an example of dropping table:
String dropTableSql = "DROP TABLE machine_table";
Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
    throw new IllegalStateException("Fail to drop table");
}
Write Data Example
Firstly, you can use PointBuilder to build HoraeDB points:
List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
    // build one point
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(t0)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    pointList.add(point);
}
Then, you can use write interface to write data:
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));

// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<WriteOk, Err> writeResult = wf.get();
Assert.assertTrue(writeResult.isOk());

// `Result` class referenced the Rust language practice, provides rich functions (such as mapXXX, andThen)
// transforming the result value to improve programming efficiency. You can refer to the API docs for detail usage.
Assert.assertEquals(3, writeResult.getOk().getSuccess());
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        // table name is optional. If not provided, the SQL parser will parse the `sql` to get the table name and do the routing automatically
        .forTables("machine_table")
        .sql("select * from machine_table where ts = %d", t0)
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);

// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<SqlQueryOk, Err> queryResult = qf.get();
Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as list
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);

// get rows as stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
    final Point streamData = Point.newPointBuilder("machine_table")
            .setTimestamp(t)
            .addTag("city", "Beijing")
            .addTag("ip", "10.0.0.3")
            .addField("cpu", Value.withDouble(0.42))
            .addField("mem", Value.withDouble(0.67))
            .build();
    writeBuf.writeAndFlush(Collections.singletonList(streamData));
    t = t + 1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d",
                "machine_table", "Beijing", start, t)
        .build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());
Firstly, it’s worth noting that the imported packages are used across all the code snippets in this guide, and they will not be repeated in the following.
The initialization requires at least two parameters:
Endpoint: the server endpoint consisting of ip address and serving port, e.g. 127.0.0.1:8831;
Mode: The mode of the communication between client and server, and there are two kinds of Mode: Direct and Proxy.
Endpoint is simple, while Mode deserves more explanation. The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client. However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.
The default_database can be set and will be used if a following RPC call does not set the database in the RpcContext.
By configuring the RpcConfig, the resource usage and performance of the client can be tuned, and all of the configurations can be found here.
Create Table
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use create table statement to manage the table more finely (such as adding indexes).
Here is an example of creating a table with the initialized client:
async def async_query(client, ctx, req):
    await client.sql_query(ctx, req)

create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
    name string TAG, \
    value double, \
    t timestamp NOT NULL, \
    TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'

req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))
RpcContext can be used to overwrite the default database and timeout defined in the initialization of the client.
Write Data
PointBuilder can be used to construct a point, which is actually a row in the data set. A write request consists of multiple points.
As the example shows, two parameters are needed to construct the SqlQueryRequest:
The tables involved by this sql query;
The query sql.
Currently, the first parameter is required for routing performance.
With retrieved data, we can process it row by row and column by column:
# Access row by index in the resp.
for row_idx in range(0, resp.num_rows()):
    row_tokens = []
    row = resp.row_by_idx(row_idx)
    for col_idx in range(0, row.num_cols()):
        col = row.column_by_idx(col_idx)
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row#{row_idx}: {','.join(row_tokens)}")

# Access row by iter in the resp.
for row in resp.iter_rows():
    row_tokens = []
    for col in row.iter_columns():
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row: {','.join(row_tokens)}")
Drop Table
Finally, we can drop the table by the sql api, which is similar to the table creation:
Set default_database; it will be used if a following RPC call does not set the database in the RpcContext (introduced later):
let builder = builder.default_database("public");
Finally, we build client from builder:
let client = builder.build();
Manage Table
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use create table statement to manage the table more finely (such as adding indexes).
You can use the sql query interface to create or drop table, related setting will be introduced in sql query section.
Create table:
let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
str_tag string TAG,
int_tag int32 TAG,
var_tag varbinary TAG,
str_field string,
int_field int32,
bin_field varbinary,
t timestamp NOT NULL,
TIMESTAMP KEY(t)) ENGINE=Analytic with
    (enable_ttl='false')"#;

let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: create_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");
Drop table:
let drop_table_sql = "DROP TABLE horaedb";

let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: drop_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to drop table");
Write
Writing uses the time-series data model, like InfluxDB.
Build the point first with PointBuilder; the related data structures of tag values and field values in it are defined as Value. Detail about Value:
Create an rpc_ctx; it can also be defined on demand or just use the default value. Detail about rpc ctx:
Finally, write to server by client.
let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};

let resp = client
    .write(rpc_ctx, &write_req)
    .await
    .expect("Should success to write");
Sql Query
Querying data with SQL is supported.
Define related tables and sql in sql query request:
This guide introduces the operation and maintenance of HoraeDB, including cluster installation, database&table operations, fault tolerance, disaster recovery, data import and export, etc.
2.4.1 - Block List
Add block list
If you want to reject queries for a table, you can add the table name to 'read_block_list'.
These operations are for the HoraeDB cluster mode and can only be used when HoraeMeta is deployed.
Operation Interface
You need to replace 127.0.0.1 with the actual server address.
Query table
When tableNames is not empty, tableNames are used for the query.
When tableNames is empty, ids are used for the query. When querying by ids, schemaName is ignored.
curl --location --request GET 'http://127.0.0.1:5000/route/{table_name}'
2.5 - Ecosystem
HoraeDB has an open ecosystem that encourages collaboration and innovation, allowing developers to use what suits them best.
2.5.1 - InfluxDB
InfluxDB is a time series database designed to handle high write and query loads. It is an integral component of the TICK stack. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics.
A measurement will be mapped to a table in HoraeDB, and the table will be created automatically on first write (note: the default TTL is 7d, and points written outside the TTL will be discarded directly).
For example, when inserting data above, table below will be automatically created in HoraeDB:
When InfluxDB writes data, the timestamp precision is nanoseconds by default. HoraeDB only supports millisecond timestamps; the user can specify the data precision with the precision parameter, and HoraeDB will automatically convert to millisecond precision internally.
Query string parameters such as db aren’t supported.
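For example, a write with millisecond precision might look like the following; the write endpoint path is an assumption made to mirror the query endpoint shown below:

curl -i "http://localhost:5440/influxdb/v1/write?precision=ms" \
  --data-raw 'demo,name=host1 value=0.5 1695348000000'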
Query
1
curl -G 'http://localhost:5440/influxdb/v1/query' --data-urlencode 'q=SELECT * FROM "demo"'
A metric will be mapped to a table in HoraeDB, and the table will be created automatically on first write (note: the default TTL is 7d, and points written outside the TTL will be discarded directly).
For example, when inserting data above, table below will be automatically created in HoraeDB:
OpenTSDB query protocol is not currently supported, tracking issue.
2.5.3 - Prometheus
Prometheus is a popular cloud-native monitoring tool that is widely adopted by organizations due to its scalability and reliability. It is used to scrape metrics from cloud-native services, such as Kubernetes and OpenShift, and stores them in a time-series database. Prometheus is also easily extensible, allowing users to extend its features and capabilities with other databases.
HoraeDB can be used as a long-term storage solution for Prometheus. Both remote read and remote write API are supported.
Config
You can configure Prometheus to use HoraeDB as remote storage by adding the following lines to prometheus.yml:
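A minimal sketch of such a config; the remote read/write paths below are assumptions, so replace them with the endpoints exposed by your HoraeDB server (the HTTP port 5440 matches the default used in this guide):

remote_write:
  - url: "http://127.0.0.1:5440/prom/v1/write"
remote_read:
  - url: "http://127.0.0.1:5440/prom/v1/read"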
We would like to structure our commit message like this:
<type>[optional scope]: <description>
There are three parts. type is used to classify which kind of work this commit does. scope is an optional field that provides additional contextual information. And the last field is your description of this commit.
Type
Here we list some common types and their meanings.
feat: Implement a new feature.
fix: Patch a bug.
docs: Add document or comment.
build: Change the build script or configuration.
style: Style change (only). No logic involved.
refactor: Refactor an existing module for performance, structure, or other reasons.
test: Enhance test coverage or sqlness.
chore: None of the above.
Scope
The scope is more flexible than type. And it may have different values under different types.
For example, in a feat or build commit we may use the code module to define the scope, like the examples below.
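Purely illustrative examples of such messages:

feat(analytic_engine): support the hybrid storage format
build(docker): bump the base image version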
But you don’t need to add a scope every time. This isn’t mandatory. It’s just a way to help describe the commit.
After all
There are many other rules and scenarios on the conventional commits website. We are still exploring a better and more friendly workflow. Please do let us know by opening an issue if you have any suggestions ❤️
3.3 - Compile
In order to compile HoraeDB, some relevant dependencies (including the Rust toolchain) should be installed.
Dependencies
Ubuntu
Assuming the development environment is Ubuntu 20.04, execute the following command to install the required dependencies:
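A typical dependency set for building a Rust project that uses protobuf and RocksDB looks like the following; treat it as a sketch and check the project README for the authoritative list:

sudo apt install git curl gcc g++ libssl-dev pkg-config cmake protobuf-compiler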
If the development environment is macOS, execute the following commands to install the required dependencies.
Install command line tools:
xcode-select --install
Install protobuf:
brew install protobuf
Rust
Rust can be installed by rustup. After installing rustup, when entering the HoraeDB project, the specified Rust version will be automatically downloaded according to the rust-toolchain file.
After execution, you need to add environment variables to use the Rust toolchain. Basically, just put the following commands into your ~/.bashrc or ~/.bash_profile:
source $HOME/.cargo/env
Compile and Run
horaedb-server
Compile horaedb-server with the following command in the project root directory:
cargo build
Then you can run it using the default configuration file provided in the codebase.
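For example, assuming your checkout provides an example config such as docs/minimal.toml and that the binary accepts a --config flag (both are assumptions here):

./target/debug/horaedb-server --config ./docs/minimal.toml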
When compiling on macOS, you may encounter the following errors:
IO error: while open a file for lock: /var/folders/jx/grdtrdms0zl3hy6zp251vjh80000gn/T/.tmpmFOAF9/manifest/LOCK: Too many open files
or
error: could not compile `regex-syntax` (lib)
warning: build failed, waiting for other jobs to finish...
LLVM ERROR: IO failure on output stream: File too large
error: could not compile `syn` (lib)
To fix those, you should adjust ulimit as follows:
ulimit -n unlimited
ulimit -f unlimited
horaemeta-server
Building horaemeta-server requires Golang version >= 1.21; please install it before compiling.
Then in horaemeta directory, execute:
go build -o bin/horaemeta-server ./cmd/horaemeta-server/main.go
Then you can run horaemeta-server like this:
bin/horaemeta-server
3.4 - Profiling
CPU profiling
HoraeDB provides a CPU profiling HTTP API at debug/profile/cpu.
Example:
// 60s cpu sampling data
curl 0:5000/debug/profile/cpu/60
// Output file path.
/tmp/flamegraph_cpu.svg
Heap profiling
HoraeDB provides a heap profiling HTTP API at debug/profile/heap.
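Assuming the same path pattern as the CPU profiling API above (a sketch, not verified against the server):

// 60s heap sampling data
curl 0:5000/debug/profile/heap/60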
As every Rust programmer knows, the language has many powerful features, and there are often
several patterns which can express the same idea. Also, as every professional programmer comes to
discover, code is almost always read far more than it is written.
Thus, we choose to use a consistent set of idioms throughout our code so that it is easier to read
and understand for both existing and new contributors.
Unsafe and Platform-Dependent conditional compilation
Avoid unsafe Rust
One of the main reasons to use Rust as an implementation language is its strong memory safety
guarantees; almost all of these guarantees are voided by the use of unsafe. Thus, unless there is
an excellent reason and the use is discussed beforehand, it is unlikely HoraeDB will accept patches
with unsafe code.
We may consider taking unsafe code given:
performance benchmarks showing a very compelling improvement
a compelling explanation of why the same performance can not be achieved using safe code
If your error contains a trait object (e.g. Box<dyn std::error::Error + Send + Sync>), then in order
to use context() you need to wrap the error in a Box. We provide a box_err function to help with this conversion:
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("gRPC planner got error listing partition keys: {}", source))]
    ListingPartitions {
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

// ...

use common_util::error::BoxError;

// Wrap error in a box prior to calling context()
database
    .partition_keys()
    .await
    .box_err()
    .context(ListingPartitions)?;
Each error cause in a module should have a distinct Error enum variant
Specific error types are preferred over a generic error with a message or kind field.
Good:
Makes it easier to track down the offending code based on a specific failure
Reduces the size of the error enum (String is 3x 64-bit vs no space)
Makes it easier to remove vestigial errors
Is more concise
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Error writing remaining lines {}", source))]
    UnableToWriteGoodLines { source: IngestError },

    #[snafu(display("Error while closing the table writer {}", source))]
    UnableToCloseTableWriter { source: IngestError },
}

// ...

write_lines.context(UnableToWriteGoodLines)?;
close_writer.context(UnableToCloseTableWriter)?;
Bad:
pub enum Error {
    #[snafu(display("Error {}: {}", message, source))]
    WritingError {
        source: IngestError,
        message: String,
    },
}

write_lines.context(WritingError {
    message: String::from("Error while writing remaining lines"),
})?;

close_writer.context(WritingError {
    message: String::from("Error while closing the table writer"),
})?;
Leaf errors should contain a backtrace
In order to make debugging easier, leaf errors in the error chain should contain a backtrace.
// Error in module A
pub enum Error {
    #[snafu(display("This is a leaf error, source:{}.\nBacktrace:\n{}", source, backtrace))]
    LeafError {
        source: ErrorFromDependency,
        backtrace: Backtrace,
    },
}

// Error in module B
pub enum Error {
    #[snafu(display("Another error, source:{}.\nBacktrace:\n{}", source, backtrace))]
    AnotherError {
        /// This error wraps another error that already has a
        /// backtrace. Instead of capturing our own, we forward the
        /// request for the backtrace to the inner error. This gives a
        /// more accurate backtrace.
        #[snafu(backtrace)]
        source: crate::A::Error,
    },
}
Tests
Don’t return Result from test functions
At the time of this writing, if you return Result from test functions to use ? in the test
function body and an Err value is returned, the test failure message is not particularly helpful.
Therefore, prefer not having a return type for test functions and instead using expect or
unwrap in test function bodies.
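A minimal illustration of the preferred style, using expect instead of returning a Result from the test:

#[test]
fn parses_grpc_port() {
    // `expect` keeps the failure message next to the failing call instead of
    // bubbling an opaque `Err` out of the test function.
    let port: u16 = "8831".parse().expect("port should parse");
    assert_eq!(port, 8831);
}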
Release multi-language clients, including Java, Rust and Python.
Static cluster mode with HoraeMeta.
Basic implementation of hybrid storage format.
v0.4.0
Implement more sophisticated cluster solution that enhances reliability and scalability of HoraeDB.
Set up nightly benchmark with TSBS.
v1.0.0-alpha (Released)
Implement Distributed WAL based on Apache Kafka.
Release Golang client.
Improve the query performance for classic time series workloads.
Support dynamic migration of tables in cluster mode.
v1.0.0
Formally release HoraeDB and its SDKs with all breaking changes finished.
Finish the majority of work related to Table Partitioning.
Various efforts to improve query performance, especially for cloud-native cluster mode. These works include:
Multi-tier cache.
Introduce various methods to reduce the data fetched from remote storage (improve the accuracy of SST data filtering).
Increase the parallelism while fetching data from remote object-store.
Improve data ingestion performance by introducing resource control over compaction.
Afterwards
With an in-depth understanding of the time-series database and its various use cases, the majority of our work will focus on performance, reliability, scalability, ease of use, and collaborations with open-source communities.
Add utilities that support PromQL, InfluxQL, OpenTSDB protocol, and so on.
Provide basic utilities for operation and maintenance. Specifically, the following are included:
Deployment tools that fit well for cloud infrastructures like Kubernetes.
Enhance self-observability, especially critical logs and metrics should be supplemented.
Develop various tools that ease the use of HoraeDB. For example, data import and export tools.
Explore new storage formats that will improve performance on hybrid workloads (analytical and time-series workloads).
This chapter discusses compaction offload, which is designed to separate the compaction workload from the local horaedb nodes and delegate it to external compaction nodes.
The diagram above describes the architecture of cluster for compaction offload, where some key concepts need to be explained:
Compaction Node: Takes responsibility to handle offloaded compaction tasks. The compaction node receives the compaction task and performs the actual merging of SSTables, then sends back the task result to HoraeDB.
HoraeMeta Cluster: HoraeMeta acts as the compaction node manager in the compaction offload scenario. It monitors the compaction node cluster and schedules the compaction nodes.
Based on the architecture diagram above, the procedure of remote compaction is:
HoraeDB servers fetch the information of suitable compaction nodes from HoraeMeta.
HoraeDB submits the compaction task to the remote compaction node, according to the information fetched from HoraeMeta.
The compaction node executes the task and writes the results to a temporary workspace.
The compaction node sends the compaction results back to HoraeDB.
HoraeDB receives the result, installs the data from the temporary workspace and purges the compaction input files.
The architecture above makes it easy to implement some useful features like load balancing and high availability. Let's dive into the key components in the architecture and talk about how these features are implemented.
Compaction Node
Compaction Node runs the main logic of compaction. It is implemented based on HoraeDB and distinguished by:
NodeType: a config parameter used to distinguish HoraeDB and CompactionNode. This info is sent to HoraeMeta through heartbeats.
The compaction service is implemented as a gRPC service.
HoraeMeta
HoraeMeta manages the compaction nodes cluster with CompactionNodeManager, which takes responsibilities for compaction nodes metadata management and scheduling.
The compaction nodes metadata includes:
Compaction node information, such as node name, node state;
A compaction node name list, used as the key to access compaction node info, for better scheduling with round-robin strategy;
…
As for the compaction nodes scheduling work, it mainly includes:
Receiving the heartbeats from the compaction node and determining the online status of these registered nodes.
Performing load balancing according to the compaction nodes cluster info.
Providing the info of suitable compaction node for HoraeDB when remote compaction execution is needed.
Load Balancing
Load balancing is critical for the compaction node cluster to make its overall processing more efficient. The effect of load balancing mainly depends on the scheduling algorithm for compaction nodes implemented in CompactionNodeManager.
(Note: the current implementation of the scheduling algorithm is a round-robin strategy for simplicity.)
The main process of a scheduling algorithm based on real load is:
HoraeMeta collects the compaction nodes load information through the heartbeats to create a load overview of the compaction nodes cluster.
Pick a compaction node with low load according to the load overview.
High Availability
The fault tolerance of the above architecture is achieved by the following procedure:
When detecting that the heartbeat is broken, HoraeMeta determines that the compaction node is offline.
When HoraeMeta cannot provide a suitable compaction node for HoraeDB, or the compaction node does not return the task result successfully, HoraeDB switches to running the compaction task locally.
4.2 - Introduction to Architecture of HoraeDB Cluster
Note: Some of the features mentioned in the article have not yet been implemented.
The diagram above describes the architecture of a HoraeDB cluster, where some key concepts need to be explained:
HoraeMeta Cluster: Takes responsibilities for managing the metadata and resource scheduling of the cluster;
Shard(L)/Shard(F): Leader shard and follower shard consisting of multiple tables;
HoraeDB: One HoraeDB instance consisting of multiple shards;
WAL Service: Write-ahead log service for storing new-written real-time data;
Object Storage: Object storage service for storing SST converted from memtable;
From the architecture diagram above, it can be concluded that the compute and storage are separated in the HoraeDB cluster, which makes it easy to implement useful distributed features, such as elastic autoscaling of compute/storage resources, high availability, load balancing, and so on.
Let’s dive into some of the key components mentioned above before explaining how these features are implemented.
Shard
Shard is the basic scheduling unit in the cluster, which consists of a group of tables. The tables in a shard share the same region for better storage locality in the WAL Service, and because of this, it is efficient to recover the data of all tables in the shard by scanning the entire WAL region. For most implementations of WAL Service, without the shard concept it costs a lot to recover the table data one by one due to massive random IO, and this deteriorates sharply when the number of tables grows to a certain level.
A specific role, Leader or Follower, should be assigned to a shard. A pair of leader-follower shards share the same set of tables, and the leader shard can serve the write and query requests from the client while the follower shard can only serve the read-only requests, and must synchronize the newly written data from the WAL service in order to provide the latest snapshot for data retrieval. Actually, the follower is not needed if the high availability is not required, while with at least one follower, it takes only a short time to resume service by simply switching the Follower to Leader when the HoraeDB instance on which the leader shard exists crashes.
The diagram below concludes the relationship between HoraeDB instance, Shard, Table. As shown in the diagram, the leader and follower shards are interleaved on the HoraeDB instance.
Since Shard is the basic scheduling unit, it is natural to introduce some basic shard operations:
Create/Drop table to/from a shard;
Open/Close a shard;
Split one shard into two shards;
Merge two shards into one shard;
Switch the role of a shard;
With these basic shard operations, some complex scheduling logic can be implemented, e.g. perform an expansion by splitting one shard into two shards and migrating one of them to the new HoraeDB instance.
HoraeMeta
HoraeMeta is implemented by embedding an ETCD inside to ensure consistency and takes responsibilities for cluster metadata management and scheduling.
The cluster metadata includes:
Table information, such as table name, table ID, and which cluster the table belongs to;
The mapping between table and shard and between shard and HoraeDB instance;
…
As for the cluster scheduling work, it mainly includes:
Receiving the heartbeats from the HoraeDB instances and determining the online status of these registered instances;
Assigning specific role shards to the registered HoraeDB instances;
Participating in table creation by assigning a unique table ID and the most appropriate shard to the table;
Performing load balancing through shard operations according to the load information sent with the heartbeats;
Performing expansion through shard operations when new instances are registered;
Initiating failover through shard operations when old instances go offline;
Route
In order to avoid the overhead of forwarding requests, the communication between clients and the HoraeDB instances is peer-to-peer, that is to say, the client should retrieve routing information from the server before sending any specific write/query requests.
Actually, the routing information is decided by HoraeMeta, but clients are only allowed to access it through the HoraeDB instances rather than HoraeMeta directly, to avoid potential performance issues on HoraeMeta.
WAL Service & Object Storage
In the HoraeDB cluster, WAL Service and Object Storage exist as separate distributed systems featuring HA, data replication and scalability. The current distributed implementations for WAL Service include Kafka and OBKV (accessing OceanBase through its table API), and the implementations for Object Storage include popular object storage services such as AWS S3, Azure Object Storage and Aliyun OSS.
The two components are similar in that both serve as the underlying storage layer that separates compute from storage. The difference is that WAL Service stores the newly written data from real-time write requests, whose individual size is small but quantity is large, while Object Storage stores the read-friendly data files (SST) organized in the background, whose individual size is large and aggregate size is much larger.
The two components make it much easier to implement the HoraeDB cluster with horizontal scalability, high availability and load balancing.
Scalability
Scalability is an important feature for a distributed system. Let's take a look at how the horizontal scalability of the HoraeDB cluster is achieved.
First, the two storage components (WAL Service and Object Storage) should be horizontally scalable when deciding on the actual implementations for them, so the two storage services can be expanded separately if the storage capacity is not sufficient.
It will be a little bit complex when discussing the scalability of the compute service. Basically, these cases will bring the capacity problem:
Massive queries on massive tables;
Massive queries on a single large table;
Massive queries on a normal table;
For the first case, it is easy to achieve horizontal scalability just by assigning shards that are created or split from old shards to expanded HoraeDB instances.
For the second case, table partitioning is introduced; after partitioning, the massive queries are distributed across multiple HoraeDB instances.
The last case is the most important and the most difficult one. The follower shard can handle part of the queries, but the number of follower shards is limited by the throughput threshold of synchronization from the WAL regions. As shown in the diagram below, a pure compute shard can be introduced if the followers are not enough to handle the massive queries. Such a shard is not required to synchronize data with the leader shard; instead, it retrieves the newly written data from the leader/follower shard only when a query arrives. The SSTs required by the query are downloaded from Object Storage and cached afterwards. With these two parts of the data, the compute resources are fully utilized to execute the CPU-intensive query plan. Since such a shard can be added with only a little overhead (retrieving some data from the leader/follower shard when needed), horizontal scalability is achieved to some extent.
HoraeMeta collects the instance load information contained in the received heartbeats to create a load overview of the whole cluster, according to which the load balancing can be implemented as an automatic mechanism:
Pick a shard on a low-load instance for the newly created table;
Migrate a shard from a high-load instance to another low-load instance;
Split the large shard on the high-load instance and migrate the split shards to other low-load instances;
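As an illustration of this mechanism (the types and the load score below are assumptions, not the actual HoraeMeta implementation), picking a target instance could look like choosing the lowest-scored instance from the heartbeat-based overview:

```rust
use std::collections::HashMap;

// Hypothetical load overview built from heartbeats; not the actual HoraeMeta types.
struct InstanceLoad {
    cpu_usage: f64, // 0.0 ..= 1.0
    shard_count: usize,
}

/// Pick the instance with the lowest load score for a newly created table
/// (or as the target of a shard migration).
fn pick_low_load_instance(overview: &HashMap<String, InstanceLoad>) -> Option<&String> {
    overview
        .iter()
        .min_by(|(_, a), (_, b)| {
            let score_a = a.cpu_usage + a.shard_count as f64 * 0.01;
            let score_b = b.cpu_usage + b.shard_count as f64 * 0.01;
            score_a.partial_cmp(&score_b).unwrap()
        })
        .map(|(node, _)| node)
}

fn main() {
    let mut overview = HashMap::new();
    overview.insert("horaedb-0".to_string(), InstanceLoad { cpu_usage: 0.80, shard_count: 12 });
    overview.insert("horaedb-1".to_string(), InstanceLoad { cpu_usage: 0.20, shard_count: 4 });
    println!("target instance: {:?}", pick_low_load_instance(&overview));
}
```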
4.3 - Introduction to HoraeDB's Architecture
Target
Provide the overview of HoraeDB to the developers who want to know more about HoraeDB but have no idea where to start.
Give a brief introduction to the important modules of HoraeDB and the connections between them, without going into the details of their implementations.
Motivation
HoraeDB is a timeseries database (TSDB). However, unlike classic TSDBs, which usually perform poorly on analytic workloads, HoraeDB aims to handle both timeseries and analytic workloads well.
In the classic timeseries database, the Tag columns (InfluxDB calls them Tag and Prometheus calls them Label) are normally indexed by generating an inverted index. However, it is found that the cardinality of Tag varies in different scenarios. And in some scenarios the cardinality of Tag is very high (we name this case after analytic workload), and it takes a very high cost to store and retrieve the inverted index. On the other hand, it is observed that scanning+pruning often used by the analytical databases can do a good job to handle such analytic workload.
The basic design idea of HoraeDB is to adopt a hybrid storage format and the corresponding query method for a better performance in processing both timeseries and analytic workloads.
The figure above shows the architecture of HoraeDB stand-alone service and the details of some important modules will be described in the following part.
The current RPC layer supports multiple protocols, including HTTP, gRPC and MySQL.
Basically, HTTP and MySQL are used to debug HoraeDB, query manually and perform DDL operations (such as creating and deleting tables), while gRPC can be regarded as a customized high-performance protocol suitable for massive read and write operations.
The SQL layer is responsible for parsing SQL and generating the query plan.
Based on sqlparser, a SQL dialect is provided for processing timeseries data, introducing key concepts such as Tag and Timestamp. By utilizing DataFusion, the planner is able to generate both regular logical plans and tailored ones used to implement the special operators defined by timeseries queries, e.g. PromQL.
The Interpreter module encapsulates the SQL CRUD operations. In the query procedure, a SQL statement received by HoraeDB is parsed, converted into a query plan and then executed in a specific interpreter, such as SelectInterpreter, InsertInterpreter, etc.
Catalog is the module managing metadata. The levels of metadata adopted by HoraeDB are similar to PostgreSQL: Catalog > Schema > Table, but they are only used as namespaces.
At present, Catalog and Schema have two different implementations for standalone and distributed mode, because the strategies for generating IDs and the ways of persisting metadata differ between the two modes.
Query Engine is responsible for optimizing and executing the query plan given a basic SQL plan provided by the SQL layer; this work is currently delegated mainly to DataFusion.
In addition to the basic functions of SQL, HoraeDB also defines customized query protocols and optimization rules for specific query plans by utilizing the extensibility provided by DataFusion. For example, PromQL is implemented in this way; read the code if you are interested.
Table Engine is the storage engine for managing tables in HoraeDB, and its pluggability is a core design of HoraeDB that matters in achieving our long-term targets, e.g. handling log or tracing workloads by implementing new storage engines. HoraeDB will have multiple kinds of Table Engine for different workloads, and the most appropriate one should be chosen as the storage engine according to the workload pattern.
Now the requirements for a Table Engine are:
Manage all the shared resources under the engine:
Memory
Storage
CPU
Manage metadata of tables such as table schema and table options;
Provide Table instances which provide read and write methods;
Take responsibility for creating, opening, dropping and closing Table instances;
….
The things a Table Engine needs to handle are a little complicated. Currently only one Table Engine, called Analytic, is provided in HoraeDB; it does a good job of processing analytical workloads, but is not yet ready to handle timeseries workloads (we plan to enhance it by adding indexes that help with timeseries workloads).
The following part describes the details of the Analytic Table Engine.
HoraeDB processes data with a WAL + MemTable model: recently written data is written to the WAL first and then to the MemTable, and after a certain amount of data is accumulated in the MemTable, the data is organized into a query-friendly form and persisted to durable storage.
Now three implementations of WAL are provided for standalone and distributed mode:
For standalone mode, WAL is based on RocksDB and data is persisted on the local disk.
For distributed mode, the WAL is required to be a distributed component responsible for the durability of newly written data, so we provide an implementation based on OceanBase.
For distributed mode, in addition to OceanBase, we also provide a more lightweight implementation based on Apache Kafka.
Since the WAL can't provide efficient data retrieval, newly written data is also stored in the MemTable. After a certain amount of data is reached, HoraeDB organizes the data in the MemTable into a query-friendly storage format (SST) and stores it on the persistent device.
The current implementation of MemTable is based on agatedb’s skiplist. It allows concurrent reads and writes and can control memory usage based on Arena.
Flush works as follows: when the memory usage of the MemTables reaches a threshold, some MemTables are selected and flushed into query-friendly SSTs saved on the persistent device.
During the flushing procedure, the data is divided by time range (configured by the table option Segment Duration), so that the timestamps of the data in any single SST fall into the same segment. This is a common practice in most timeseries databases: organizing data along the time dimension speeds up subsequent time-related operations, such as querying data over a time range and purging data outside the TTL.
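For example, a segment-alignment rule like the following could be used to decide which segment a row belongs to during flush; this is a minimal sketch assuming millisecond timestamps, not the actual HoraeDB implementation:

```rust
// A minimal sketch of segment alignment during flush; illustrative only.
/// Given a timestamp (ms) and the table's segment duration (ms), return the
/// inclusive start and exclusive end of the segment the row belongs to.
fn segment_bounds(timestamp_ms: i64, segment_duration_ms: i64) -> (i64, i64) {
    let start = timestamp_ms - timestamp_ms.rem_euclid(segment_duration_ms);
    (start, start + segment_duration_ms)
}

fn main() {
    // With a 2h segment duration, 2023-01-01T01:30:00Z falls into [00:00, 02:00).
    let two_hours_ms = 2 * 60 * 60 * 1000;
    let ts = 1_672_536_600_000; // 2023-01-01T01:30:00Z
    println!("{:?}", segment_bounds(ts, two_hours_ms));
}
```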
The data of a MemTable is flushed as SSTs, but the file size of a recently flushed SST may be very small, and too small or too many SSTs lead to poor query performance. Therefore, Compaction is introduced to rearrange the SSTs so that multiple smaller SST files can be compacted into a larger one.
Manifest is responsible for managing the table metadata of the Analytic Engine, including:
Table schema and table options;
The sequence number where the newest flush finishes;
The information of all the SSTs belonging to the table.
Currently the Manifest is based on the WAL and Object Storage. Newly written updates on the Manifest are persisted as logs in the WAL, and in order to avoid infinite expansion of the Manifest (every Flush leads to an update), a Snapshot is also introduced to clean up the history of metadata updates; the generated Snapshot is saved to Object Storage.
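A rough sketch of this snapshot idea is shown below; the ManifestState type and the fixed update-count threshold are assumptions for illustration, not the actual Manifest implementation:

```rust
// Illustrative sketch of the snapshot trigger described above; assumptions only.
struct ManifestState {
    updates_since_snapshot: usize,
    snapshot_every: usize, // e.g. take a snapshot after N updates
}

impl ManifestState {
    /// Record one metadata update (e.g. triggered by a Flush) and report
    /// whether the accumulated WAL history should be folded into a snapshot.
    fn record_update(&mut self) -> bool {
        self.updates_since_snapshot += 1;
        if self.updates_since_snapshot >= self.snapshot_every {
            self.updates_since_snapshot = 0;
            true // caller writes the snapshot to Object Storage and prunes the WAL logs
        } else {
            false
        }
    }
}

fn main() {
    let mut m = ManifestState { updates_since_snapshot: 0, snapshot_every: 3 };
    for i in 1..=4 {
        println!("update {}: snapshot = {}", i, m.record_update());
    }
}
```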
The SSTs generated by Flush need to be persisted, and the abstraction of the persistent storage device is ObjectStore, which has multiple implementations:
The distributed architecture of HoraeDB separates storage and computing, which requires the ObjectStore to be a highly available and reliable service independent of HoraeDB. Therefore, storage systems like Amazon S3 and Alibaba Cloud OSS are a good choice, and implementations for the storage systems of other cloud service providers are planned for the future.
SST is actually an abstraction that can have multiple specific implementations. The current implementation is based on Parquet, which is a column-oriented data file format designed for efficient data storage and retrieval.
The format of SST is very critical for retrieving data and is also the most important part to perform well in handling both timeseries and analytic workloads. At present, our Parquet-based implementation is good at processing analytic workload but is poor at processing timeseries workload. In our roadmap, we will explore more storage formats in order to achieve a good performance in both workloads.
In the Analytic Engine there is a concept called space; here is an explanation of it to resolve some ambiguities when reading the source code. The Analytic Engine does not have the concepts of catalog and schema, and only provides two levels of relationship: space and table. In the implementation, the schema ID (which should be unique across all catalogs) on the upper layer is actually mapped to the space ID.
The space in Analytic Engine serves mainly for isolation of resources for different tenants, such as the usage of memory.
Critical Path
After this brief introduction to some important modules of HoraeDB, we will describe some critical paths in the code, hoping to provide interested developers with a guide for reading it.
The received request is forwarded to handle_sql after various protocol conversions, and since the request may not be processable by this node, it may need to be forwarded; maybe_forward_sql_query handles the forwarding logic.
After constructing the ForwardRequest in maybe_forward_sql_query, call forward
After constructing the RouteRequest in forward, call route
Use route to get the destination node endpoint and return to forward.
Call forward_with_endpoint to forward the request
return forward
return maybe_forward_sql_query
return handle_sql
If this is a Local request, call fetch_sql_query_output to process it
Call parse_sql to parse the SQL into a Statement
return fetch_sql_query_output
Call statement_to_plan with the Statement
Construct a Planner with ctx and the Statement, and call the statement_to_plan method of the Planner
The planner will call the corresponding planner method for the requested category, at this point our sql is a query and will call sql_statement_to_plan
Call sql_statement_to_datafusion_plan, which will generate the DataFusion object, and then call SqlToRel::sql_statement_to_plan
The generated logical plan is returned from SqlToRel::sql_statement_to_plan
return
return
return
Call execute_plan_involving_partition_table (in the default configuration) for subsequent optimization and execution of this logical plan
Call build_interpreter to generate Interpreter
return
Call Interpreter's interpreter_execute_plan method for logical plan execution.
The corresponding execute function is called; at this point the SQL is a query, so the execute of the SelectInterpreter will be called
Call execute_logical_plan, which will call build_df_session_ctx to generate the optimizer
build_df_session_ctx will use the config information to generate the corresponding context, first using DataFusion and some custom optimization rules (in logical_optimize_rules()) to generate the logical plan optimizer, then using apply_adapters_for_physical_optimize_rules to generate the physical plan optimizer
return optimizer
Call optimize_plan, using the optimizer just generated to first optimize the logical plan and then the physical plan
Return the optimized physical plan
Execute the physical plan
Return after execution
After collecting the results of all slices, return
return
return
return
Return to the upper layer for network protocol conversion and finally return to the request sender
Take an INSERT SQL statement as an example. The figure above shows the procedure, and the numbers in it indicate the order of calls between the modules.
Here are the details:
Server module chooses a proper RPC module (HTTP, gRPC or MySQL) to process the requests according to the protocol used by the requests;
Parse SQL in the request by the parser;
With the parsed sql and the catalog/schema module, DataFusion can generate the logical plan;
With the logical plan, the corresponding Interpreter is created and logical plan will be executed by it;
For the logical plan of normal INSERT SQL, it will be executed through InsertInterpreter;
In the InsertInterpreter, the write method of Table provided by the Analytic Engine is called:
Write the data into WAL first;
Write the data into MemTable then;
Before writing to MemTable, the memory usage will be checked. If the memory usage is too high, the flush process will be triggered:
Persist some old MemTables as SSTs;
Store updates about the new SSTs and the flushed sequence number of WAL to Manifest;
Delete the corresponding WAL entries;
Server module responds to the client with the execution result.
4.4 - Storage
The storage engine mainly provides the following two functions:
Persistence of data
Under the premise of ensuring the correctness of the data, organize the data in the most reasonable way to meet the query needs of different scenarios.
This document will introduce the internal implementation of the storage engine in HoraeDB. Readers can refer to the content here to explore how to use HoraeDB efficiently.
Overall Structure
HoraeDB is a distributed storage system based on the share-nothing architecture.
Data between different servers is isolated from each other and does not affect each other. The storage engine in each stand-alone machine is a variant of log-structured merge-tree, which is optimized for time-series scenarios. The following figure shows its core components:
Write Ahead Log (WAL)
A write request will be written to
memtable in memory
WAL in durable storage
Since the memtable is not persisted to the underlying storage system in real time, the WAL is required to ensure the reliability of the data in the memtable.
On the other hand, due to the design of the distributed architecture, the WAL itself is required to be highly available; the available implementations in HoraeDB are described in the WAL chapters below.
Memtable is an in-memory data structure used to hold recently written table data. Each table has its own corresponding memtable.
A memtable is read-write by default (aka active), and when the written data reaches some threshold, it becomes read-only and is replaced by a new memtable.
The read-only memtable is flushed to the underlying storage system in SST format by a background thread. After the flush is completed, the read-only memtable can be destroyed, and the corresponding data in the WAL can also be deleted.
Sorted String Table (SST)
SST is a persistent data format which stores data sorted by the table's primary key. Currently, HoraeDB uses the Parquet format for this.
For HoraeDB, SST has an important option: segment_duration. Only SSTs within the same segment can be merged, which is beneficial for time-series data, and it also makes it convenient to eliminate expired data.
In addition to storing the original data, the statistical information of the data will also be stored in the SST to speed up the query, such as the maximum value, the minimum value, etc.
Compactor
Compactor merges multiple small SST files into one, which solves the problem of too many small files. In addition, the Compactor also deletes expired data and duplicate data during compaction. In the future, compaction may take on more tasks, such as downsampling.
The current compaction strategies in HoraeDB reference those of Cassandra (size-tiered and time-window compaction).
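As a rough illustration of the size-tiered idea (the bucketing rule below is an assumption, not HoraeDB's actual picker), SSTs of similar size can be grouped, and a bucket with enough files becomes a compaction candidate:

```rust
// Rough sketch of a size-tiered-like picker; assumptions only, not HoraeDB's picker.
#[derive(Debug)]
struct SstFile {
    id: u64,
    size_bytes: u64,
}

/// Group SSTs whose sizes are within 2x of each other and return the first
/// group that has at least `min_files` members as a compaction candidate.
fn pick_compaction_input(mut ssts: Vec<SstFile>, min_files: usize) -> Option<Vec<SstFile>> {
    ssts.sort_by_key(|s| s.size_bytes);
    let mut bucket: Vec<SstFile> = Vec::new();
    for sst in ssts {
        let fits = bucket
            .first()
            .map_or(true, |first| sst.size_bytes <= first.size_bytes * 2);
        if fits {
            bucket.push(sst);
        } else {
            if bucket.len() >= min_files {
                return Some(bucket);
            }
            bucket = vec![sst];
        }
    }
    (bucket.len() >= min_files).then_some(bucket)
}

fn main() {
    let ssts = vec![
        SstFile { id: 1, size_bytes: 8 << 20 },
        SstFile { id: 2, size_bytes: 10 << 20 },
        SstFile { id: 3, size_bytes: 12 << 20 },
        SstFile { id: 4, size_bytes: 512 << 20 },
    ];
    println!("{:?}", pick_compaction_input(ssts, 3));
}
```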
Manifest records the metadata of tables and SST files, such as the minimum and maximum timestamps of the data in an SST.
Due to the design of the distributed architecture, the manifest itself is required to be highly available. Now in HoraeDB, there are mainly the following implementations:
WAL
ObjectStore
ObjectStore
ObjectStore is the place where data (i.e. SSTs) is persisted.
Generally speaking, major cloud vendors provide corresponding services, such as Alibaba Cloud's OSS and AWS's S3.
4.5 - Table Partitioning
Note: This feature is still in development, and the API may change in the future.
This chapter discusses PartitionTable.
The partition table syntax used by HoraeDB is similar to that of MySQL.
General partition tables include Range Partitioning, List Partitioning, Hash Partitioning, and Key Partitioning.
HoraeDB currently only supports Key Partitioning.
Architecture
Similar to MySQL, different portions of a partition table are stored as separate tables in different locations.
In the current design, a partition table can be opened on multiple HoraeDB nodes, supports writing and querying at the same time, and can be scaled horizontally.
As shown in the figure below, PartitionTable is opened on node0 and node1, and the physical subtables where the actual data is stored are on node2 and node3.
Since the partition table data is actually stored in different physical tables, it is necessary to calculate the actual requested physical table according to the query request when querying.
The query calculates the physical tables to be queried according to the query parameters, and then remotely requests the nodes where those physical tables are located through HoraeDB's internal remote engine service (which supports predicate pushdown) to obtain the data.
The write process is similar to the query process.
First, the write request is split by the partition rules into requests for the different physical subtables, which are then sent to the corresponding physical nodes through the remote engine for the actual data writing.
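A minimal sketch of such key-based routing is shown below; the hash function and the modulo rule are illustrative assumptions, not HoraeDB's actual partitioning algorithm:

```rust
// A minimal sketch of key-partition routing; illustrative assumptions only.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Compute which physical subtable a row belongs to from its partition key
/// column values.
fn partition_index(partition_key_values: &[&str], partition_num: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    for v in partition_key_values {
        v.hash(&mut hasher);
    }
    hasher.finish() % partition_num
}

fn main() {
    // A write whose partition key is (region, host) is routed to one of 4 subtables.
    let idx = partition_index(&["us-east-1", "host-42"], 4);
    println!("route to physical subtable #{idx}");
}
```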
This section introduces the implementation of a standalone Write-Ahead Log (WAL, hereinafter referred to as “the log”) based on a local disk. In this implementation, the log is managed at the region level.
Each region has its own directory to manage all segments for that region. The directory is named after the region’s ID. Each segment is named using the format seg_<id>, with IDs starting from 0 and incrementing.
Segment Format
Logs for all tables within a region are stored in segments, arranged in ascending order of sequence numbers. The structure of the segment files is as follows:
Segment0 Segment1
┌────────────┐ ┌────────────┐
│ Magic Num │ │ Magic Num │
├────────────┤ ├────────────┤
│ Record │ │ Record │
├────────────┤ ├────────────┤
│ Record │ │ Record │
├────────────┤ ├────────────┤ ....
│ Record │ │ Record │
├────────────┤ ├────────────┤
│ ... │ │ ... │
│ │ │ │
└────────────┘ └────────────┘
seg_0 seg_1
In memory, each segment stores additional information used for read, write, and delete operations:
pub struct Segment {
    /// A hashmap storing both min and max sequence numbers of records within
    /// this segment for each `TableId`.
    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
    /// An optional vector of positions within the segment.
    record_position: Vec<Position>,
    ...
}
Log Format
The log format within a segment is as follows:
+---------+--------+------------+--------------+--------------+-------+
| version | crc | table id | sequence num | value length | value |
| (u8) | (u32) | (u64) | (u64) | (u32) |(bytes)|
+---------+--------+------------+--------------+--------------+-------+
Field Descriptions:
version: Log version number.
crc: Used to ensure data consistency. Computes the CRC checksum from the table id to the end of the record.
table id: The unique identifier of the table.
sequence num: The sequence number of the record.
value length: The byte length of the value.
value: The value in the general log format.
The region ID is not stored in the log because it can be obtained from the file path.
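To make the layout concrete, here is an illustrative encoder for the record format above. The little-endian byte order and the bitwise CRC-32 helper are assumptions for the sketch; the actual HoraeDB encoding may differ:

```rust
// Illustrative encoder for the record layout described above; not HoraeDB's code.
// The CRC function below is a simple bitwise CRC-32 (IEEE) used as a placeholder.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &b in data {
        crc ^= b as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Encode: version (u8) | crc (u32) | table id (u64) | sequence num (u64)
///         | value length (u32) | value (bytes)
fn encode_record(version: u8, table_id: u64, seq_num: u64, value: &[u8]) -> Vec<u8> {
    // The CRC covers everything from the table id to the end of the record.
    let mut payload = Vec::with_capacity(8 + 8 + 4 + value.len());
    payload.extend_from_slice(&table_id.to_le_bytes());
    payload.extend_from_slice(&seq_num.to_le_bytes());
    payload.extend_from_slice(&(value.len() as u32).to_le_bytes());
    payload.extend_from_slice(value);

    let mut record = Vec::with_capacity(1 + 4 + payload.len());
    record.push(version);
    record.extend_from_slice(&crc32(&payload).to_le_bytes());
    record.extend_from_slice(&payload);
    record
}

fn main() {
    let record = encode_record(0, 42, 7, b"hello");
    println!("encoded {} bytes", record.len());
}
```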
Main Processes
Opening the WAL
Identify all region directories under the WAL directory.
In each region directory, identify all segment files.
Open each segment file, traverse all logs within it, record the start and end offsets of each log, and record the minimum and maximum sequence numbers of each TableId in the segment, then close the file.
If there is no region directory or there are no segment files under the directory, automatically create the corresponding directory and files.
Reading Logs
Based on the metadata of the segments, determine all segments involved in the current read operation.
Open these segments in order of their IDs from smallest to largest, and decode the raw bytes into logs.
Writing Logs
Serialize the logs to be written into byte data and append them to the segment file with the largest ID.
When a segment is created, it pre-allocates a fixed size of 64MB and will not change dynamically. When the pre-allocated space is used up, a new segment is created, and appending continues in the new segment.
After each append, flush is not called immediately; by default, flush is performed every ten writes or when the segment file is closed.
Update the segment’s metadata table_ranges in memory.
Deleting Logs
Suppose logs in the table with ID table_id and sequence numbers not greater than seq_num need to be marked as deleted:
Update the table_ranges field of the relevant segments in memory, updating the minimum sequence number of the table to seq_num + 1.
If after modification, the minimum sequence number of the table in this segment is greater than the maximum sequence number, remove the table from table_ranges.
If a segment’s table_ranges is empty and it is not the segment with the largest ID, delete the segment file.
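A simplified sketch of this mark-delete logic is shown below; the types are reduced versions of the in-memory segment metadata and are for illustration only:

```rust
// Simplified sketch of the mark-delete logic described above; illustration only.
use std::collections::HashMap;

type TableId = u64;
type SequenceNumber = u64;

struct Segment {
    id: u64,
    /// Min and max sequence numbers of records within this segment per table.
    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
}

/// Mark logs of `table_id` with sequence numbers not greater than `seq_num` as
/// deleted. Returns true if the segment becomes empty and may be removed
/// (provided it is not the segment with the largest ID).
fn mark_deleted(segment: &mut Segment, table_id: TableId, seq_num: SequenceNumber) -> bool {
    let remove_table = match segment.table_ranges.get_mut(&table_id) {
        Some(range) => {
            // Logs with sequence numbers <= seq_num are deleted, so the new
            // minimum becomes seq_num + 1.
            range.0 = seq_num + 1;
            range.0 > range.1
        }
        None => false,
    };
    if remove_table {
        // No live logs of this table remain in the segment.
        segment.table_ranges.remove(&table_id);
    }
    segment.table_ranges.is_empty()
}

fn main() {
    let mut seg = Segment { id: 0, table_ranges: HashMap::from([(1, (10, 20))]) };
    let empty = mark_deleted(&mut seg, 1, 20);
    println!("segment {} empty: {}", seg.id, empty);
}
```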
4.8 - WAL on Kafka
Architecture
In this section we present a distributed WAL implementation (based on Kafka). Write-ahead logs (hereinafter referred to as logs) of tables are managed here by region, which can be simply understood as a shared log file of multiple tables.
As shown in the following figure, regions are mapped to topics (with only one partition) in Kafka. Usually two topics are needed by a region: one is used for storing logs and the other is used for storing metadata.
The common log format described in WAL on RocksDB is used here.
Metadata
Each region maintains its metadata both in memory and in Kafka; we call it RegionMeta here. It can be thought of as a map, taking table id as the key and TableMeta as the value.
We briefly introduce the variables in TableMeta here:
next_seq_num, the sequence number allocated to the next log entry.
latest_marked_deleted, the last flushed sequence number, all logs in the table with a lower sequence number than it can be removed.
current_high_watermark, the high watermark in the Kafka partition after the last writing of this table.
seq_offset_mapping, the mapping from sequence numbers to Kafka offsets; an entry is added on every write, and the entries below the updated latest_marked_deleted are removed after flushing.
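Putting the fields above together, a simplified sketch of TableMeta could look like the following; the field types and the on_write helper are assumptions for illustration, not the actual definitions:

```rust
// A simplified sketch of the TableMeta fields described above; assumptions only.
use std::collections::BTreeMap;

type SequenceNumber = u64;
type KafkaOffset = i64;

struct TableMeta {
    /// Sequence number allocated to the next log entry of this table.
    next_seq_num: SequenceNumber,
    /// Last flushed sequence number; logs below it can be removed.
    latest_marked_deleted: SequenceNumber,
    /// High watermark of the Kafka partition after the last write of this table.
    current_high_watermark: KafkaOffset,
    /// Mapping from sequence numbers to Kafka offsets; entries below
    /// `latest_marked_deleted` are pruned after flushing.
    seq_offset_mapping: BTreeMap<SequenceNumber, KafkaOffset>,
}

impl TableMeta {
    /// Update the metadata after writing one log entry at `offset`.
    fn on_write(&mut self, offset: KafkaOffset) {
        self.seq_offset_mapping.insert(self.next_seq_num, offset);
        self.next_seq_num += 1;
        self.current_high_watermark = offset + 1;
    }
}

fn main() {
    let mut meta = TableMeta {
        next_seq_num: 0,
        latest_marked_deleted: 0,
        current_high_watermark: 0,
        seq_offset_mapping: BTreeMap::new(),
    };
    meta.on_write(64);
    println!("next_seq_num = {}, hw = {}", meta.next_seq_num, meta.current_high_watermark);
}
```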
We focus on the main processes in one region; the following processes will be introduced:
Open or create region.
Write and read logs.
Delete logs.
Open or Create Region
Steps
Search the region in the opened namespace.
If the region is found, the most important thing we need to do is to recover its metadata; we will introduce this later.
If the region is not found and auto-creation is enabled, just create the corresponding topic in Kafka.
Add the found or created region to the cache and return it.
Recovery
As mentioned above, the RegionMeta is actually a map of the TableMeta. So here we will focus on recovering a specific TableMeta, and examples will be given to better illustrate this process.
First, recover the RegionMeta from a snapshot. We take a snapshot of the RegionMeta in some scenarios (e.g. marking logs deleted, cleaning logs) and put it into the meta topic. The snapshot is the RegionMeta at a particular point in time, and when recovering a region we can use it to avoid scanning all the logs in the data topic. In the following example, we recover from the snapshot taken when the Kafka high watermark was 64:
Second, recover from logs. After recovering from the snapshot, we continue by scanning the logs in the data topic starting from the Kafka high watermark at which the snapshot was taken, which obviously avoids scanning the whole data topic. Let's see the example:
The writing and reading process in a region is simple.
For writing:
Open the specified region (auto create it if necessary).
Put the logs into the specified Kafka partition through the client.
Update next_seq_num, current_high_watermark and seq_offset_mapping in corresponding TableMeta.
For reading:
Open the specified region.
Just read all the logs of the region, and the split and replay work will be done by the caller.
Delete Logs
Log deletion can be divided into two steps:
Mark the logs deleted.
Do delayed cleaning work periodically in a background thread.
Mark
Update latest_marked_deleted and seq_offset_mapping (retaining only the entries whose sequence >= the updated latest_marked_deleted) in TableMeta.
We may also need to make and sync the RegionMeta snapshot to Kafka when dropping a table.
Clean
The cleaning logic is done in a background thread called the cleaner:
Make RegionMeta snapshot.
Decide whether to clean the logs based on the snapshot.
If so, sync the snapshot to Kafka first, then clean the logs.
4.9 - WAL on RocksDB
Architecture
In this section we present a standalone WAL implementation (based on RocksDB). Write-ahead logs (hereinafter referred to as logs) of tables are managed here by table, and we call the corresponding storage data structure TableUnit. All related data (logs or some metadata) is stored in a single column family for simplicity.
We use the common key and value format here.
Here is the defined key format, and the following is an introduction to its fields:
namespace: multiple instances of WAL can exist for different purposes (e.g. manifest also needs wal). The namespace is used to distinguish them.
region_id: in some WAL implementations we may need to manage logs from multiple tables, region is the concept to describe such a set of table logs. Obviously the region id is the identification of the region.
table_id: identification of the table logs to which they belong.
sequence_num: each log in a table can be assigned an identifier, called a sequence number here.
version: for compatibility with old and new formats.
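As an illustration of how such a key could be laid out (the field widths, ordering and big-endian byte order below are assumptions, not HoraeDB's actual layout):

```rust
// Illustrative encoding of a log key from the fields listed above; assumptions only.
fn encode_log_key(namespace: u8, region_id: u64, table_id: u64,
                  sequence_num: u64, version: u8) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + 8 + 8 + 8 + 1);
    key.push(namespace);
    // Big-endian keeps the lexicographic order of RocksDB keys consistent with
    // the numeric order of (region, table, sequence), which is convenient for scans.
    key.extend_from_slice(&region_id.to_be_bytes());
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(&sequence_num.to_be_bytes());
    key.push(version);
    key
}

fn main() {
    let key = encode_log_key(0, 1, 42, 7, 0);
    println!("key is {} bytes", key.len());
}
```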
The metadata here is stored in the same key-value format as the logs. Actually, only the last flushed sequence number is stored in this implementation. Here is the defined metadata key format and a description of its fields:
namespace, table_id, version are the same as the log format.
key_type, used to define the type of metadata. Currently only MaxSeq is defined, and metadata of this type records only the most recently flushed sequence number of the table.
Because this key is only used in the WAL on RocksDB, which manages logs at the table level, there is no region id in it.
Read the latest log entry of each table, mainly to recover the next sequence numbers of the tables.
Scan the metadata to recover the next sequence numbers as a supplement (because a table may have just triggered a flush with no new logs written afterwards, so no logs exist for it now).
Write and read logs: just write and read key-values from RocksDB.
Delete logs: for simplicity, the corresponding logs are removed synchronously.