The database to use; it can be overridden by ReqContext in a single Write or SQLRequest.
RPCMaxRecvMsgSize
Configuration for gRPC MaxCallRecvMsgSize; default is 1024 * 1024 * 1024 (1 GiB).
RouteMaxCacheSize
The maximum size of the router cache. If it is exceeded, the router client will evict the oldest entries. Default is 10000.
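The eviction rule behind RouteMaxCacheSize can be pictured with a small sketch. This is not the SDK's router; `RouteCache` and its methods are hypothetical names, and "oldest" is assumed here to mean the entry that was inserted earliest.

```python
from collections import OrderedDict
from typing import Optional


class RouteCache:
    """Hypothetical route cache mapping table name -> server endpoint.

    When the number of entries exceeds max_size, the oldest
    (earliest inserted) entry is evicted.
    """

    def __init__(self, max_size: int = 10000) -> None:
        self.max_size = max_size
        self._routes: "OrderedDict[str, str]" = OrderedDict()

    def put(self, table: str, endpoint: str) -> None:
        # Updating an existing key keeps its original insertion position.
        self._routes[table] = endpoint
        while len(self._routes) > self.max_size:
            self._routes.popitem(last=False)  # drop the oldest entry

    def get(self, table: str) -> Optional[str]:
        return self._routes.get(table)

    def __len__(self) -> int:
        return len(self._routes)


cache = RouteCache(max_size=2)
cache.put("demo", "10.0.0.1:8831")
cache.put("machine_table", "10.0.0.2:8831")
cache.put("horaedb", "10.0.0.3:8831")  # exceeds max_size, so "demo" is evicted
print(cache.get("demo"), len(cache))  # None 2
```

An insertion-ordered map keeps eviction O(1); a real client could equally use least-recently-used order, which this sketch does not model.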
Notice:
HoraeDB currently supports only the default database public; multiple databases will be supported in the future.
Manage Table
HoraeDB uses SQL to manage tables (creating tables, dropping tables, adding columns, etc.), much as you would use SQL to manage other databases.
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use a CREATE TABLE statement to manage the table more precisely (for example, to add indexes).
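To make the auto-creation behavior concrete, the sketch below derives a CREATE TABLE statement from the first point written to a table. It only illustrates the idea: HoraeDB's actual inference rules are not described here, and `infer_create_table`, the point layout and the Python-type to column-type mapping are all assumptions.

```python
from typing import Dict, Union

FieldValue = Union[int, float, str, bytes]

# Assumed mapping, limited to column types that appear in this guide.
_TYPE_MAP = {
    int: "int32",
    float: "double",
    str: "string",
    bytes: "varbinary",
}


def infer_create_table(table: str, tags: Dict[str, str],
                       fields: Dict[str, FieldValue],
                       timestamp_column: str = "t") -> str:
    """Hypothetical: derive a CREATE TABLE statement from one point."""
    columns = [f"{name} string TAG" for name in tags]  # tags become string TAG columns
    for name, value in fields.items():
        column_type = _TYPE_MAP.get(type(value))
        if column_type is None:
            raise ValueError(f"unsupported field type for {name!r}: {type(value).__name__}")
        columns.append(f"{name} {column_type}")
    columns.append(f"{timestamp_column} timestamp NOT NULL")
    columns.append(f"TIMESTAMP KEY({timestamp_column})")
    return (f"CREATE TABLE IF NOT EXISTS {table} ("
            + ", ".join(columns) + ") ENGINE=Analytic")


print(infer_create_table("demo", {"name": "test_tag1"}, {"value": 0.4242}))
```

Writing the table yourself with CREATE TABLE remains the way to control options such as TTL or indexes, which a write-driven schema cannot express.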
Example for creating table
createTableSQL := `
CREATE TABLE IF NOT EXISTS demo (
    name string TAG,
    value double,
    t timestamp NOT NULL,
    TIMESTAMP KEY(t)
) ENGINE=Analytic with (enable_ttl=false)`

req := horaedb.SQLQueryRequest{
    Tables: []string{"demo"},
    SQL:    createTableSQL,
}
resp, err := client.SQLQuery(context.Background(), req)
if err != nil {
    panic(err) // handle the error properly in real code
}
final CeresDBOptions opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, DIRECT) // CeresDB default gRPC port 8831, use DIRECT RouteMode
        .database("public") // database used by the client; can be overridden by the RequestContext in a request
        // maximum retry times when a write fails
        // (only some error codes will be retried, such as routing table failures)
        .writeMaxRetries(1)
        // maximum retry times when a read fails
        // (only some error codes will be retried, such as routing table failures)
        .readMaxRetries(1)
        .build();

final CeresDBClient client = new CeresDBClient();
if (!client.init(opts)) {
    throw new IllegalStateException("Fail to start CeresDBClient");
}
The initialization requires at least three parameters:
Endpoint: 127.0.0.1
Port: 8831
RouteMode: DIRECT/PROXY
There are two kinds of RouteMode: Direct and Proxy. The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client.
However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.
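The choice between the two modes can be written as a small decision function. This is only an illustration: `choose_route_mode`, the `RouteMode` enum and the reachability input are hypothetical; in the Java example above the mode is passed explicitly (DIRECT) when building the options.

```python
from enum import Enum
from typing import Dict


class RouteMode(Enum):
    DIRECT = "direct"  # client talks to every HoraeDB server directly
    PROXY = "proxy"    # requests pass through a gateway that forwards them


def choose_route_mode(server_reachable: Dict[str, bool]) -> RouteMode:
    """Hypothetical helper: use Direct only if every server is reachable."""
    if server_reachable and all(server_reachable.values()):
        return RouteMode.DIRECT  # no forwarding overhead
    return RouteMode.PROXY  # at least one server is only reachable via a gateway


print(choose_route_mode({"10.0.0.1:8831": True, "10.0.0.2:8831": True}))
# RouteMode.DIRECT
```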
For more configuration options, see configuration
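The writeMaxRetries and readMaxRetries comments in the initialization code say that only some error codes are retried. A minimal sketch of such a policy follows; the error codes, `RpcError` and `call_with_retries` are hypothetical, and the SDK defines its own retryable codes and retry loop.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")

# Hypothetical error codes; the SDK defines its own set.
ROUTE_TABLE_FAILURE = 1
INVALID_SQL = 2
RETRYABLE_CODES = {ROUTE_TABLE_FAILURE}


class RpcError(Exception):
    def __init__(self, code: int) -> None:
        super().__init__(f"rpc error code={code}")
        self.code = code


def call_with_retries(op: Callable[[], T], max_retries: int) -> T:
    """Run op once, then retry up to max_retries times on retryable codes."""
    attempt = 0
    while True:
        try:
            return op()
        except RpcError as err:
            if err.code not in RETRYABLE_CODES or attempt >= max_retries:
                raise  # non-retryable, or retries exhausted
            attempt += 1


# Usage: the first call fails with a routing error, the single retry succeeds.
calls: List[int] = []


def flaky_write() -> str:
    calls.append(1)
    if len(calls) == 1:
        raise RpcError(ROUTE_TABLE_FAILURE)
    return "ok"


print(call_with_retries(flaky_write, max_retries=1), len(calls))  # ok 2
```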
Notice: HoraeDB currently supports only the default database public; multiple databases will be supported in the future.
Create Table Example
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use a CREATE TABLE statement to manage the table more precisely (for example, to add indexes).
The following table creation statement (using the SQL API included in the SDK) shows how to declare tag, field and timestamp columns in HoraeDB:
// Create table manually; creating the table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" +
        "ts TIMESTAMP NOT NULL," +
        "city STRING TAG NOT NULL," +
        "ip STRING TAG NOT NULL," +
        "cpu DOUBLE NULL," +
        "mem DOUBLE NULL," +
        "TIMESTAMP KEY(ts)" + // timestamp column must be specified
        ") ENGINE=Analytic";

Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
    throw new IllegalStateException("Fail to create table");
}
Drop Table Example
Here is an example of dropping a table:
String dropTableSql = "DROP TABLE machine_table";
Result<SqlQueryOk, Err> dropResult = client.sqlQuery(new SqlQueryRequest(dropTableSql)).get();
if (!dropResult.isOk()) {
    throw new IllegalStateException("Fail to drop table");
}
Write Data Example
First, you can use PointBuilder to build HoraeDB points:
final long t0 = System.currentTimeMillis(); // timestamp shared by the example points
final List<Point> pointList = new LinkedList<>();
for (int i = 0; i < 100; i++) {
    // build one point
    final Point point = Point.newPointBuilder("machine_table")
            .setTimestamp(t0)
            .addTag("city", "Singapore")
            .addTag("ip", "10.0.0.1")
            .addField("cpu", Value.withDouble(0.23))
            .addField("mem", Value.withDouble(0.55))
            .build();
    pointList.add(point);
}
Then, you can use the write interface to write the data:
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));
// here `future.get` is just for demonstration; a better async programming practice would be to use the CompletableFuture API
final Result<WriteOk, Err> writeResult = wf.get();
Assert.assertTrue(writeResult.isOk());
// The `Result` class follows the Rust language practice and provides rich functions (such as mapXXX, andThen)
// for transforming the result value. You can refer to the API docs for detailed usage.
Assert.assertEquals(pointList.size(), writeResult.getOk().getSuccess());
Assert.assertEquals(pointList.size(), writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
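The write example relies on `Result` combinators borrowed from Rust, such as `mapOr` and `andThen`. The Python sketch below mimics that idea to show how `mapOr(default, f)` and `andThen(f)` behave; `Result` and `WriteOk` here are hypothetical stand-ins, not the Java SDK's classes.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")
E = TypeVar("E")
U = TypeVar("U")


@dataclass
class Result(Generic[T, E]):
    ok: Optional[T] = None
    err: Optional[E] = None

    def is_ok(self) -> bool:
        return self.err is None

    def map_or(self, default: U, f: Callable[[T], U]) -> U:
        # Apply f to the Ok value, or fall back to default on error.
        return f(self.ok) if self.is_ok() else default

    def and_then(self, f: Callable[[T], "Result[U, E]"]) -> "Result[U, E]":
        # Chain another fallible step only when this one succeeded.
        return f(self.ok) if self.is_ok() else Result(err=self.err)


@dataclass
class WriteOk:
    success: int
    failed: int


write_result: Result[WriteOk, str] = Result(ok=WriteOk(success=100, failed=0))
print(write_result.map_or(0, lambda ok: ok.success))  # 100

failed: Result[WriteOk, str] = Result(err="route table failure")
print(failed.map_or(-1, lambda ok: ok.failed))  # -1
```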
Query Data Example
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
        // the table name is optional; if it is not provided, the SQL parser parses the `sql` to get the table name and does the routing automatically
        .forTables("machine_table")
        .sql("select * from machine_table where ts = %d", t0)
        .build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// here `future.get` is just for demonstration; a better async programming practice would be to use the CompletableFuture API
final Result<SqlQueryOk, Err> queryResult = qf.get();
Assert.assertTrue(queryResult.isOk());

final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());

// get rows as a list
final List<Row> rows = queryOk.getRowList();
Assert.assertEquals(t0, rows.get(0).getColumn("ts").getValue().getTimestamp());
Assert.assertEquals("Singapore", rows.get(0).getColumn("city").getValue().getString());
Assert.assertEquals("10.0.0.1", rows.get(0).getColumn("ip").getValue().getString());
Assert.assertEquals(0.23, rows.get(0).getColumn("cpu").getValue().getDouble(), 0.0000001);
Assert.assertEquals(0.55, rows.get(0).getColumn("mem").getValue().getDouble(), 0.0000001);

// get rows as a stream
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
Stream Write Example
long start = System.currentTimeMillis();
long t = start;
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
    final Point streamData = Point.newPointBuilder("machine_table")
            .setTimestamp(t)
            .addTag("city", "Beijing")
            .addTag("ip", "10.0.0.3")
            .addField("cpu", Value.withDouble(0.42))
            .addField("mem", Value.withDouble(0.67))
            .build();
    writeBuf.writeAndFlush(Collections.singletonList(streamData));
    t = t + 1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final SqlQueryRequest streamQuerySql = SqlQueryRequest.newBuilder()
        .sql("select * from %s where city = '%s' and ts >= %d and ts < %d", "machine_table", "Beijing", start, t)
        .build();
final Result<SqlQueryOk, Err> streamQueryResult = client.sqlQuery(streamQuerySql).get();
Assert.assertTrue(streamQueryResult.isOk());
Assert.assertEquals(1000, streamQueryResult.getOk().getRowCount());
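The stream write accepts points incrementally and reports one combined WriteOk once the stream is completed. The Python sketch below models that accumulate-then-complete pattern with a hypothetical `StreamWriteBuf` that hands each flushed batch to a caller-supplied send function; it is not the SDK's implementation.

```python
from typing import Callable, List, Sequence


class StreamWriteBuf:
    """Hypothetical stream-write buffer for one table."""

    def __init__(self, table: str, send: Callable[[str, Sequence[dict]], int]) -> None:
        self.table = table
        self._send = send  # returns the number of rows the server accepted
        self._success = 0
        self._closed = False

    def write_and_flush(self, points: Sequence[dict]) -> None:
        if self._closed:
            raise RuntimeError("stream already completed")
        self._success += self._send(self.table, points)

    def completed(self) -> int:
        # Close the stream and report the total number of rows written.
        self._closed = True
        return self._success


sent: List[dict] = []


def fake_send(table: str, points: Sequence[dict]) -> int:
    # Stand-in for the network call: accept every point.
    sent.extend(points)
    return len(points)


buf = StreamWriteBuf("machine_table", fake_send)
for ts in range(1000):
    buf.write_and_flush([{"ts": ts, "city": "Beijing", "cpu": 0.42}])
print(buf.completed())  # 1000
```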
First, note that the imported packages are used across all the code snippets in this guide and will not be repeated below.
The initialization requires at least two parameters:
Endpoint: the server endpoint, consisting of an IP address and a serving port, e.g. 127.0.0.1:8831;
Mode: the mode of communication between the client and the server. There are two kinds of Mode: Direct and Proxy.
Endpoint is simple, while Mode deserves more explanation. The Direct mode should be adopted to avoid forwarding overhead if all the servers are accessible to the client. However, the Proxy mode is the only choice if the access to the servers from the client must go through a gateway.
The default_database can be set; it is used by subsequent RPC calls that do not set the database in the RpcContext.
By configuring the RpcConfig, you can tune the resource usage and performance of the client; all of the configuration options are listed here.
Create Table
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use a CREATE TABLE statement to manage the table more precisely (for example, to add indexes).
Here is an example of creating a table with the initialized client:
async def async_query(client, ctx, req):
    # return the response so the caller can process the result rows
    return await client.sql_query(ctx, req)


create_table_sql = 'CREATE TABLE IF NOT EXISTS demo ( \
    name string TAG, \
    value double, \
    t timestamp NOT NULL, \
    TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl=false)'

req = SqlQueryRequest(['demo'], create_table_sql)
rpc_ctx = RpcContext()
rpc_ctx.database = 'public'
rpc_ctx.timeout_ms = 100

event_loop = asyncio.get_event_loop()
resp = event_loop.run_until_complete(async_query(client, rpc_ctx, req))
RpcContext can be used to override the default database and timeout defined when the client was initialized.
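The override rule amounts to "a value set on the per-call context wins; otherwise the client default applies". The sketch below encodes that rule; `ClientDefaults`, `CallContext` (a stand-in for RpcContext) and `resolve` are hypothetical names, the default timeout is an arbitrary illustrative value, and the real client applies the override internally.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClientDefaults:
    database: str = "public"
    timeout_ms: int = 1000  # arbitrary value for illustration


@dataclass
class CallContext:
    """Hypothetical stand-in for RpcContext: unset fields are None."""
    database: Optional[str] = None
    timeout_ms: Optional[int] = None


def resolve(defaults: ClientDefaults, ctx: CallContext) -> ClientDefaults:
    """Fields set on the context override the client-level defaults."""
    return ClientDefaults(
        database=ctx.database if ctx.database is not None else defaults.database,
        timeout_ms=ctx.timeout_ms if ctx.timeout_ms is not None else defaults.timeout_ms,
    )


print(resolve(ClientDefaults(), CallContext(timeout_ms=100)))
# ClientDefaults(database='public', timeout_ms=100)
```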
Write Data
PointBuilder can be used to construct a point, which is effectively one row in the data set. A write request consists of multiple points.
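Because a point is one row, a write request can be pictured as a list of rows, each tagged with its table. The sketch below shows that shape with hypothetical `Point`, `PointBuilder` and `WriteRequest` classes; the Python SDK's own PointBuilder API may differ in method names and details.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Union

Value = Union[bool, int, float, str, bytes]


@dataclass
class Point:
    table: str
    timestamp: int  # milliseconds since epoch
    tags: Dict[str, str] = field(default_factory=dict)
    fields: Dict[str, Value] = field(default_factory=dict)


class PointBuilder:
    """Hypothetical fluent builder producing one Point (one row)."""

    def __init__(self, table: str) -> None:
        self._point = Point(table=table, timestamp=0)

    def set_timestamp(self, ts: int) -> "PointBuilder":
        self._point.timestamp = ts
        return self

    def add_tag(self, name: str, value: str) -> "PointBuilder":
        self._point.tags[name] = value
        return self

    def add_field(self, name: str, value: Value) -> "PointBuilder":
        self._point.fields[name] = value
        return self

    def build(self) -> Point:
        return self._point


@dataclass
class WriteRequest:
    points: List[Point] = field(default_factory=list)

    def add_point(self, point: Point) -> None:
        self.points.append(point)


req = WriteRequest()
for i in range(3):
    req.add_point(
        PointBuilder("demo")
        .set_timestamp(1_700_000_000_000 + i)
        .add_tag("name", "test_tag1")
        .add_field("value", 0.4242)
        .build()
    )
print(len(req.points))  # 3
```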
As the example shows, two parameters are needed to construct the SqlQueryRequest:
The tables involved in this SQL query;
The query SQL.
Currently, the first parameter is required so that the request can be routed efficiently.
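The table list helps routing because the client can look up which server serves each table before sending the query, without parsing the SQL text. A minimal sketch follows; `group_by_endpoint` and the static `routes` map are hypothetical and stand in for the routing information the client keeps (compare the router cache described under RouteMaxCacheSize).

```python
from collections import defaultdict
from typing import Dict, List


def group_by_endpoint(tables: List[str], routes: Dict[str, str]) -> Dict[str, List[str]]:
    """Hypothetical: map each requested table to the endpoint that serves it."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for table in tables:
        if table not in routes:
            raise KeyError(f"no route for table {table!r}")
        grouped[routes[table]].append(table)
    return dict(grouped)


routes = {"demo": "10.0.0.1:8831", "machine_table": "10.0.0.2:8831"}
print(group_by_endpoint(["demo"], routes))  # {'10.0.0.1:8831': ['demo']}
```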
With the retrieved data, we can process it row by row and column by column:
# Access rows by index in the resp.
for row_idx in range(0, resp.num_rows()):
    row_tokens = []
    row = resp.row_by_idx(row_idx)
    for col_idx in range(0, row.num_cols()):
        col = row.column_by_idx(col_idx)
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row#{row_idx}: {','.join(row_tokens)}")

# Access rows by iterator in the resp.
for row in resp.iter_rows():
    row_tokens = []
    for col in row.iter_columns():
        row_tokens.append(f"{col.name()}:{col.value()}#{col.data_type()}")
    print(f"row: {','.join(row_tokens)}")
Drop Table
Finally, we can drop the table with the SQL API, in the same way as the table creation:
Set default_database; it will be used by subsequent RPC calls that do not set the database in the RpcContext (introduced later):
let builder = builder.default_database("public");
Finally, we build the client from the builder:
let client = builder.build();
Manage Table
For ease of use, when using gRPC’s write interface for writing, if a table does not exist, HoraeDB will automatically create a table based on the first write.
Of course, you can also use a CREATE TABLE statement to manage the table more precisely (for example, to add indexes).
You can use the SQL query interface to create or drop tables; the related settings are introduced in the Sql Query section.
Create table:
let create_table_sql = r#"CREATE TABLE IF NOT EXISTS horaedb (
    str_tag string TAG,
    int_tag int32 TAG,
    var_tag varbinary TAG,
    str_field string,
    int_field int32,
    bin_field varbinary,
    t timestamp NOT NULL,
    TIMESTAMP KEY(t)) ENGINE=Analytic with
    (enable_ttl='false')"#;

let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: create_table_sql.to_string(),
};
let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to create table");
Drop table:
let drop_table_sql = "DROP TABLE horaedb";
let req = SqlQueryRequest {
    tables: vec!["horaedb".to_string()],
    sql: drop_table_sql.to_string(),
};
let resp = client
    .sql_query(rpc_ctx, &req)
    .await
    .expect("Should succeed to drop table");
Write
We support writing with a time series data model similar to InfluxDB's.
First, build the point with PointBuilder; the tag values and field values in it are represented by the Value type. Details about Value:
Create a new rpc_ctx; it can be customized on demand or left at its default values. Details about RpcContext:
Finally, write to the server with the client:
let rpc_ctx = RpcContext {
    database: Some("public".to_string()),
    ..Default::default()
};
let resp = client
    .write(rpc_ctx, &write_req)
    .await
    .expect("Should succeed to write");
Sql Query
We support querying data with SQL.
Define the related tables and the SQL in the SQL query request: