1 - SQL Syntax

1.1 - Data Model

This chapter introduces the data model of HoraeDB.

1.1.1 - Data Types

HoraeDB implements a table model, and the supported data types are similar to MySQL's. The following table lists the mapping between MySQL and HoraeDB types.

Supported data types (case-insensitive):

| SQL            | HoraeDB   |
| -------------- | --------- |
| null           | Null      |
| timestamp      | Timestamp |
| double         | Double    |
| float          | Float     |
| string         | String    |
| varbinary      | Varbinary |
| uint64         | UInt64    |
| uint32         | UInt32    |
| uint16         | UInt16    |
| uint8          | UInt8     |
| int64/bigint   | Int64     |
| int32/int      | Int32     |
| int16/smallint | Int16     |
| int8/tinyint   | Int8      |
| boolean        | Boolean   |
| date           | Date      |
| time           | Time      |

1.1.2 - Special Columns

Tables in HoraeDB have the following constraints:

  • Primary key is required
  • The primary key must contain a time column, and can contain only one time column
  • The primary key must be non-null, so every column in the primary key must be non-null

Timestamp Column

Tables in HoraeDB must have one timestamp column that maps to the timestamp in time-series data, such as the timestamp in OpenTSDB/Prometheus. The timestamp column is declared with the TIMESTAMP KEY keyword, like TIMESTAMP KEY(ts).

Tag Column

The TAG keyword defines a column as a tag column, similar to a tag in time-series data, such as a tag in OpenTSDB or a label in Prometheus.

Primary Key

The primary key is used for data deduplication and sorting. It is composed of one or more columns, one of which must be the time column. The primary key can be set in one of the following ways:

  • use the PRIMARY KEY keyword
  • use TAG columns to auto-generate a TSID; HoraeDB will use (TSID, timestamp) as the primary key
  • only set the timestamp column; HoraeDB will use (timestamp) as the primary key

Notice: if both the primary key and tags are specified, the tag columns are just additional metadata and do not affect the primary-key logic. The three ways above are shown in the following examples:

CREATE TABLE with_primary_key(
  ts TIMESTAMP NOT NULL,
  c1 STRING NOT NULL,
  c2 STRING NULL,
  c4 STRING NULL,
  c5 STRING NULL,
  TIMESTAMP KEY(ts),
  PRIMARY KEY(c1, ts)
) ENGINE=Analytic WITH (ttl='7d');

CREATE TABLE with_tag(
    ts TIMESTAMP NOT NULL,
    c1 STRING TAG NOT NULL,
    c2 STRING TAG NULL,
    c3 STRING TAG NULL,
    c4 DOUBLE NULL,
    c5 STRING NULL,
    c6 STRING NULL,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic WITH (ttl='7d');

CREATE TABLE with_timestamp(
    ts TIMESTAMP NOT NULL,
    c1 STRING NOT NULL,
    c2 STRING NULL,
    c3 STRING NULL,
    c4 DOUBLE NULL,
    c5 STRING NULL,
    c6 STRING NULL,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic WITH (ttl='7d');

TSID

If the primary key is not set and tag columns are provided, a TSID is auto-generated from the hash of the tag columns. In essence, this is a mechanism for automatically generating an ID.
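The auto-generated tsid column can be selected like any other column; for instance, a minimal sketch against the with_tag table above:

SELECT tsid, ts, c1 FROM with_tag LIMIT 10;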

1.2 - Identifier

Identifiers in HoraeDB can be used as table names, column names, etc. An identifier cannot be a reserved keyword or start with a number or punctuation symbol. HoraeDB allows quoting identifiers with backquotes (`). In that case the identifier can be any string, such as 00_table or select.
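For instance, a small sketch of backquote-quoted identifiers that would otherwise be invalid:

-- `00_table` starts with digits and `select` is a keyword; both are valid only when quoted
CREATE TABLE `00_table` (`select` string, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;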

1.3 - Data Definition Statements

This chapter introduces the data definition statements.

1.3.1 - ALTER TABLE

ALTER TABLE can change the schema or options of a table.

ALTER TABLE SCHEMA

HoraeDB currently supports ADD COLUMN to alter a table's schema.

-- create a table and add a column to it
CREATE TABLE `t`(a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
ALTER TABLE `t` ADD COLUMN (b string);

It now becomes:

-- DESCRIBE TABLE `t`;

name    type        is_primary  is_nullable is_tag

t       timestamp   true        false       false
tsid    uint64      true        false       false
a       int         false       true        false
b       string      false       true        false

ALTER TABLE OPTIONS

HoraeDB currently supports MODIFY SETTING to alter table options.

-- create a table and modify its write_buffer_size
CREATE TABLE `t`(a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
ALTER TABLE `t` MODIFY SETTING write_buffer_size='300M';

The SQL above modifies the write_buffer_size of the table, and the table's options become:

CREATE TABLE `t` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `a` int, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='314572800')

Besides, the ttl can be altered from 7 days to 10 days with the following SQL:

ALTER TABLE `t` MODIFY SETTING ttl='10d';

1.3.2 - CREATE TABLE

Basic syntax

Basic syntax:

CREATE TABLE [IF NOT EXISTS]
    table_name ( column_definitions )
    [partition_options]
    ENGINE = engine_type
    [WITH ( table_options )];

Column definition syntax:

column_name column_type [[NOT] NULL] [TAG | TIMESTAMP KEY | PRIMARY KEY] [DICTIONARY] [COMMENT '']

Partition options syntax:

PARTITION BY KEY (column_list) [PARTITIONS num]

Table options syntax are key-value pairs. Value should be quoted with quotation marks ('). E.g.:

... WITH ( enable_ttl='false' )

IF NOT EXISTS

Add IF NOT EXISTS to tell HoraeDB to ignore errors if the table name already exists.
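For example, in the following sketch the second statement is a no-op instead of an error:

CREATE TABLE IF NOT EXISTS t1(ts timestamp NOT NULL, TIMESTAMP KEY(ts)) ENGINE = Analytic;
-- the table already exists, but no error is raised
CREATE TABLE IF NOT EXISTS t1(ts timestamp NOT NULL, TIMESTAMP KEY(ts)) ENGINE = Analytic;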

Define Column

A column's definition should contain at least the name and type parts. All supported types are listed here.

Columns are nullable by default, i.e. the NULL keyword is implied. Add the NOT NULL constraint to make a column required.

-- this definition
a_nullable int
-- equals to
a_nullable int NULL

-- add NOT NULL to make it required
b_not_null int NOT NULL

A column can be marked as a special column with the related keyword (TAG, TIMESTAMP KEY, or PRIMARY KEY).

For a string tag column, we recommend defining it as DICTIONARY to reduce memory consumption:

`tag1` string TAG DICTIONARY
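For example, a minimal sketch of a complete table definition using a dictionary-encoded tag:

CREATE TABLE with_dictionary(
    ts timestamp NOT NULL,
    `tag1` string TAG DICTIONARY,
    `value` double,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic;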

Engine

Specifies which engine this table belongs to. HoraeDB currently supports only the Analytic engine type. This attribute is immutable.

Partition Options

Note: This feature is only supported in distributed version.

CREATE TABLE ... PARTITION BY KEY

The example below creates a table with 8 partitions, partitioned by name:

CREATE TABLE `demo` (
    `name` string TAG COMMENT 'client username',
    `value` double NOT NULL,
    `t` timestamp NOT NULL,
    timestamp KEY (t)
)
    PARTITION BY KEY(name) PARTITIONS 8
    ENGINE=Analytic
    with (
    enable_ttl='false'
)

1.3.3 - DROP TABLE

Basic syntax

Basic syntax:

DROP TABLE [IF EXISTS] table_name

DROP TABLE removes a specific table. This statement should be used with caution, because it removes both the table definition and table data, and this removal is not recoverable.
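For example, a sketch that uses IF EXISTS to avoid an error when the table may already be gone:

DROP TABLE IF EXISTS demo;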

1.4 - Data Manipulation Statements

This chapter introduces the data manipulation statements.

1.4.1 - INSERT

Basic syntax

Basic syntax:

INSERT [INTO] tbl_name
    [(col_name [, col_name] ...)]
    { {VALUES | VALUE} (value_list) [, (value_list)] ... }

INSERT inserts new rows into a HoraeDB table. Here is an example:

INSERT INTO demo(`time_stamp`, tag1) VALUES(1667374200022, 'horaedb')
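Multiple rows can be inserted in one statement by repeating the value list, e.g.:

INSERT INTO demo(`time_stamp`, tag1) VALUES
    (1667374200022, 'horaedb'),
    (1667374200032, 'horaedb2')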

1.4.2 - SELECT

Basic syntax

Basic syntax (parts between [] are optional):

SELECT select_expr [, select_expr] ...
    FROM table_name
    [WHERE where_condition]
    [GROUP BY {col_name | expr} ... ]
    [ORDER BY {col_name | expr} [ASC | DESC]]
    [LIMIT [offset,] row_count]

SELECT syntax in HoraeDB is similar to MySQL's; here is an example:

SELECT * FROM `demo` WHERE time_stamp > '2022-10-11 00:00:00' AND time_stamp < '2022-10-12 00:00:00' LIMIT 10
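A sketch combining the optional clauses above, assuming the demo table also has name and value columns:

SELECT name, max(value) AS max_value
    FROM `demo`
    WHERE time_stamp > '2022-10-11 00:00:00'
    GROUP BY name
    ORDER BY max_value DESC
    LIMIT 10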

1.5 - Utility Statements

There are several utility SQL statements in HoraeDB that can help with table manipulation or query inspection.

SHOW CREATE TABLE

SHOW CREATE TABLE table_name;

SHOW CREATE TABLE returns a CREATE TABLE DDL that would create an identical table, including columns, table engine, and options. The schema and options shown in the CREATE TABLE are based on the current version of the table. An example:

-- create one table
CREATE TABLE `t` (a bigint, b int default 3, c string default 'x', d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;
-- Result: affected_rows: 0

-- show how one table should be created.
SHOW CREATE TABLE `t`;

-- Result DDL:
CREATE TABLE `t` (
    `t` timestamp NOT NULL,
    `tsid` uint64 NOT NULL,
    `a` bigint,
    `b` int,
    `c` string,
    `d` smallint,
    PRIMARY KEY(t,tsid),
    TIMESTAMP KEY(t)
) ENGINE=Analytic WITH (
    arena_block_size='2097152',
    compaction_strategy='default',
    compression='ZSTD',
    enable_ttl='true',
    num_rows_per_row_group='8192',
    segment_duration='',
    ttl='7d',
    update_mode='OVERWRITE',
    write_buffer_size='33554432'
)

DESCRIBE

DESCRIBE table_name;

DESCRIBE shows the detailed schema of a table. The attributes include column name and type, whether it is a tag or part of the primary key (todo: ref), and whether it is nullable. The auto-created column tsid is also included (todo: ref).

Example:

CREATE TABLE `t`(a int, b string, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

DESCRIBE TABLE `t`;

The result is:

name    type        is_primary  is_nullable is_tag

t       timestamp   true        false       false
tsid    uint64      true        false       false
a       int         false       true        false
b       string      false       true        false

EXPLAIN

EXPLAIN query;

EXPLAIN shows how a query will be executed. Add it to the beginning of a query, like:

EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `t` GROUP BY name;

will give

logical_plan
Projection: #MAX(07_optimizer_t.value) AS c1, #AVG(07_optimizer_t.value) AS c2
  Aggregate: groupBy=[[#07_optimizer_t.name]], aggr=[[MAX(#07_optimizer_t.value), AVG(#07_optimizer_t.value)]]
    TableScan: 07_optimizer_t projection=Some([name, value])

physical_plan
ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]
  AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 6)
        AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]
          ScanTable: table=07_optimizer_t, parallelism=8, order=None

1.6 - Options

The options below can be used when creating a table for the analytic engine:

  • enable_ttl, bool. When TTL is enabled on a table, rows older than ttl will be deleted and can't be queried. Default true.

  • ttl, duration. Lifetime of a row, only used when enable_ttl is true. Default 7d.

  • storage_format, string. The underlying storage format of columns. Available values:

    • columnar, default
    • hybrid, Note: This feature is still in development, and it may change in the future.

    The meaning of these two values is explained in the Storage Format section.
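For example, a minimal sketch of setting these options explicitly at table creation:

CREATE TABLE `opts_demo` (
    ts timestamp NOT NULL,
    v double,
    TIMESTAMP KEY(ts)
) ENGINE=Analytic WITH (
    enable_ttl='true',
    ttl='30d',
    storage_format='columnar'
);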

Storage Format

Two formats are mainly supported in the analytic engine. One is columnar, the traditional columnar format, which stores one table column in one physical column:

| Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
| --------- |---------- | ----------- | ----- | ----- |
| 12:01     | A         | 0           | v1    | v1    |
| 12:01     | B         | 0           | v2    | v2    |
| 12:02     | A         | 0           | v1    | v1    |
| 12:02     | B         | 1           | v2    | v2    |
| 12:03     | A         | 0           | v1    | v1    |
| 12:03     | B         | 0           | v2    | v2    |
| .....     |           |             |       |       |

The other is hybrid, an experimental format that simulates row-oriented storage on top of columnar storage to accelerate classic time-series queries.

In classic time-series use cases like IoT or DevOps, queries typically first group their results by series id (or device id), then by timestamp. To achieve good performance in those scenarios, the physical data layout should match this access pattern, so the hybrid format is laid out like this:

 | Device ID | Timestamp           | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
 |-----------|---------------------|-------------|-------|-------|---------|---------|
 | A         | [12:01,12:02,12:03] | [0,0,0]     | v1    | v1    | 12:01   | 12:03   |
 | B         | [12:01,12:02,12:03] | [0,1,0]     | v2    | v2    | 12:01   | 12:03   |
 | ...       |                     |             |       |       |         |         |

  • Within one file, rows belonging to the same primary key (e.g. series/device id) are collapsed into one row
  • The columns besides the primary key are divided into two categories:
    • collapsible: these columns are collapsed into a list, used to encode fields in a time-series table
      • Note: only fixed-length types are supported now
    • non-collapsible: these columns may only contain one distinct value, used to encode tags in a time-series table
      • Note: only the string type is supported now
  • Two more columns, minTime and maxTime, are added. They are used to prune unnecessary rows during queries.
    • Note: not implemented yet.

Example

CREATE TABLE `device` (
    `ts` timestamp NOT NULL,
    `tag1` string tag,
    `tag2` string tag,
    `value1` double,
    `value2` int,
    timestamp KEY (ts)) ENGINE=Analytic
  with (
    enable_ttl = 'false',
    storage_format = 'hybrid'
);

This creates a table with the hybrid format; users can inspect the data format with parquet-tools. The table above should have the following parquet schema:

message arrow_schema {
  optional group ts (LIST) {
    repeated group list {
      optional int64 item (TIMESTAMP(MILLIS,false));
    }
  }
  required int64 tsid (INTEGER(64,false));
  optional binary tag1 (STRING);
  optional binary tag2 (STRING);
  optional group value1 (LIST) {
    repeated group list {
      optional double item;
    }
  }
  optional group value2 (LIST) {
    repeated group list {
      optional int32 item;
    }
  }
}

1.7 - Scalar Functions

HoraeDB SQL is implemented with DataFusion. Here is the list of scalar functions. For more detail, refer to the DataFusion documentation.

Math Functions

| Function | Description |
| --------- | --------- |
| abs(x) | absolute value |
| acos(x) | inverse cosine |
| asin(x) | inverse sine |
| atan(x) | inverse tangent |
| atan2(y, x) | inverse tangent of y / x |
| ceil(x) | nearest integer greater than or equal to argument |
| cos(x) | cosine |
| exp(x) | exponential |
| floor(x) | nearest integer less than or equal to argument |
| ln(x) | natural logarithm |
| log10(x) | base 10 logarithm |
| log2(x) | base 2 logarithm |
| power(base, exponent) | base raised to the power of exponent |
| round(x) | round to nearest integer |
| signum(x) | sign of the argument (-1, 0, +1) |
| sin(x) | sine |
| sqrt(x) | square root |
| tan(x) | tangent |
| trunc(x) | truncate toward zero |

Conditional Functions

| Function | Description |
| --------- | --------- |
| coalesce | Returns the first of its arguments that is not null. Null is returned only if all arguments are null. It is often used to substitute a default value for null values when data is retrieved for display. |
| nullif | Returns a null value if value1 equals value2; otherwise it returns value1. This can be used to perform the inverse operation of the coalesce expression. |

String Functions

| Function | Description |
| --------- | --------- |
| ascii | Returns the numeric code of the first character of the argument. In UTF8 encoding, returns the Unicode code point of the character. In other multibyte encodings, the argument must be an ASCII character. |
| bit_length | Returns the number of bits in a character string expression. |
| btrim | Removes the longest string containing any of the specified characters from the start and end of string. |
| char_length | Equivalent to length. |
| character_length | Equivalent to length. |
| concat | Concatenates two or more strings into one string. |
| concat_ws | Combines two values with a given separator. |
| chr | Returns the character based on the number code. |
| initcap | Capitalizes the first letter of each word in a string. |
| left | Returns the specified leftmost characters of a string. |
| length | Returns the number of characters in a string. |
| lower | Converts all characters in a string to their lower case equivalent. |
| lpad | Left-pads a string to a given length with a specific set of characters. |
| ltrim | Removes the longest string containing any of the characters in characters from the start of string. |
| md5 | Calculates the MD5 hash of a given string. |
| octet_length | Equivalent to length. |
| repeat | Returns a string consisting of the input string repeated a specified number of times. |
| replace | Replaces all occurrences in a string of a substring with a new substring. |
| reverse | Reverses a string. |
| right | Returns the specified rightmost characters of a string. |
| rpad | Right-pads a string to a given length with a specific set of characters. |
| rtrim | Removes the longest string containing any of the characters in characters from the end of string. |
| digest | Calculates the hash of a given string. |
| split_part | Splits a string on a specified delimiter and returns the specified field from the resulting array. |
| starts_with | Checks whether a string starts with a particular substring. |
| strpos | Searches a string for a specific substring and returns its position. |
| substr | Extracts a substring of a string. |
| translate | Translates one set of characters into another. |
| trim | Removes the longest string containing any of the characters in characters from either the start or end of string. |
| upper | Converts all characters in a string to their upper case equivalent. |

Regular Expression Functions

| Function | Description |
| --------- | --------- |
| regexp_match | Determines whether a string matches a regular expression pattern. |
| regexp_replace | Replaces all occurrences in a string of a substring that matches a regular expression pattern with a new substring. |

Temporal Functions

| Function | Description |
| --------- | --------- |
| to_timestamp | Converts a string to type Timestamp(Nanoseconds, None). |
| to_timestamp_millis | Converts a string to type Timestamp(Milliseconds, None). |
| to_timestamp_micros | Converts a string to type Timestamp(Microseconds, None). |
| to_timestamp_seconds | Converts a string to type Timestamp(Seconds, None). |
| extract | Retrieves subfields such as year or hour from date/time values. |
| date_part | Retrieves subfield from date/time values. |
| date_trunc | Truncates date/time values to specified precision. |
| date_bin | Bins date/time values to specified precision. |
| from_unixtime | Converts Unix epoch to type Timestamp(Nanoseconds, None). |
| now | Returns current time as Timestamp(Nanoseconds, UTC). |
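For instance, a sketch that parses a string timestamp and truncates the current time to the hour:

SELECT to_timestamp_millis('2022-10-11 00:00:00'), date_trunc('hour', now());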

Other Functions

| Function | Description |
| --------- | --------- |
| array | Create an array. |
| arrow_typeof | Returns underlying type. |
| in_list | Check if value in list. |
| random | Generate random value. |
| sha224 | sha224 |
| sha256 | sha256 |
| sha384 | sha384 |
| sha512 | sha512 |
| to_hex | Convert to hex. |
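A sketch exercising a few of these functions together (behavior per the DataFusion docs referenced above):

SELECT abs(-1.5), round(3.456), upper('horaedb'), coalesce(null, 'fallback'), to_hex(255);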

1.8 - Aggregate Functions

HoraeDB SQL is implemented with DataFusion. Here is the list of aggregate functions. For more detail, refer to the DataFusion documentation.

General

| Function | Description |
| --------- | --------- |
| min | Returns the minimum value in a numerical column |
| max | Returns the maximum value in a numerical column |
| count | Returns the number of rows |
| avg | Returns the average of a numerical column |
| sum | Sums a numerical column |
| array_agg | Puts values into an array |

Statistical

| Function | Description |
| --------- | --------- |
| var / var_samp | Returns the variance of a given column |
| var_pop | Returns the population variance of a given column |
| stddev / stddev_samp | Returns the standard deviation of a given column |
| stddev_pop | Returns the population standard deviation of a given column |
| covar / covar_samp | Returns the covariance of a given column |
| covar_pop | Returns the population covariance of a given column |
| corr | Returns the correlation coefficient of a given column |

Approximate

| Function | Description |
| --------- | --------- |
| approx_distinct | Returns the approximate number (HyperLogLog) of distinct input values |
| approx_median | Returns the approximate median of input values. It is an alias of approx_percentile_cont(x, 0.5). |
| approx_percentile_cont | Returns the approximate percentile (TDigest) of input values, where p is a float64 between 0 and 1 (inclusive). It supports raw data as input, builds TDigest sketches during query time, and is approximately equal to approx_percentile_cont_with_weight(x, 1, p). |
| approx_percentile_cont_with_weight | Returns the approximate percentile (TDigest) of input values with weight, where w is the weight column expression and p is a float64 between 0 and 1 (inclusive). It supports raw data or pre-aggregated TDigest sketches as input, then builds or merges TDigest sketches during query time. TDigest sketches are a list of centroids (x, w), where x stands for mean and w stands for weight. |
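For example, a sketch combining general and approximate aggregates, assuming a demo table with a name tag and a value field:

SELECT name,
       max(value) AS max_value,
       avg(value) AS avg_value,
       approx_percentile_cont(value, 0.95) AS p95
FROM `demo`
GROUP BY name;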

2 - Cluster Deployment

In the Getting Started section, we introduced the deployment of a single HoraeDB instance.

Besides, as a distributed time-series database, multiple HoraeDB instances can be deployed as a cluster to provide high availability and scalability.

Currently, the integration with Kubernetes is still in progress, so a HoraeDB cluster can only be deployed manually. There are two modes of cluster deployment:

2.1 - NoMeta

Note: This feature is for testing use only, not recommended for production use, related features may change in the future.

This guide shows how to deploy a HoraeDB cluster without HoraeMeta, but with static, rule-based routing.

The crucial point here is that the HoraeDB server provides a configurable routing function based on table names, so all we need is a valid config containing routing rules, which will be shipped to every HoraeDB instance in the cluster.

Target

First, let's assume that our target is to deploy a cluster consisting of two HoraeDB instances on the same machine. A larger cluster with more HoraeDB instances can be deployed by extending this two-instance example.

Prepare Config

Basic

Suppose the basic config of HoraeDB is:

[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb"

To deploy two HoraeDB instances on the same machine, each instance's config must use different ports and data directories.

Say HoraeDB_0's config is:

[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_0"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_0"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_0"

Then HoraeDB_1's config is:

[server]
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_1"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_1"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_1"

Schema&Shard Declaration

Then we should define the common part – schema&shard declaration and routing rules.

Here is the config for schema&shard declaration:

[cluster_deployment]
mode = "NoMeta"

[[cluster_deployment.topology.schema_shards]]
schema = 'public_0'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[cluster_deployment.topology.schema_shards]]
schema = 'public_1'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831

In the config above, two schemas are declared:

  • public_0 has two shards served by HoraeDB_0.
  • public_1 has two shards served by both HoraeDB_0 and HoraeDB_1.

Routing Rules

With the schema&shard declaration in place, routing rules can be defined. Here is an example of a prefix rule:

[[cluster_deployment.route_rules.prefix_rules]]
schema = 'public_0'
prefix = 'prod_'
shard = 0

This rule means that every table with the prod_ prefix belonging to public_0 is routed to shard_0 of public_0, that is to say, HoraeDB_0. The other tables, whose names are not prefixed by prod_, are routed by hash to both shard_0 and shard_1 of public_0.

Besides prefix rules, we can also define a hash rule:

[[cluster_deployment.route_rules.hash_rules]]
schema = 'public_1'
shards = [0, 1]

This rule tells HoraeDB to route public_1's tables to both shard_0 and shard_1 of public_1, that is to say, HoraeDB_0 and HoraeDB_1. This is actually the default routing behavior if no such rule is provided for schema public_1.

For now, we can provide the full example config for HoraeDB_0 and HoraeDB_1:

[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831
mysql_port = 3307

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_0"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_0"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_0"

[cluster_deployment]
mode = "NoMeta"

[[cluster_deployment.topology.schema_shards]]
schema = 'public_0'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[cluster_deployment.topology.schema_shards]]
schema = 'public_1'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831
[server]
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831
mysql_port = 13307

[logger]
level = "info"

[tracing]
dir = "/tmp/horaedb_1"

[analytic.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb_1"

[analytic.wal]
type = "RocksDB"
data_dir = "/tmp/horaedb_1"

[cluster_deployment]
mode = "NoMeta"

[[cluster_deployment.topology.schema_shards]]
schema = 'public_0'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831

[[cluster_deployment.topology.schema_shards]]
schema = 'public_1'
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 0
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 8831
[[cluster_deployment.topology.schema_shards.shard_views]]
shard_id = 1
[cluster_deployment.topology.schema_shards.shard_views.endpoint]
addr = '127.0.0.1'
port = 18831

Let's name the two config files config_0.toml and config_1.toml. Note that in a real environment the HoraeDB instances would be deployed across different machines, so there would be no need to choose different ports and data directories per instance, and all instances could share exactly the same config file.

Start HoraeDBs

After the configs are prepared, what we should do is start the HoraeDB containers with the specific configs.

Just run the commands below:

sudo docker run -d -t --name horaedb_0 -p 5440:5440 -p 8831:8831 -v $(pwd)/config_0.toml:/etc/horaedb/horaedb.toml horaedb/horaedb-server
sudo docker run -d -t --name horaedb_1 -p 15440:15440 -p 18831:18831 -v $(pwd)/config_1.toml:/etc/horaedb/horaedb.toml horaedb/horaedb-server

After the two containers are created and running, read and write requests can be served by the two-instance HoraeDB cluster.

2.2 - WithMeta

This guide shows how to deploy a HoraeDB cluster with HoraeMeta. With HoraeMeta, the HoraeDB cluster features high availability, load balancing, and horizontal scalability, provided the underlying storage used by HoraeDB is a separate service.

Deploy HoraeMeta

Introduction

HoraeMeta is one of the core services of HoraeDB's distributed mode; it is used to manage and schedule the HoraeDB cluster. The high availability of HoraeMeta itself is ensured by embedded etcd. The etcd service is also provided to HoraeDB servers to manage distributed shard locks.

Build

  • Golang version >= 1.19 is required.
  • Run make build in the root path of HoraeMeta.

Deploy

Config

At present, HoraeMeta supports specifying the service startup configuration in two ways: a configuration file and environment variables. We provide an example of startup with a configuration file; for details, please refer to config. Environment variables have a higher priority than configuration files; when both are present, the environment variables prevail.

Dynamic or Static

Even with HoraeMeta, the HoraeDB cluster can be deployed with a static or a dynamic topology. With a static topology, the table distribution is fixed after the cluster is initialized, while with a dynamic topology, tables can migrate between different HoraeDB nodes to achieve load balancing or failover. However, the dynamic topology can be enabled only if the storage used by the HoraeDB nodes is remote; otherwise data may be corrupted when tables are transferred to a different HoraeDB node while HoraeDB persists data locally.

Currently, dynamic scheduling of the cluster topology is disabled by default in HoraeMeta, and in this guide we won't enable it because local storage is adopted here. If you want to enable dynamic scheduling, set TOPOLOGY_TYPE to dynamic (static by default); after that, load balancing and failover will work. However, don't enable it if the underlying storage is a local disk.

With the static topology, the parameter DEFAULT_CLUSTER_NODE_COUNT, which denotes the number of HoraeDB nodes in the deployed cluster, matters a lot: it should be set to the real number of machines running HoraeDB server, because after cluster initialization the set of HoraeDB nodes can't be changed any more.
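For instance, a hypothetical sketch of passing this parameter as an environment variable when starting HoraeMeta, following the -e pattern shown below for TOPOLOGY_TYPE:

docker run -d --name horaemeta-server \
  -p 2379:2379 \
  -e DEFAULT_CLUSTER_NODE_COUNT=2 \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0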

Start HoraeMeta Instances

HoraeMeta is based on etcd to achieve high availability. In a production environment, we usually deploy multiple nodes, but in a local or testing environment, we can deploy a single node to simplify the entire deployment process.

  • Standalone
docker run -d --name horaemeta-server \
  -p 2379:2379 \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0
  • Cluster
wget https://horaedb.apache.org/config-horaemeta-cluster0.toml

docker run -d --network=host --name horaemeta-server0 \
  -v $(pwd)/config-horaemeta-cluster0.toml:/etc/horaemeta/horaemeta.toml \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0

wget https://horaedb.apache.org/config-horaemeta-cluster1.toml

docker run -d --network=host --name horaemeta-server1 \
  -v $(pwd)/config-horaemeta-cluster1.toml:/etc/horaemeta/horaemeta.toml \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0

wget https://horaedb.apache.org/config-horaemeta-cluster2.toml

docker run -d --network=host --name horaemeta-server2 \
  -v $(pwd)/config-horaemeta-cluster2.toml:/etc/horaemeta/horaemeta.toml \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0

If the storage used by HoraeDB is remote and you want to enable the dynamic scheduling features of the HoraeDB cluster, add -e TOPOLOGY_TYPE=dynamic to the docker run command.

Deploy HoraeDB

In NoMeta mode, HoraeDB requires only the local disk as the underlying storage because the topology of the HoraeDB cluster is static. With HoraeMeta, however, the cluster topology can be dynamic; that is, HoraeDB can be configured to use a non-local storage service to gain the features of a distributed system: HA, load balancing, scalability, and so on. HoraeDB can still be configured to use local storage with HoraeMeta, which of course results in a static cluster topology.

The relevant storage configurations include two parts:

  • Object Storage
  • WAL Storage

Note: If you are deploying HoraeDB over multiple nodes in a production environment, please set the environment variable for the server address as follows:

export HORAEDB_SERVER_ADDR="{server_address}:8831"

This address is used for communication between HoraeMeta and HoraeDB, please ensure it is valid.

Object Storage

Local Storage

Similarly, we can configure HoraeDB to use a local disk as the underlying storage:

[analytic.storage.object_store]
type = "Local"
data_dir = "/home/admin/data/horaedb"

OSS

Aliyun OSS can also be used as the underlying storage for HoraeDB, with which the data is replicated for disaster recovery. Here is an example config, and you have to replace the templates with the real OSS parameters:

[analytic.storage.object_store]
type = "Aliyun"
key_id = "{key_id}"
key_secret = "{key_secret}"
endpoint = "{endpoint}"
bucket = "{bucket}"
prefix = "{data_dir}"

S3

Amazon S3 can also be used as the underlying storage for HoraeDB. Here is an example config, and you have to replace the templates with the real S3 parameters:

[analytic.storage.object_store]
type = "S3"
region = "{region}"
key_id = "{key_id}"
key_secret = "{key_secret}"
endpoint = "{endpoint}"
bucket = "{bucket}"
prefix = "{prefix}"

WAL Storage

RocksDB

The WAL based on RocksDB is also a kind of local storage for HoraeDB, which is easy for a quick start:

[analytic.wal]
type = "RocksDB"
data_dir = "/home/admin/data/horaedb"

OceanBase

If you have deployed an OceanBase cluster, HoraeDB can use it as the WAL storage for data disaster recovery. Here is an example config for such a WAL, and you have to replace the templates with the real OceanBase parameters:

[analytic.wal]
type = "Obkv"

[analytic.wal.data_namespace]
ttl = "365d"

[analytic.wal.obkv]
full_user_name = "{full_user_name}"
param_url = "{param_url}"
password = "{password}"

[analytic.wal.obkv.client]
sys_user_name = "{sys_user_name}"
sys_password = "{sys_password}"

Kafka

If you have deployed a Kafka cluster, HoraeDB can also use it as the WAL storage. Here is an example config for it, and you have to replace the templates with the real parameters of the Kafka cluster:

[analytic.wal]
type = "Kafka"

[analytic.wal.kafka.client]
boost_brokers = [ "{boost_broker1}", "{boost_broker2}" ]

Meta Client Config

Besides the storage configurations, HoraeDB must be configured to start in WithMeta mode and connect to the deployed HoraeMeta:

[cluster_deployment]
mode = "WithMeta"

[cluster_deployment.meta_client]
cluster_name = 'defaultCluster'
meta_addr = 'http://{HoraeMetaAddr}:2379'
lease = "10s"
timeout = "5s"

[cluster_deployment.etcd_client]
server_addrs = ['http://{HoraeMetaAddr}:2379']

Compaction Offload

Compaction offload is also supported. To enable it, the corresponding compaction mode with a node picker and an endpoint should be configured.

  • node_picker: There are two types of node picker – Local and Remote (WIP).
    • When Local is set, local compaction tasks are offloaded to the specific remote compaction server decided by endpoint.
  • endpoint: The endpoint, in the form addr:port, indicating the gRPC port of the remote compaction server.

Here is an example for it:

[analytic.compaction_mode]
compaction_mode = "Offload"
node_picker = "Local"
endpoint = "{RemoteCompactionServerAddr}:{RemoteCompactionServerGrpcPort}"

A Compaction Server, responsible for executing the compaction tasks, is also needed. Currently horaedb-server acts in this role; in the future it may be moved to an independent service.

Complete Config of HoraeDB

With all the parts of the configuration mentioned above, a complete, runnable config for HoraeDB can be assembled. To keep the cluster runnable, we adopt the RocksDB-based WAL and local-disk-based object storage, without compaction offload:

[server]
bind_addr = "0.0.0.0"
http_port = 5440
grpc_port = 8831

[logger]
level = "info"

[runtime]
read_thread_num = 20
write_thread_num = 16
background_thread_num = 12

[cluster_deployment]
mode = "WithMeta"

[cluster_deployment.meta_client]
cluster_name = 'defaultCluster'
meta_addr = 'http://127.0.0.1:2379'
lease = "10s"
timeout = "5s"

[cluster_deployment.etcd_client]
server_addrs = ['127.0.0.1:2379']

[analytic]
write_group_worker_num = 16
replay_batch_size = 100
max_replay_tables_per_batch = 128
write_group_command_channel_cap = 1024
sst_background_read_parallelism = 8

[analytic.manifest]
scan_batch_size = 100
snapshot_every_n_updates = 10000
scan_timeout = "5s"
store_timeout = "5s"

[analytic.wal]
type = "RocksDB"
data_dir = "/home/admin/data/horaedb"

[analytic.storage]
mem_cache_capacity = "20GB"
# 1<<8=256
mem_cache_partition_bits = 8

[analytic.storage.object_store]
type = "Local"
data_dir = "/home/admin/data/horaedb/"

[analytic.table_opts]
arena_block_size = 2097152
write_buffer_size = 33554432

[analytic.compaction]
schedule_channel_len = 16
schedule_interval = "30m"
max_ongoing_tasks = 8
memory_limit = "4G"

Let's name this config file config.toml. Example configs for remote storages are also provided; the templates in them must be replaced with real parameters before use.

Run HoraeDB cluster with HoraeMeta

Firstly, let’s start the HoraeMeta:

docker run -d --name horaemeta-server \
  -p 2379:2379 \
  ghcr.io/apache/horaemeta-server:nightly-20231225-ab067bf0

With the started HoraeMeta cluster, let’s start the HoraeDB instance: TODO: complete it later

3 - SDK

3.1 - Go

Installation

go get github.com/apache/incubator-horaedb-client-go

You can get the latest version here.

How To Use

Init HoraeDB Client

	client, err := horaedb.NewClient(endpoint, horaedb.Direct,
		horaedb.WithDefaultDatabase("public"),
	)
| option name | description |
| --------- | --------- |
| defaultDatabase | the database to use; it can be overwritten by ReqContext in a single Write or SQLRequest |
| RPCMaxRecvMsgSize | configuration for grpc MaxCallRecvMsgSize, default 1024 * 1024 * 1024 |
| RouteMaxCacheSize | the maximum size of the route cache; the router client will evict the oldest entries if exceeded, default 10000 |

Notice:

  • HoraeDB currently only supports the default database public; multiple databases will be supported in the future

Manage Table

HoraeDB uses SQL to manage tables, such as creating tables, deleting tables, or adding columns, etc., which is not much different from when you use SQL to manage other databases.

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

Example for creating table

	createTableSQL := `
		CREATE TABLE IF NOT EXISTS demo (
			name string TAG,
			value double,
			t timestamp NOT NULL,
			TIMESTAMP KEY(t)
		) ENGINE=Analytic with (enable_ttl=false)`

	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    createTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

Example for dropping table

	dropTableSQL := `DROP TABLE demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    dropTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

How To Build Write Data

	points := make([]horaedb.Point, 0, 2)
	for i := 0; i < 2; i++ {
		point, err := horaedb.NewPointBuilder("demo").
			SetTimestamp(now).
			AddTag("name", horaedb.NewStringValue("test_tag1")).
			AddField("value", horaedb.NewDoubleValue(0.4242)).
			Build()
		if err != nil {
			panic(err)
		}
		points = append(points, point)
	}

Write Example

	req := horaedb.WriteRequest{
		Points: points,
	}
	resp, err := client.Write(context.Background(), req)

Query Example

	querySQL := `SELECT * FROM demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    querySQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)
	if err != nil {
        panic(err)
	}
	fmt.Printf("query table success, rows:%+v\n", resp.Rows)

Example

You can find the complete example here.

3.2 - Java

Introduction

HoraeDB Client is a high-performance Java client for HoraeDB.

Requirements

  • Java 8 or later is required for compilation

Dependency

<dependency>
  <groupId>io.ceresdb</groupId>
  <artifactId>ceresdb-all</artifactId>
  <version>${CERESDB.VERSION}</version>
</dependency>

You can get the latest version here.

Init HoraeDB Client

final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default grpc port 8831, use DIRECT RouteMode
        .database("public") // use database for client, can be overridden by the RequestContext in request
        // maximum retry times when write fails
        // (only some error codes will be retried, such as the routing table failure)
        .writeMaxRetries(1)
        // maximum retry times when read fails
        // (only some error codes will be retried, such as the routing table failure)
        .readMaxRetries(1).build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}

The initialization requires at least three parameters:

  • Endpoint: 127.0.0.1
  • Port: 8831
  • RouteMode: DIRECT/PROXY

Here is the explanation of RouteMode. There are two kinds of RouteMode: the Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client, whereas the Proxy mode is the only choice if access to the servers from the client must go through a gateway. For more configuration options, see configuration.

Notice: HoraeDB currently only supports the default database public; multiple databases will be supported in the future.

Create Table Example

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

The following table creation statement (using the SQL API included in the SDK) shows all field types supported by HoraeDB:

// Create table manually, creating table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
        throw new IllegalStateException("Fail to create table");
}

Drop Table Example

Here is an example of dropping table:

String dropTableSql = "DROP TABLE machine_table";

Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
        throw new IllegalStateException("Fail to drop table");
}

Write Data Example

Firstly, you can use PointBuilder to build HoraeDB points:

List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
    // build one point
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(t0)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    pointList.add(point);
}

Then, you can use write interface to write data:

final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));
// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<WriteOk, Err> writeResult = wf.get();
Assert.assertTrue(writeResult.isOk());
// `Result` class referenced the Rust language practice, provides rich functions (such as mapXXX, andThen) transforming the result value to improve programming efficiency. You can refer to the API docs for detail usage.
Assert.assertEquals(100, writeResult.getOk().getSuccess());
Assert.assertEquals(100, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());

See write

Query Data Example

final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        .forTables("machine_table") // table name is optional. If not provided, the SQL parser will parse the `sql` to get the table name and do the routing automatically
        .sql("select * from machine_table where ts = %d", t0) //
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<SqlQueryOk, Err> queryResult = qf.get();

Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as list
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);

// get rows as stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));

See read

Stream Write/Read Example

HoraeDB supports streaming writing and reading, suitable for large-scale data reading and writing.

long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
        final Point streamData = Point.newPointBuilder("machine_table")
                .setTimestamp(t)
                .addTag("city", "Beijing")
                .addTag("ip", "10.0.0.3")
                .addField("cpu", Value.withDouble(0.42))
                .addField("mem", Value.withDouble(0.67))
                .build();
        writeBuf.writeAndFlush(Collections.singletonList(streamData));
        t = t+1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t).build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());

See streaming

3.3 - Python

Introduction

horaedb-client is the python client for HoraeDB.

Thanks to PyO3, the python client is actually a wrapper on the rust client.

The guide will give a basic introduction to the python client by example.

Requirements

  • Python >= 3.7

Installation

pip install horaedb-client

You can get the latest version here.

Init HoraeDB Client

The client initialization comes first, here is a code snippet:

import asyncio
import datetime
from ceresdb_client import Builder, RpcContext, PointBuilder, ValueBuilder, WriteRequest, SqlQueryRequest, Mode, RpcConfig

rpc_config = RpcConfig()
rpc_config.thread_num = 1
rpc_config.default_write_timeout_ms = 1000

builder = Builder('127.0.0.1:8831', Mode.Direct)
builder.set_rpc_config(rpc_config)
builder.set_default_database('public')
client = builder.build()

Firstly, it’s worth noting that the imported packages are used across all the code snippets in this guide, and they will not be repeated in the following.

The initialization requires at least two parameters:

  • Endpoint: the server endpoint consisting of ip address and serving port, e.g. 127.0.0.1:8831;
  • Mode: The mode of the communication between client and server, and there are two kinds of Mode: Direct and Proxy.

Endpoint is simple, while Mode deserves more explanation. The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client. However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.

The default_database can be set and will be used if a following RPC call does not set the database in the RpcContext.

By configuring the RpcConfig, the resources and performance of the client can be tuned; all of the configurations can be found here.

Create Table

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

Here is an example of creating a table with the initialized client:

async def async_query(client, ctx, req):
    await client.sql_query(ctx, req)

create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
    name string TAG, \
    value double, \
    t timestamp NOT NULL, \
    TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'

req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

RpcContext can be used to overwrite the default database and timeout defined in the initialization of the client.

Write Data

PointBuilder can be used to construct a point, which is actually a row in data set. The write request consists of multiple points.

The example is simple:

async def async_write(client, ctx, req):
    return await client.write(ctx, req)

point_builder = PointBuilder('demo')
point_builder.set_timestamp(1000 * int(round(datetime.datetime.now().timestamp())))
point_builder.set_tag("name", ValueBuilder().string("test_tag1"))
point_builder.set_field("value", ValueBuilder().double(0.4242))
point = point_builder.build()

write_request = WriteRequest()
write_request.add_point(point)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_write(client, rpc_ctx, write_request))

Query Data

With the sql_query interface, it is easy to retrieve data from the server:

req = SqlQueryRequest(['demo'], 'select * from demo')
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(async_query(client, rpc_ctx, req))

As the example shows, two parameters are needed to construct the SqlQueryRequest:

  • The tables involved by this sql query;
  • The query sql.

Currently, the first parameter is required for routing performance.

With retrieved data, we can process it row by row and column by column:

# Access row by index in the resp.
for row_idx in range(0, resp.num_rows()):
    row_tokens = []
    row = resp.row_by_idx(row_idx)
    for col_idx in range(0, row.num_cols()):
        col = row.column_by_idx(col_idx)
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row#{row_idx}: {','.join(row_tokens)}")

# Access row by iter in the resp.
for row in resp.iter_rows():
    row_tokens = []
    for col in row.iter_columns():
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row: {','.join(row_tokens)}")

Drop Table

Finally, we can drop the table with the SQL API, which is similar to the table creation:

drop_table_sql = 'DROP TABLE demo'

req = SqlQueryRequest(['demo'], drop_table_sql)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

3.4 - Rust

Install

cargo add horaedb-client

You can get the latest version here.

Init Client

First, we need to init the client.

  • New builder for the client, and you must set endpoint and mode:
    • endpoint is a string which is usually like “ip/domain_name:port”.
    • mode is used to define the way to access horaedb server, detail about mode.
let mut builder = Builder::new("ip/domain_name:port", Mode::Direct/Mode::Proxy);
  • Create and set the RpcConfig; it can be defined on demand or left as the default:
let rpc_config = RpcConfig {
    thread_num: Some(1),
    default_write_timeout: Duration::from_millis(1000),
    ..Default::default()
};
let builder = builder.rpc_config(rpc_config);
  • Set default_database; it will be used if a following RPC call does not set the database in the RpcContext (introduced later):
    let builder = builder.default_database("public");
  • Finally, we build client from builder:
    let client = builder.build();

Manage Table

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

You can use the sql query interface to create or drop table, related setting will be introduced in sql query section.

  • Create table:
let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
            str_tag string TAG,
            int_tag int32 TAG,
            var_tag varbinary TAG,
            str_field string,
            int_field int32,
            bin_field varbinary,
            t timestamp NOT NULL,
            TIMESTAMP KEY(t)) ENGINE=Analytic with
            (enable_ttl='false')"#;
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: create_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");
  • Drop table:
let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: drop_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to drop table");

Write

We support writing with the time-series data model, like InfluxDB.

  • Build the point first with PointBuilder; the data structure for tag values and field values in it is defined as Value, detail about Value:
let test_table = "horaedb";
let ts = Local::now().timestamp_millis();
let point = PointBuilder::new(test_table.to_string())
        .timestamp(ts)
        .tag("str_tag".to_string(), Value::String("tag_val".to_string()))
        .tag("int_tag".to_string(), Value::Int32(42))
        .tag(
            "var_tag".to_string(),
            Value::Varbinary(b"tag_bin_val".to_vec()),
        )
        .field(
            "str_field".to_string(),
            Value::String("field_val".to_string()),
        )
        .field("int_field".to_string(), Value::Int32(42))
        .field(
            "bin_field".to_string(),
            Value::Varbinary(b"field_bin_val".to_vec()),
        )
        .build()
        .unwrap();
  • Add the point to the write request:
let mut write_req = WriteRequest::default();
write_req.add_point(point);
  • Create the rpc_ctx; it can be defined on demand or just use the default value (see details about rpc ctx):

  • Finally, write to server by client.

let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};
let resp = client.write(rpc_ctx, &write_req).await.expect("Should succeed to write");

Sql Query

Querying data with sql is supported.

  • Define the related tables and sql in the sql query request:
let req = SqlQueryRequest {
    // tables used in the sql, e.g. the table created above
    tables: vec!["horaedb".to_string()],
    sql: "SELECT * FROM horaedb".to_string(),
};
  • Query by client:
let resp = client.sql_query(rpc_ctx, &req).await.expect("Should succeed to query");

Example

You can find the complete example in the project.

4 - Operation and Maintenance

This guide introduces the operation and maintenance of HoraeDB, including cluster installation, database&table operations, fault tolerance, disaster recovery, data import and export, etc.

4.1 - Block List

Add block list

If you want to reject queries for a table, you can add the table name to read_block_list.

Example

curl --location --request POST 'http://localhost:5000/admin/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Add",
    "write_block_list":[],
    "read_block_list":["my_table"]
}'

Response

{
  "write_block_list": [],
  "read_block_list": ["my_table"]
}
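The same endpoint also accepts entries in write_block_list, so rejecting writes for a table should work the same way; a sketch (the field appears in the payloads above, but blocking writes this way is an assumption):

curl --location --request POST 'http://localhost:5000/admin/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Add",
    "write_block_list":["my_table"],
    "read_block_list":[]
}'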

Set block list

You can use the Set operation to clear the existing tables and set new tables in read_block_list, as in the following example.

Example

curl --location --request POST 'http://localhost:5000/admin/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Set",
    "write_block_list":[],
    "read_block_list":["my_table1","my_table2"]
}'

Response

{
  "write_block_list": [],
  "read_block_list": ["my_table1", "my_table2"]
}

Remove block list

You can remove tables from read_block_list, as in the following example.

Example

curl --location --request POST 'http://localhost:5000/admin/block' \
--header 'Content-Type: application/json' \
-d '{
    "operation":"Remove",
    "write_block_list":[],
    "read_block_list":["my_table1"]
}'

Response

{
  "write_block_list": [],
  "read_block_list": ["my_table2"]
}

4.2 - Cluster Operation

These operations are for HoraeDB cluster mode and can only be used when HoraeMeta is deployed.

Operation Interface

You need to replace 127.0.0.1:8080 with the actual address of the HoraeMeta server.

  • Query table. When names is not empty, tables are queried by name; when names is empty, ids are used. When querying by ids, schemaName is ignored.
curl --location 'http://127.0.0.1:8080/api/v1/table/query' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName":"defaultCluster",
    "schemaName":"public",
    "names":["demo1", "__demo1_0"],
}'

curl --location 'http://127.0.0.1:8080/api/v1/table/query' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName":"defaultCluster",
    "ids":[0, 1]
}'
  • Query the route of table
curl --location --request POST 'http://127.0.0.1:8080/api/v1/route' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName":"defaultCluster",
    "schemaName":"public",
    "table":["demo"]
}'
  • Query the mapping of shard and node
curl --location --request POST 'http://127.0.0.1:8080/api/v1/getNodeShards' \
--header 'Content-Type: application/json' \
-d '{
    "ClusterName":"defaultCluster"
}'
  • Query the mapping of table and shard. If shardIDs in the request is empty, all shardIDs in the cluster are queried.
curl --location --request POST 'http://127.0.0.1:8080/api/v1/getShardTables' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName":"defaultCluster",
    "shardIDs": [1,2]
}'
  • Drop table
curl --location --request POST 'http://127.0.0.1:8080/api/v1/dropTable' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName": "defaultCluster",
    "schemaName": "public",
    "table": "demo"
}'
  • Transfer leader shard
curl --location --request POST 'http://127.0.0.1:8080/api/v1/transferLeader' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName":"defaultCluster",
    "shardID": 1,
    "oldLeaderNodeName": "127.0.0.1:8831",
    "newLeaderNodeName": "127.0.0.1:18831"
}'
  • Split shard
curl --location --request POST 'http://127.0.0.1:8080/api/v1/split' \
--header 'Content-Type: application/json' \
-d '{
    "clusterName" : "defaultCluster",
    "schemaName" :"public",
    "nodeName" :"127.0.0.1:8831",
    "shardID" : 0,
    "splitTables":["demo"]
}'
  • Create cluster
curl --location 'http://127.0.0.1:8080/api/v1/clusters' \
--header 'Content-Type: application/json' \
--data '{
    "name":"testCluster",
    "nodeCount":3,
    "shardTotal":9,
    "topologyType":"static"
}'
  • Update cluster
curl --location --request PUT 'http://127.0.0.1:8080/api/v1/clusters/{NewClusterName}' \
--header 'Content-Type: application/json' \
--data '{
    "nodeCount":28,
    "shardTotal":128,
    "topologyType":"dynamic"
}'
  • List clusters
curl --location 'http://127.0.0.1:8080/api/v1/clusters'
  • Update enableSchedule
curl --location --request PUT 'http://127.0.0.1:8080/api/v1/clusters/{ClusterName}/enableSchedule' \
--header 'Content-Type: application/json' \
--data '{
    "enable":true
}'
  • Query enableSchedule
curl --location 'http://127.0.0.1:8080/api/v1/clusters/{ClusterName}/enableSchedule'
  • Update flow limiter
curl --location --request PUT 'http://127.0.0.1:8080/api/v1/flowLimiter' \
--header 'Content-Type: application/json' \
--data '{
    "limit":1000,
    "burst":10000,
    "enable":true
}'
  • Query information of flow limiter
curl --location 'http://127.0.0.1:8080/api/v1/flowLimiter'
  • List nodes of HoraeMeta cluster
curl --location 'http://127.0.0.1:8080/api/v1/etcd/member'
  • Move leader of HoraeMeta cluster
curl --location 'http://127.0.0.1:8080/api/v1/etcd/moveLeader' \
--header 'Content-Type: application/json' \
--data '{
    "memberName":"meta1"
}'
  • Add node of HoraeMeta cluster
curl --location --request PUT 'http://127.0.0.1:8080/api/v1/etcd/member' \
--header 'Content-Type: application/json' \
--data '{
    "memberAddrs":["http://127.0.0.1:42380"]
}'
  • Replace node of HoraeMeta cluster
curl --location 'http://127.0.0.1:8080/api/v1/etcd/member' \
--header 'Content-Type: application/json' \
--data '{
    "oldMemberName":"meta0",
    "newMemberAddr":["http://127.0.0.1:42380"]
}'

4.3 - Observability

HoraeDB is observable with Prometheus and Grafana.

Prometheus

Prometheus is a systems and service monitoring system.

Configuration

Save the following configuration into the prometheus.yml file. For example, in the tmp directory, /tmp/prometheus.yml.

Suppose two HoraeDB HTTP services are started on localhost:5440 and localhost:5441.

global:
  scrape_interval: 30s
scrape_configs:
  - job_name: horaedb-server
    static_configs:
      - targets: [your_ip:5440, your_ip:5441]
        labels:
          env: horaedbcluster

See details about configuration here.
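Before starting Prometheus, you can sanity-check that each server exposes metrics; this sketch assumes HoraeDB serves them at the default /metrics path that Prometheus scrapes:

curl http://your_ip:5440/metrics | head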

Run

You can use docker to start Prometheus. The docker image information is here.

docker run \
    -d --name=prometheus \
    -p 9090:9090 \
    -v /tmp/prometheus.yml:/etc/prometheus/prometheus.yml \
    prom/prometheus:v2.41.0

For more detailed installation methods, see here.

Grafana

Grafana is an open and composable observability and data visualization platform.

Run

You can use docker to start Grafana. The docker image information is here.

docker run -d --name=grafana -p 3000:3000 grafana/grafana:9.3.6
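The container above keeps dashboards only for its lifetime; to persist Grafana state across restarts, a named volume can be mounted over Grafana's default data directory (a sketch; /var/lib/grafana is the image's standard data path):

docker run -d --name=grafana -p 3000:3000 \
    -v grafana-storage:/var/lib/grafana grafana/grafana:9.3.6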

Default admin user credentials are admin/admin.

Grafana is available on http://127.0.0.1:3000.

For more detailed installation methods, see here.

Configure data source

  1. Hover the cursor over the Configuration (gear) icon.
  2. Select Data Sources.
  3. Select the Prometheus data source.

Note: The url of Prometheus is http://your_ip:9090.

See more details here.
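If you prefer to script this step instead of clicking through the UI, the data source can also be created through Grafana's HTTP API; a sketch using the default admin credentials and the Prometheus address above:

curl -X POST 'http://admin:admin@127.0.0.1:3000/api/datasources' \
--header 'Content-Type: application/json' \
--data '{
    "name":"Prometheus",
    "type":"prometheus",
    "url":"http://your_ip:9090",
    "access":"proxy"
}'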

Import grafana dashboard

dashboard json

HoraeDB Metrics

After importing the dashboard, you will see a page with the panels described below.

Panels

  • tps: Number of cluster write requests.
  • qps: Number of cluster query requests.
  • 99th query/write duration: 99th quantile of write and query duration.
  • table query: Queries grouped by table.
  • 99th write duration details by instance: 99th quantile of write duration, grouped by instance.
  • 99th query duration details by instance: 99th quantile of query duration, grouped by instance.
  • 99th write partition table duration: 99th quantile of write duration for partition tables.
  • table rows: Number of rows written.
  • table rows by instance: Rows written, grouped by instance.
  • total tables to write: Number of tables with data written.
  • flush count: Number of HoraeDB flushes.
  • 99th flush duration details by instance: 99th quantile of flush duration, grouped by instance.
  • 99th write stall duration details by instance: 99th quantile of write stall duration, grouped by instance.

4.4 - Table Information

Query Table Information

Like MySQL’s information_schema.tables, HoraeDB provides system.public.tables to store table information. Columns:

  • timestamp (Timestamp)
  • catalog (String)
  • schema (String)
  • table_name (String)
  • table_id (UInt64)
  • engine (String)

Example

Query table information via table_name like this:

curl --location --request POST 'http://localhost:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "select * from system.public.tables where `table_name`=\"my_table\""
}'

Response

{
    "rows":[
        {
            "timestamp":0,
            "catalog":"horaedb",
            "schema":"public",
            "table_name":"my_table",
            "table_id":3298534886446,
            "engine":"Analytic"
        }
    ]
}

4.5 - Table Operation

HoraeDB supports standard SQL protocols and allows you to create tables and read/write data via HTTP requests. See More SQL for details.

Create Table

Example

curl --location --request POST 'http://127.0.0.1:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='\''false'\'')"
}'

Write Data

Example

curl --location --request POST 'http://127.0.0.1:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "INSERT INTO demo(t, name, value) VALUES(1651737067000, '\''horaedb'\'', 100)"
}'

Read Data

Example

curl --location --request POST 'http://127.0.0.1:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "select * from demo"
}'

Query Table Info

Example

curl --location --request POST 'http://127.0.0.1:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "show create table demo"
}'

Drop Table

Example

curl --location --request POST 'http://127.0.0.1:5000/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "DROP TABLE demo"
}'

Route Table

Example

curl --location --request GET 'http://127.0.0.1:5000/route/{table_name}'
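
For example, to query the route of the demo table created above:

curl --location --request GET 'http://127.0.0.1:5000/route/demo'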

5 - Ecosystem

HoraeDB has an open ecosystem that encourages collaboration and innovation, allowing developers to use what suits them best.

5.1 - InfluxDB

InfluxDB is a time series database designed to handle high write and query loads. It is an integral component of the TICK stack. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics.

HoraeDB supports the InfluxDB v1.8 write and query APIs.

Warning: you need to add the following config to the server’s configuration in order to try the InfluxDB write/query APIs.

[server.default_schema_config]
default_timestamp_column_name = "time"

Write

curl -i -XPOST "http://localhost:5440/influxdb/v1/write" --data-binary '
demo,tag1=t1,tag2=t2 field1=90,field2=100 1679994647000
demo,tag1=t1,tag2=t2 field1=91,field2=101 1679994648000
demo,tag1=t11,tag2=t22 field1=90,field2=100 1679994647000
demo,tag1=t11,tag2=t22 field1=91,field2=101 1679994648000
'

The POST payload is in InfluxDB line protocol format.

Each measurement will be mapped to a table in HoraeDB, and the table will be created automatically on the first write (note: the default TTL is 7d, and points beyond the TTL are discarded directly).

For example, when inserting the data above, the table below will be created automatically in HoraeDB:

CREATE TABLE `demo` (
    `tsid` uint64 NOT NULL,
    `time` timestamp NOT NULL,
    `field1` double,
    `field2` double,
    `tag1` string TAG,
    `tag2` string TAG,
    PRIMARY KEY (tsid, time),
    timestamp KEY (time))

Note

  • When InfluxDB writes data, the timestamp precision is nanosecond by default. HoraeDB only supports millisecond timestamps; you can specify the data precision with the precision parameter, and HoraeDB will convert it to millisecond precision internally (see the sketch after this list).
  • Query string parameters such as db aren’t supported.

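As an illustration of the precision parameter, here is a hedged sketch that resends millisecond timestamps explicitly; it assumes HoraeDB accepts the same precision values (ns, u, ms, s, m, h) as the InfluxDB v1 write API:

curl -i -XPOST "http://localhost:5440/influxdb/v1/write?precision=ms" --data-binary '
demo,tag1=t1,tag2=t2 field1=92,field2=102 1679994649000
'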
Query

 curl -G 'http://localhost:5440/influxdb/v1/query' --data-urlencode 'q=SELECT * FROM "demo"'

The query result is the same as InfluxDB’s:

{
  "results": [
    {
      "statement_id": 0,
      "series": [
        {
          "name": "demo",
          "columns": ["time", "field1", "field2", "tag1", "tag2"],
          "values": [
            [1679994647000, 90.0, 100.0, "t1", "t2"],
            [1679994647000, 90.0, 100.0, "t11", "t22"],
            [1679994648000, 91.0, 101.0, "t1", "t2"],
            [1679994648000, 91.0, 101.0, "t11", "t22"]
          ]
        }
      ]
    }
  ]
}

Usage in Grafana

HoraeDB can be used as InfluxDB data source in Grafana.

  • Select the InfluxDB type when adding the data source
  • Then input http://{ip}:5440/influxdb/v1/ as the HTTP URL. For a local deployment, the URL is http://localhost:5440/influxdb/v1/
  • Save & test

Note

Query string parameters such as epoch, db, pretty aren’t supported.

5.2 - OpenTSDB

OpenTSDB is a distributed and scalable time series database based on HBase.

Write

HoraeDB follows the OpenTSDB put write protocol.

The summary and detailed query parameters are not yet supported.

curl --location 'http://localhost:5440/opentsdb/api/put' \
--header 'Content-Type: application/json' \
-d '[{
    "metric": "sys.cpu.nice",
    "timestamp": 1692588459000,
    "value": 18,
    "tags": {
       "host": "web01",
       "dc": "lga"
    }
},
{
    "metric": "sys.cpu.nice",
    "timestamp": 1692588459000,
    "value": 18,
    "tags": {
       "host": "web01"
    }
}]'

Each metric will be mapped to a table in HoraeDB, and the table will be created automatically on the first write (note: the default TTL is 7d, and points beyond the TTL are discarded directly).

For example, when inserting the data above, the table below will be created automatically in HoraeDB:

CREATE TABLE `sys.cpu.nice`(
    `tsid` uint64 NOT NULL,
    `timestamp` timestamp NOT NULL,
    `dc` string TAG,
    `host` string TAG,
    `value` bigint,
    PRIMARY KEY(tsid, timestamp),
    TIMESTAMP KEY(timestamp))
    ENGINE = Analytic
    WITH(arena_block_size = '2097152', compaction_strategy = 'default',
    compression = 'ZSTD', enable_ttl = 'true', num_rows_per_row_group = '8192',
    segment_duration = '2h', storage_format = 'AUTO', ttl = '7d',
    update_mode = 'OVERWRITE', write_buffer_size = '33554432')
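
Since the OpenTSDB query protocol is not supported yet (see below), data written this way can still be read back through the SQL HTTP interface; a sketch, assuming the SQL endpoint is served on the same HTTP port as the put request above (note the backquotes, required because the auto-created table name contains dots):

curl --location --request POST 'http://localhost:5440/sql' \
--header 'Content-Type: application/json' \
-d '{
    "query": "SELECT * FROM `sys.cpu.nice`"
}'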

Query

The OpenTSDB query protocol is not currently supported; see the tracking issue.

5.3 - Prometheus

Prometheus is a popular cloud-native monitoring tool that is widely adopted by organizations due to its scalability, reliability, and flexibility. It scrapes metrics from cloud-native services, such as Kubernetes and OpenShift, and stores them in a time series database. Prometheus is also easily extensible, allowing users to extend its features and capabilities with other databases.

HoraeDB can be used as a long-term storage solution for Prometheus. Both remote read and remote write API are supported.

Config

You can configure Prometheus to use HoraeDB as remote storage by adding the following lines to prometheus.yml:

remote_write:
  - url: "http://<address>:<http_port>/prom/v1/write"
remote_read:
  - url: "http://<address>:<http_port>/prom/v1/read"

Each metric will be converted to one table in HoraeDB:

  • labels are mapped to corresponding string tag columns
  • the timestamp of a sample is mapped to a timestamp column named timestamp
  • the value of a sample is mapped to a double column named value

For example, up metric below will be mapped to up table:

up{env="dev", instance="127.0.0.1:9090", job="prometheus-server"}

Its corresponding table in HoraeDB (note: the TTL of the auto-created table is 7d, and points beyond the TTL are discarded directly):

CREATE TABLE `up` (
    `timestamp` timestamp NOT NULL,
    `tsid` uint64 NOT NULL,
    `env` string TAG,
    `instance` string TAG,
    `job` string TAG,
    `value` double,
    PRIMARY KEY (tsid, timestamp),
    timestamp KEY (timestamp)
);

SELECT * FROM up;
tsid                  timestamp      env  instance        job                value
12683162471309663278  1675824740880  dev  127.0.0.1:9090  prometheus-server  1