1 - Go
安装
go get github.com/apache/incubator-horaedb-client-go
你可以在这里找到最新的版本 here.
如何使用
初始化客户端
1
2
3
| client, err := horaedb.NewClient(endpoint, horaedb.Direct,
horaedb.WithDefaultDatabase("public"), // Client所使用的database
)
|
参数名称 | 说明 |
---|
defaultDatabase | 所使用的 database,可以被单个 Write 或者 SQLRequest 请求中的 database 覆盖 |
RPCMaxRecvMsgSize | grpc MaxCallRecvMsgSize 配置, 默认是 1024 _ 1024 _ 1024 |
RouteMaxCacheSize | 如果 router 客户端中的 路由缓存超过了这个值,将会淘汰最不活跃的直至降低这个阈值, 默认是 10000 |
注意: HoraeDB 当前仅支持预创建的 public
database , 未来会支持多个 database。
管理表
HoraeDB 使用 SQL 来管理表格,比如创建表、删除表或者新增列等等,这和你在使用 SQL 管理其他的数据库时没有太大的区别。
为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。
当然你也可以通过 create table
语句来更精细化的管理的表(比如添加索引等)。
创建表的样例
1
2
3
4
5
6
7
8
9
10
11
12
13
| createTableSQL := `
CREATE TABLE IF NOT EXISTS demo (
name string TAG,
value double,
t timestamp NOT NULL,
TIMESTAMP KEY(t)
) ENGINE=Analytic with (enable_ttl=false)`
req := horaedb.SQLQueryRequest{
Tables: []string{"demo"},
SQL: createTableSQL,
}
resp, err := client.SQLQuery(context.Background(), req)
|
删除表的样例
1
2
3
4
5
6
| dropTableSQL := `DROP TABLE demo`
req := horaedb.SQLQueryRequest{
Tables: []string{"demo"},
SQL: dropTableSQL,
}
resp, err := client.SQLQuery(context.Background(), req)
|
构建写入数据
1
2
3
4
5
6
7
8
9
10
11
12
| points := make([]horaedb.Point, 0, 2)
for i := 0; i < 2; i++ {
point, err := horaedb.NewPointBuilder("demo").
SetTimestamp(now)).
AddTag("name", horaedb.NewStringValue("test_tag1")).
AddField("value", horaedb.NewDoubleValue(0.4242)).
Build()
if err != nil {
panic(err)
}
points = append(points, point)
}
|
写入数据
1
2
3
4
| req := horaedb.WriteRequest{
Points: points,
}
resp, err := client.Write(context.Background(), req)
|
查询数据
1
2
3
4
5
6
7
8
9
10
| querySQL := `SELECT * FROM demo`
req := horaedb.SQLQueryRequest{
Tables: []string{"demo"},
SQL: querySQL,
}
resp, err := client.SQLQuery(context.Background(), req)
if err != nil {
panic(err)
}
fmt.Printf("query table success, rows:%+v\n", resp.Rows)
|
示例
你可以在这里找到完整的示例。
2 - Java
介绍
HoraeDBClient 是 HoraeDB 的高性能 Java 版客户端。
环境要求
Java 8 及以上
依赖
1
2
3
4
5
| <dependency>
<groupId>io.ceresdb</groupId>
<artifactId>ceresdb-all</artifactId>
<version>${CERESDB.VERSION}</version>
</dependency>
|
最新的版本可以从这里获取。
初始化客户端
1
2
3
4
5
6
7
8
9
10
11
| // CeresDB options
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // 默认 gprc 端口号,DIRECT 模式
.database("public") // Client所使用的database,可被RequestContext的database覆盖
.writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
.readMaxRetries(1) // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
.build();
final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
throw new IllegalStateException("Fail to start CeresDBClient");
}
|
客户端初始化至少需要三个参数:
- EndPoint: 127.0.0.1
- Port: 8831
- RouteMode: DIRECT/PROXY
这里重点解释下 RouteMode
参数,PROXY
模式用在客户端和服务端存在网络隔离,请求需要经过转发的场景;DIRECT
模式用在客户端和服务端网络连通的场景,节省转发的开销,具有更高的性能。
更多的参数配置详情见 configuration。
注意: HoraeDB 当前仅支持默认的 public
database , 未来会支持多个 database。
建表
为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。
当然你也可以通过 create table
语句来更精细化的管理的表(比如添加索引等)。
下面的建表语句(使用 SDK 的 SQL API)包含了 HoraeDB 支持的所有字段类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
| String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
"ts TIMESTAMP NOT NULL," +
"city STRING TAG NOT NULL," +
"ip STRING TAG NOT NULL," +
"cpu DOUBLE NULL," +
"mem DOUBLE NULL," +
"TIMESTAMP KEY(ts)" + // 建表时必须指定时间戳序列
") ENGINE=Analytic";
Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
throw new IllegalStateException("Fail to create table");
}
|
删表
下面是一个删表的示例:
1
2
3
4
5
6
| String dropTableSql = "DROP TABLE machine_table";
Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
throw new IllegalStateException("Fail to drop table");
}
|
数据写入
首先我们需要构建数据,示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
| List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
// 构建单个Point
final Point point = Point.newPointBuilder("machine_table")
.setTimestamp(t0)
.addTag("city", "Singapore")
.addTag("ip", "10.0.0.1")
.addField("cpu", Value.withDouble(0.23))
.addField("mem", Value.withDouble(0.55))
.build();
points.add(point);
}
|
然后使用 write
接口写入数据,示例如下:
1
2
3
4
5
6
7
8
9
10
| final CompletableFuture<Result<WriteOk, Err>> wf = client.write(pointList);
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<WriteOk, Err> writeResult = wf.get();
Assert.assertTrue(writeResult.isOk());
Assert.assertEquals(3, writeResult.getOk().getSuccess());
// `Result` 类参考了 Rust 语言,提供了丰富的 mapXXX、andThen 类 function 方便对结果值进行转换,提高编程效率,欢迎参考 API 文档使用
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.getOk().getFailed());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
|
详情见 write
数据查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
.forTables("machine_table") // 这里表名是可选的,如果未提供,SDK将自动解析SQL填充表名并自动路由
.sql("select * from machine_table where ts = %d", t0) //
.build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<SqlQueryOk, Err> queryResult = qf.get();
Assert.assertTrue(queryResult.isOk());
final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());
// 直接获取结果数组
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);
// 获取结果流
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
|
详情见 read
流式读写
HoraeDB 支持流式读写,适用于大规模数据读写。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
final Point streamData = Point.newPointBuilder("machine_table")
.setTimestamp(t)
.addTag("city", "Beijing")
.addTag("ip", "10.0.0.3")
.addField("cpu", Value.withDouble(0.42))
.addField("mem", Value.withDouble(0.67))
.build();
writeBuf.writeAndFlush(Collections.singletonList(streamData));
t = t+1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());
final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
.sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t).build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());
|
详情见 streaming
3 - Python
介绍
horaedb-client 是 HoraeDB python 客户端.
借助于 PyO3,python 客户端的实现实际上是基于 rust 客户端 的封装。
本手册将会介绍 python client 的一些基本用法,其中涉及到的完整示例,可以查看该示例代码.
环境要求
安装
1
| pip install horaedb-client
|
你可以在这里找到最新的版本 here.
初始化客户端
首先介绍下如何初始化客户端,代码示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
| import asyncio
import datetime
from ceresdb_client import Builder, RpcContext, PointBuilder, ValueBuilder, WriteRequest, SqlQueryRequest, Mode, RpcConfig
rpc_config = RpcConfig()
rpc_config = RpcConfig()
rpc_config.thread_num = 1
rpc_config.default_write_timeout_ms = 1000
builder = Builder('127.0.0.1:8831', Mode.Direct)
builder.set_rpc_config(rpc_config)
builder.set_default_database('public')
client = builder.build()
|
代码的最开始部分是依赖库的导入,在后面的示例中将省略这部分。
客户端初始化需要至少两个参数:
Endpoint
: 服务端地址,由 ip 和端口组成,例如 127.0.0.1:8831
;Mode
: 客户端和服务端通信模式,有两种模式可供选择: Direct
和 Proxy
。
这里重点介绍下通信模式 Mode
, 当客户端可以访问所有的服务器的时候,建议采用 Direct
模式,以减少转发开销;但是如果客户端访问服务器必须要经过网关,那么只能选择 Proxy
模式。
至于 default_database
,会在执行 RPC 请求时未通过 RpcContext
设置 database 的情况下,将被作为目标 database 使用。
最后,通过配置 RpcConfig
, 可以管理客户端使用的资源和调整其性能,所有的配置参数可以参考这里.
建表
为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。
当然你也可以通过 create table
语句来更精细化的管理的表(比如添加索引等)。
初始化客户端后,建表示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| def async_query(client, ctx, req):
await client.sql_query(ctx, req)
create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
name string TAG, \
value double, \
t timestamp NOT NULL, \
TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'
req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))
|
RpcContext
可以用来指定目标 database (可以覆盖在初始化的时候设置的 default_space) 和超时参数。
数据写入
可以使用 PointBuilder
来构建一个 point(实际上就是数据集的一行),多个 point 构成一个写入请求。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| async def async_write(client, ctx, req):
return await client.write(ctx, req)
point_builder = PointBuilder('demo')
point_builder.set_timestamp(1000 * int(round(datetime.datetime.now().timestamp())))
point_builder.set_tag("name", ValueBuilder().string("test_tag1"))
point_builder.set_field("value", ValueBuilder().double(0.4242))
point = point_builder.build()
write_request = WriteRequest()
write_request.add_point(point)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_write(client, ctx, req))
|
数据查询
通过 sql_query
接口, 可以方便地从服务端查询数据:
req = SqlQueryRequest(['demo'], 'select * from demo')
event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(async_query(client, ctx, req))
如示例所展示, 构建 SqlQueryRequest
需要两个参数:
当前为了查询的性能,第一个参数是必须的。
查询到数据后,逐行逐列处理数据的示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| # Access row by index in the resp.
for row_idx in range(0, resp.num_rows()):
row_tokens = []
row = resp.row_by_idx(row_idx)
for col_idx in range(0, row.num_cols()):
col = row.column_by_idx(col_idx)
row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
print(f"row#{row_idx}: {','.join(row_tokens)}")
# Access row by iter in the resp.
for row in resp.iter_rows():
row_tokens = []
for col in row.iter_columns():
row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
print(f"row: {','.join(row_tokens)}")
|
删表
和创建表类似,我们可以使用 sql 来删除表:
1
2
3
4
5
6
| drop_table_sql = 'DROP TABLE demo'
req = SqlQueryRequest(['demo'], drop_table_sql)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(async_query(client, rpc_ctx, req))
|
4 - Rust
安装
1
| cargo add horaedb-client
|
你可以在这里找到最新的版本 here.
初始化客户端
首先,我们需要初始化客户端。
- 创建客户端的 builder,你必须设置
endpoint
和 mode
:endpoint
是类似 “ip/domain_name:port” 形式的字符串。mode
用于指定访问 HoraeDB 服务器的方式,关于 mode 的详细信息。
1
| let mut builder = Builder::new("ip/domain_name:port", Mode::Direct/Mode::Proxy);
|
- 创建和设置
rpc_config
,可以按需进行定义或者直接使用默认值,更多详细参数请参考这里:
1
2
3
4
5
6
| let rpc_config = RpcConfig {
thread_num: Some(1),
default_write_timeout: Duration::from_millis(1000),
..Default::default()
};
let builder = builder.rpc_config(rpc_config);
|
- 设置
default_database
,这会在执行 RPC 请求时未通过 RpcContext 设置 database 的情况下,将被作为目标 database 使用。
1
| let builder = builder.default_database("public");
|
1
| let client = builder.build();
|
管理表
为了方便使用,在使用 gRPC 的 write 接口进行写入时,如果某个表不存在,HoraeDB 会根据第一次的写入自动创建一个表。
当然你也可以通过 create table
语句来更精细化的管理的表(比如添加索引等)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
str_tag string TAG,
int_tag int32 TAG,
var_tag varbinary TAG,
str_field string,
int_field int32,
bin_field varbinary,
t timestamp NOT NULL,
TIMESTAMP KEY(t)) ENGINE=Analytic with
(enable_ttl='false')"#;
let req = SqlQueryRequest {
tables: vec!["horaedb".to_string()],
sql: create_table_sql.to_string(),
};
let resp = client
.sql_query(rpc_ctx, &req)
.await
.expect("Should succeed to create table");
|
1
2
3
4
5
6
7
8
9
10
| let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
tables: vec!["horaedb".to_string()],
sql: drop_table_sql.to_string(),
};
let resp = client
.sql_query(rpc_ctx, &req)
.await
.expect("Should succeed to create table");
|
写入数据
我们支持使用类似 InfluxDB 的时序数据模型进行写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| let test_table = "horaedb";
let ts = Local::now().timestamp_millis();
let point = PointBuilder::new(test_table.to_string())
.timestamp(ts)
.tag("str_tag".to_string(), Value::String("tag_val".to_string()))
.tag("int_tag".to_string(), Value::Int32(42))
.tag(
"var_tag".to_string(),
Value::Varbinary(b"tag_bin_val".to_vec()),
)
.field(
"str_field".to_string(),
Value::String("field_val".to_string()),
)
.field("int_field".to_string(), Value::Int32(42))
.field(
"bin_field".to_string(),
Value::Varbinary(b"field_bin_val".to_vec()),
)
.build()
.unwrap();
|
- 将
point
添加到 write request
中:
1
2
| let mut write_req = WriteRequest::default();
write_req.add_point(point);
|
- 创建
rpc_ctx
,同样地可以按需设置或者使用默认值,rpc_ctx
的详细信息请参考这里:
1
2
3
4
| let rpc_ctx = RpcContext {
database: Some("public".to_string()),
..Default::default()
};
|
1
2
3
4
5
| let rpc_ctx = RpcContext {
database: Some("public".to_string()),
..Default::default()
};
let resp = client.write(rpc_ctx, &write_req).await.expect("Should success to write");
|
Sql query
我们支持使用 sql 进行数据查询。
- 在
sql query request
中指定相关的表和 sql 语句:
1
2
3
4
| let req = SqlQueryRequest {
tables: vec![table name 1,...,table name n],
sql: sql string (e.g. select * from xxx),
};
|
1
| let resp = client.sql_query(rpc_ctx, &req).await.expect("Should success to write");
|
示例
你可以在本项目的仓库中找到完整的例子。