This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

SDK 文档

1 - Go

安装

go get github.com/apache/incubator-horaedb-client-go

你可以在这里找到最新的版本 here.

如何使用

初始化客户端

1
2
3
	client, err := horaedb.NewClient(endpoint, horaedb.Direct,
		horaedb.WithDefaultDatabase("public"), // Client所使用的database
	)
参数名称说明
defaultDatabase所使用的 database,可以被单个 Write 或者 SQLRequest 请求中的 database 覆盖
RPCMaxRecvMsgSizegrpc MaxCallRecvMsgSize 配置, 默认是 1024 _ 1024 _ 1024
RouteMaxCacheSize如果 router 客户端中的 路由缓存超过了这个值,将会淘汰最不活跃的直至降低这个阈值, 默认是 10000

注意: HoraeDB 当前仅支持预创建的 public database , 未来会支持多个 database。

管理表

HoraeDB 使用 SQL 来管理表格,比如创建表、删除表或者新增列等等,这和你在使用 SQL 管理其他的数据库时没有太大的区别。

为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。

当然你也可以通过 create table 语句来更精细化的管理的表(比如添加索引等)。

创建表的样例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
	createTableSQL := `
		CREATE TABLE IF NOT EXISTS demo (
			name string TAG,
			value double,
			t timestamp NOT NULL,
			TIMESTAMP KEY(t)
		) ENGINE=Analytic with (enable_ttl=false)`

	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    createTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

删除表的样例

