This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

SDK

1 - Go

Installation

go get github.com/apache/incubator-horaedb-client-go

You can get latest version here.

How To Use

Init HoraeDB Client

1
2
3
	client, err := horaedb.NewClient(endpoint, horaedb.Direct,
		horaedb.WithDefaultDatabase("public"),
	)
option namedescription
defaultDatabaseusing database, database can be overwritten by ReqContext in single Write or SQLRequest
RPCMaxRecvMsgSizeconfigration for grpc MaxCallRecvMsgSize, default 1024 _ 1024 _ 1024
RouteMaxCacheSizeIf the maximum number of router cache size, router client whill evict oldest if exceeded, default is 10000

Notice:

  • HoraeDB currently only supports the default database public now, multiple databases will be supported in the future

Manage Table

HoraeDB uses SQL to manage tables, such as creating tables, deleting tables, or adding columns, etc., which is not much different from when you use SQL to manage other databases.

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

Example for creating table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
	createTableSQL := `
		CREATE TABLE IF NOT EXISTS demo (
			name string TAG,
			value double,
			t timestamp NOT NULL,
			TIMESTAMP KEY(t)
		) ENGINE=Analytic with (enable_ttl=false)`

	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    createTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

Example for droping table

1
2
3
4
5
6
	dropTableSQL := `DROP TABLE demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    dropTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

How To Build Write Data

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
	points := make([]horaedb.Point, 0, 2)
	for i := 0; i < 2; i++ {
		point, err := horaedb.NewPointBuilder("demo").
			SetTimestamp(now).
			AddTag("name", horaedb.NewStringValue("test_tag1")).
			AddField("value", horaedb.NewDoubleValue(0.4242)).
			Build()
		if err != nil {
			panic(err)
		}
		points = append(points, point)
	}

Write Example

1
2
3
4
	req := horaedb.WriteRequest{
		Points: points,
	}
	resp, err := client.Write(context.Background(), req)

Query Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
	querySQL := `SELECT * FROM demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    querySQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)
	if err != nil {
        panic(err)
	}
	fmt.Printf("query table success, rows:%+v\n", resp.Rows)

Example

You can find the complete example here.

2 - Java

Introduction

HoraeDB Client is a high-performance Java client for HoraeDB.

Requirements

  • Java 8 or later is required for compilation

Dependency

1
2
3
4
5
<dependency>
  <groupId>io.ceresdb</groupId>
  <artifactId>ceresdb-all</artifactId>
  <version>${CERESDB.VERSION}</version>
</dependency>

You can get latest version here.

Init HoraeDB Client

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default grpc port 8831,use DIRECT RouteMode
        .database("public") // use database for client, can be overridden by the RequestContext in request
        // maximum retry times when write fails
        // (only some error codes will be retried, such as the routing table failure)
        .writeMaxRetries(1)
        // maximum retry times when read fails
        // (only some error codes will be retried, such as the routing table failure)
        .readMaxRetries(1).build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}

The initialization requires at least three parameters:

  • Endpoint: 127.0.0.1
  • Port: 8831
  • RouteMode: DIRECT/PROXY

Here is the explanation of RouteMode. There are two kinds of RouteMode,The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client. However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway. For more configuration options, see configuration

Notice: HoraeDB currently only supports the default database public now, multiple databases will be supported in the future;

Create Table Example

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

The following table creation statement(using the SQL API included in SDK )shows all field types supported by HoraeDB:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Create table manually, creating table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
        throw new IllegalStateException("Fail to create table");
}

Drop Table Example

Here is an example of dropping table:

1
2
3
4
5
6
String dropTableSql = "DROP TABLE machine_table";

Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
        throw new IllegalStateException("Fail to drop table");
}

Write Data Example

Firstly, you can use PointBuilder to build HoraeDB points:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
    // build one point
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(t0)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    points.add(point);
}

Then, you can use write interface to write data:

1
2
3
4
5
6
7
8
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));
// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<WriteOk, Err> writeResult = wf.get();
        Assert.assertTrue(writeResult.isOk());
        // `Result` class referenced the Rust language practice, provides rich functions (such as mapXXX, andThen) transforming the result value to improve programming efficiency. You can refer to the API docs for detail usage.
        Assert.assertEquals(3, writeResult.getOk().getSuccess());
        Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
        Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());

See write

Query Data Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        .forTables("machine_table") // table name is optional. If not provided, SQL parser will parse the `ssql` to get the table name and do the routing automaticly
        .sql("select * from machine_table where ts = %d", t0) //
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API
final Result<SqlQueryOk, Err> queryResult = qf.get();

Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as list
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);

// get rows as stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));

See read

Stream Write/Read Example

HoraeDB support streaming writing and reading,suitable for large-scale data reading and writing。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
        final Point streamData = Point.newPointBuilder("machine_table")
                .setTimestamp(t)
                .addTag("city", "Beijing")
                .addTag("ip", "10.0.0.3")
                .addField("cpu", Value.withDouble(0.42))
                .addField("mem", Value.withDouble(0.67))
                .build();
        writeBuf.writeAndFlush(Collections.singletonList(streamData));
        t = t+1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t).build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());

See streaming

3 - Python

Introduction

horaedb-client is the python client for HoraeDB.

Thanks to PyO3, the python client is actually a wrapper on the rust client.

The guide will give a basic introduction to the python client by example.

Requirements

  • Python >= 3.7

Installation

1
pip install horaedb-client

You can get latest version here.

Init HoraeDB Client

The client initialization comes first, here is a code snippet:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio
import datetime
from ceresdb_client import Builder, RpcContext, PointBuilder, ValueBuilder, WriteRequest, SqlQueryRequest, Mode, RpcConfig

rpc_config = RpcConfig()
rpc_config = RpcConfig()
rpc_config.thread_num = 1
rpc_config.default_write_timeout_ms = 1000

builder = Builder('127.0.0.1:8831', Mode.Direct)
builder.set_rpc_config(rpc_config)
builder.set_default_database('public')
client = builder.build()

Firstly, it’s worth noting that the imported packages are used across all the code snippets in this guide, and they will not be repeated in the following.

The initialization requires at least two parameters:

  • Endpoint: the server endpoint consisting of ip address and serving port, e.g. 127.0.0.1:8831;
  • Mode: The mode of the communication between client and server, and there are two kinds of Mode: Direct and Proxy.

Endpoint is simple, while Mode deserves more explanation. The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client. However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.

The default_database can be set and will be used if following rpc calling without setting the database in the RpcContext.

By configuring the RpcConfig, resource and performance of the client can be manipulated, and all of the configurations can be referred at here.

Create Table

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

Here is a example for creating table by the initialized client:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
async def async_query(client, ctx, req):
    await client.sql_query(ctx, req)

create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
    name string TAG, \
    value double, \
    t timestamp NOT NULL, \
    TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'

req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

RpcContext can be used to overwrite the default database and timeout defined in the initialization of the client.

Write Data

PointBuilder can be used to construct a point, which is actually a row in data set. The write request consists of multiple points.

The example is simple:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def async_write(client, ctx, req):
    return await client.write(ctx, req)

point_builder = PointBuilder('demo')
point_builder.set_timestamp(1000 * int(round(datetime.datetime.now().timestamp())))
point_builder.set_tag("name", ValueBuilder().string("test_tag1"))
point_builder.set_field("value", ValueBuilder().double(0.4242))
point = point_builder.build()

write_request = WriteRequest()
write_request.add_point(point)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_write(client, ctx, req))

Query Data

By sql_query interface, it is easy to retrieve the data from the server:

req = SqlQueryRequest(['demo'], 'select * from demo')
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(async_query(client, ctx, req))

As the example shows, two parameters are needed to construct the SqlQueryRequest:

  • The tables involved by this sql query;
  • The query sql.