1
2
3
4
5
6
	dropTableSQL := `DROP TABLE demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    dropTableSQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)

构建写入数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
	points := make([]horaedb.Point, 0, 2)
	for i := 0; i < 2; i++ {
		point, err := horaedb.NewPointBuilder("demo").
			SetTimestamp(now)).
			AddTag("name", horaedb.NewStringValue("test_tag1")).
			AddField("value", horaedb.NewDoubleValue(0.4242)).
			Build()
		if err != nil {
			panic(err)
		}
		points = append(points, point)
	}

写入数据

1
2
3
4
	req := horaedb.WriteRequest{
		Points: points,
	}
	resp, err := client.Write(context.Background(), req)

查询数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
	querySQL := `SELECT * FROM demo`
	req := horaedb.SQLQueryRequest{
		Tables: []string{"demo"},
		SQL:    querySQL,
	}
	resp, err := client.SQLQuery(context.Background(), req)
	if err != nil {
        panic(err)
	}
	fmt.Printf("query table success, rows:%+v\n", resp.Rows)

示例

你可以在这里找到完整的示例。

2 - Java

介绍

HoraeDBClient 是 HoraeDB 的高性能 Java 版客户端。

环境要求

Java 8 及以上

依赖

1
2
3
4
5
<dependency>
  <groupId>io.ceresdb</groupId>
  <artifactId>ceresdb-all</artifactId>
  <version>${CERESDB.VERSION}</version>
</dependency>

最新的版本可以从这里获取。

初始化客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// CeresDB options
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // 默认 gprc 端口号,DIRECT 模式
        .database("public") // Client所使用的database,可被RequestContext的database覆盖
        .writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
        .readMaxRetries(1)  // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
        .build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}

客户端初始化至少需要三个参数:

  • EndPoint: 127.0.0.1
  • Port: 8831
  • RouteMode: DIRECT/PROXY

这里重点解释下 RouteMode 参数,PROXY 模式用在客户端和服务端存在网络隔离,请求需要经过转发的场景;DIRECT 模式用在客户端和服务端网络连通的场景,节省转发的开销,具有更高的性能。 更多的参数配置详情见 configuration

注意: HoraeDB 当前仅支持默认的 public database , 未来会支持多个 database。

建表

为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。

当然你也可以通过 create table 语句来更精细化的管理的表(比如添加索引等)。

下面的建表语句(使用 SDK 的 SQL API)包含了 HoraeDB 支持的所有字段类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // 建表时必须指定时间戳序列
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
        throw new IllegalStateException("Fail to create table");
}

删表

下面是一个删表的示例:

1
2
3
4
5
6
String dropTableSql = "DROP TABLE machine_table";

Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
        throw new IllegalStateException("Fail to drop table");
}

数据写入

首先我们需要构建数据,示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
    // 构建单个Point
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(t0)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    points.add(point);
}

然后使用 write 接口写入数据,示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(pointList);
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<WriteOk, Err> writeResult = wf.get();

Assert.assertTrue(writeResult.isOk());
Assert.assertEquals(3, writeResult.getOk().getSuccess());
// `Result` 类参考了 Rust 语言,提供了丰富的 mapXXX、andThen 类 function 方便对结果值进行转换,提高编程效率,欢迎参考 API 文档使用
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.getOk().getFailed());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());

详情见 write

数据查询

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        .forTables("machine_table") // 这里表名是可选的,如果未提供,SDK将自动解析SQL填充表名并自动路由
        .sql("select * from machine_table where ts = %d", t0) //
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<SqlQueryOk, Err> queryResult = qf.get();

Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// 直接获取结果数组
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);

// 获取结果流
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));

详情见 read

流式读写

HoraeDB 支持流式读写,适用于大规模数据读写。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
        final Point streamData = Point.newPointBuilder("machine_table")
                .setTimestamp(t)
                .addTag("city", "Beijing")
                .addTag("ip", "10.0.0.3")
                .addField("cpu", Value.withDouble(0.42))
                .addField("mem", Value.withDouble(0.67))
                .build();
        writeBuf.writeAndFlush(Collections.singletonList(streamData));
        t = t+1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t).build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());

详情见 streaming

3 - Python

介绍

horaedb-clientHoraeDB python 客户端.

借助于 PyO3,python 客户端的实现实际上是基于 rust 客户端 的封装。

本手册将会介绍 python client 的一些基本用法,其中涉及到的完整示例,可以查看该示例代码.

环境要求

  • Python >= 3.7

安装

1
pip install horaedb-client

你可以在这里找到最新的版本 here.

初始化客户端

首先介绍下如何初始化客户端,代码示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio
import datetime
from ceresdb_client import Builder, RpcContext, PointBuilder, ValueBuilder, WriteRequest, SqlQueryRequest, Mode, RpcConfig

rpc_config = RpcConfig()
rpc_config = RpcConfig()
rpc_config.thread_num = 1
rpc_config.default_write_timeout_ms = 1000

builder = Builder('127.0.0.1:8831', Mode.Direct)
builder.set_rpc_config(rpc_config)
builder.set_default_database('public')
client = builder.build()

代码的最开始部分是依赖库的导入,在后面的示例中将省略这部分。

客户端初始化需要至少两个参数:

  • Endpoint: 服务端地址,由 ip 和端口组成,例如 127.0.0.1:8831;
  • Mode: 客户端和服务端通信模式,有两种模式可供选择: DirectProxy

这里重点介绍下通信模式 Mode, 当客户端可以访问所有的服务器的时候,建议采用 Direct 模式,以减少转发开销;但是如果客户端访问服务器必须要经过网关,那么只能选择 Proxy 模式。

至于 default_database,会在执行 RPC 请求时未通过 RpcContext 设置 database 的情况下,将被作为目标 database 使用。

最后,通过配置 RpcConfig, 可以管理客户端使用的资源和调整其性能,所有的配置参数可以参考这里.

建表

为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。

当然你也可以通过 create table 语句来更精细化的管理的表(比如添加索引等)。

初始化客户端后,建表示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def async_query(client, ctx, req):
    await client.sql_query(ctx, req)

create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
    name string TAG, \
    value double, \
    t timestamp NOT NULL, \
    TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'

req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

RpcContext 可以用来指定目标 database (可以覆盖在初始化的时候设置的 default_space) 和超时参数。

数据写入

可以使用 PointBuilder 来构建一个 point(实际上就是数据集的一行),多个 point 构成一个写入请求。

示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def async_write(client, ctx, req):
    return await client.write(ctx, req)

point_builder = PointBuilder('demo')
point_builder.set_timestamp(1000 * int(round(datetime.datetime.now().timestamp())))
point_builder.set_tag("name", ValueBuilder().string("test_tag1"))
point_builder.set_field("value", ValueBuilder().double(0.4242))
point = point_builder.build()

write_request = WriteRequest()
write_request.add_point(point)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_write(client, ctx, req))

数据查询

通过 sql_query 接口, 可以方便地从服务端查询数据:

req = SqlQueryRequest(['demo'], 'select * from demo')
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(async_query(client, ctx, req))

如示例所展示, 构建 SqlQueryRequest 需要两个参数:

  • 查询 sql 中涉及到的表;
  • 查询 sql.

当前为了查询的性能,第一个参数是必须的。

查询到数据后,逐行逐列处理数据的示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# Access row by index in the resp.
for row_idx in range(0, resp.num_rows()):
    row_tokens = []
    row = resp.row_by_idx(row_idx)
    for col_idx in range(0, row.num_cols()):
        col = row.column_by_idx(col_idx)
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row#{row_idx}: {','.join(row_tokens)}")

# Access row by iter in the resp.
for row in resp.iter_rows():
    row_tokens = []
    for col in row.iter_columns():
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row: {','.join(row_tokens)}")

删表

和创建表类似,我们可以使用 sql 来删除表:

1
2
3
4
5
6
drop_table_sql = 'DROP TABLE demo'

req = SqlQueryRequest(['demo'], drop_table_sql)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))

4 - Rust

安装

1
cargo add horaedb-client

你可以在这里找到最新的版本 here.

初始化客户端

首先,我们需要初始化客户端。

  • 创建客户端的 builder,你必须设置 endpointmode
    • endpoint 是类似 “ip/domain_name:port” 形式的字符串。
    • mode 用于指定访问 HoraeDB 服务器的方式,关于 mode 的详细信息
1
let mut builder = Builder::new("ip/domain_name:port", Mode::Direct/Mode::Proxy);
  • 创建和设置 rpc_config,可以按需进行定义或者直接使用默认值,更多详细参数请参考这里
1
2
3
4
5
6
let rpc_config = RpcConfig {
    thread_num: Some(1),
    default_write_timeout: Duration::from_millis(1000),
    ..Default::default()
};
let builder = builder.rpc_config(rpc_config);
  • 设置 default_database,这会在执行 RPC 请求时未通过 RpcContext 设置 database 的情况下,将被作为目标 database 使用。
1
    let builder = builder.default_database("public");
  • 最后,我们从 builder 中创建客户端:
1
    let client = builder.build();

管理表

为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。

当然你也可以通过 create table 语句来更精细化的管理的表(比如添加索引等)。

  • 建表:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
            str_tag string TAG,
            int_tag int32 TAG,
            var_tag varbinary TAG,
            str_field string,
            int_field int32,
            bin_field varbinary,
            t timestamp NOT NULL,
            TIMESTAMP KEY(t)) ENGINE=Analytic with
            (enable_ttl='false')"#;
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: create_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");
  • 删表:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: drop_table_sql.to_string(),
};

let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");

写入数据

我们支持使用类似 InfluxDB 的时序数据模型进行写入。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
let test_table = "horaedb";
let ts = Local::now().timestamp_millis();
let point = PointBuilder::new(test_table.to_string())
        .timestamp(ts)
        .tag("str_tag".to_string(), Value::String("tag_val".to_string()))
        .tag("int_tag".to_string(), Value::Int32(42))
        .tag(
            "var_tag".to_string(),
            Value::Varbinary(b"tag_bin_val".to_vec()),
        )
        .field(
            "str_field".to_string(),
            Value::String("field_val".to_string()),
        )
        .field("int_field".to_string(), Value::Int32(42))
        .field(
            "bin_field".to_string(),
            Value::Varbinary(b"field_bin_val".to_vec()),
        )
        .build()
        .unwrap();
  • point 添加到 write request 中:
1
2
let mut write_req = WriteRequest::default();
write_req.add_point(point);
  • 创建 rpc_ctx,同样地可以按需设置或者使用默认值,rpc_ctx 的详细信息请参考这里
1
2
3
4
let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};
  • 最后,利用客户端写入数据到服务器:
1
2
3
4
5
let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};
let resp = client.write(rpc_ctx, &write_req).await.expect("Should success to write");

Sql query

我们支持使用 sql 进行数据查询。

  • sql query request 中指定相关的表和 sql 语句:
1
2
3
4
let req = SqlQueryRequest {
    tables: vec![table name 1,...,table name n],
    sql: sql string (e.g. select * from xxx),
};
  • 利用客户端进行查询:
1
let resp = client.sql_query(rpc_ctx, &req).await.expect("Should success to write");

示例

你可以在本项目的仓库中找到完整的例子