Currently, the first parameter is necessary for performance on routing.

With retrieved data, we can process it row by row and column by column:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# Access row by index in the resp.
for row_idx in range(0, resp.num_rows()):
    row_tokens = []
    row = resp.row_by_idx(row_idx)
    for col_idx in range(0, row.num_cols()):
        col = row.column_by_idx(col_idx)
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row#{row_idx}: {','.join(row_tokens)}")

# Access row by iter in the resp.
for row in resp.iter_rows():
    row_tokens = []
    for col in row.iter_columns():
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row: {','.join(row_tokens)}")

Drop Table

Finally, we can drop the table by the sql api, which is similar to the table creation:

1
2
3
4
5
6
drop_table_sql = 'DROP TABLE demo'

req = SqlQueryRequest(['demo'], drop_table_sql)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

4 - Rust

Install

1
cargo add horaedb-client

You can get latest version here.

Init Client

At first, we need to init the client.

  • New builder for the client, and you must set endpoint and mode:
    • endpoint is a string which is usually like “ip/domain_name:port”.
    • mode is used to define the way to access horaedb server, detail about mode.
1
let mut builder = Builder::new("ip/domain_name:port", Mode::Direct/Mode::Proxy);
1
2
3
4
5
6
let rpc_config = RpcConfig {
    thread_num: Some(1),
    default_write_timeout: Duration::from_millis(1000),
    ..Default::default()
};
let builder = builder.rpc_config(rpc_config);
  • Set default_database, it will be used if following rpc calling without setting the database in the RpcContext(will be introduced in later):
1
    let builder = builder.default_database("public");
  • Finally, we build client from builder:
1
    let client = builder.build();

Manage Table

For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.

Of course, you can also use create table statement to manage the table more finely (such as adding indexes).

You can use the sql query interface to create or drop table, related setting will be introduced in sql query section.

  • Create table:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
            str_tag string TAG,
            int_tag int32 TAG,
            var_tag varbinary TAG,
            str_field string,
            int_field int32,
            bin_field varbinary,
            t timestamp NOT NULL,
            TIMESTAMP KEY(t)) ENGINE=Analytic with
            (enable_ttl='false')"#;
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: create_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");
  • Drop table:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: drop_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");

Write

We support to write with the time series data model like InfluxDB.

  • Build the point first by PointBuilder, the related data structure of tag value and field value in it is defined as Value, detail about Value:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
let test_table = "horaedb";
let ts = Local::now().timestamp_millis();
let point = PointBuilder::new(test_table.to_string())
        .timestamp(ts)
        .tag("str_tag".to_string(), Value::String("tag_val".to_string()))
        .tag("int_tag".to_string(), Value::Int32(42))
        .tag(
            "var_tag".to_string(),
            Value::Varbinary(b"tag_bin_val".to_vec()),
        )
        .field(
            "str_field".to_string(),
            Value::String("field_val".to_string()),
        )
        .field("int_field".to_string(), Value::Int32(42))
        .field(
            "bin_field".to_string(),
            Value::Varbinary(b"field_bin_val".to_vec()),
        )
        .build()
        .unwrap();
  • Add the point to write request:
1
2
let mut write_req = WriteRequest::default();
write_req.add_point(point);
  • New rpc_ctx, and it can also be defined on demand or just use the default value, detail about rpc ctx:

  • Finally, write to server by client.

1
2
3
4
5
let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};
let resp = client.write(rpc_ctx, &write_req).await.expect("Should success to write");

Sql Query

We support to query data with sql.

  • Define related tables and sql in sql query request:
1
2
3
4
let req = SqlQueryRequest {
    tables: vec![table name 1,...,table name n],
    sql: sql string (e.g. select * from xxx),
};
  • Query by client:
1
let resp = client.sql_query(rpc_ctx, &req).await.expect("Should success to write");

Example

You can find the complete example in the project.