原文链接:https://calcite.apache.org/docs/tutorial.html

这是一个分步骤教程,它展示了如何构建和连接 Calcite。它使用一个简单的适配器,使得 CSV 文件目录看起来像是一个包含表的模式。Calcite 则完成了剩余工作,并提供了一个完整的 SQL 接口。

calcite-example-csv 是一个功能齐全的 Calcite 适配器,它可以读取 CSV 格式的文本文件。同时值得注意的是,几百行 Java 代码就足以提供完整的 SQL 查询功能。

CSV 也可以作为构建其他数据格式适配器的模板。尽管代码行数不多,但它涵盖了几个重要的概念:

  • 使用 SchemaFactorySchema 接口的用户自定义模式;
  • 在 JSON 格式的模型文件中声明模式;
  • 在 JSON 格式的模型文件中声明视图;
  • 使用 Table 接口的用户自定义表;
  • 确定表的记录类型;
  • Table 的简单实现——使用 ScannableTable 接口,直接枚举所有行;
  • 更高级的实现——实现 FilterableTable,可以根据简单的谓词过滤掉行;
  • Table 的高级实现——使用 TranslatableTable 的规划器规则转换为关系运算符;

下载和构建

你需要 Java(版本 8、9 或 10)和 Git

1
2
3
$ git clone https://github.com/apache/calcite.git
$ cd calcite/example/csv
$ ./sqlline

首次查询

现在让我们使用 sqlline 连接到 Calcite,sqlline 是一个包含在 Calcite 项目中的 SQL shell 功能。

1
2
$ ./sqlline
sqlline> !connect jdbc:calcite:model=src/test/resources/model.json admin admin

如果你运行的是 Windows,则命令为 sqlline.bat

执行一个元数据查询:

1
2
3
4
5
6
7
8
9
10
sqlline> !tables
+------------+--------------+-------------+---------------+----------+------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE |
+------------+--------------+-------------+---------------+----------+------+
| null | SALES | DEPTS | TABLE | null | null |
| null | SALES | EMPS | TABLE | null | null |
| null | SALES | HOBBIES | TABLE | null | null |
| null | metadata | COLUMNS | SYSTEM_TABLE | null | null |
| null | metadata | TABLES | SYSTEM_TABLE | null | null |
+------------+--------------+-------------+---------------+----------+------+

JDBC 专家们注意:sqlline 的 !tables 命令只是在背后执行了 DatabaseMetaData.getTables() 方法。它也提供了其他命令,可以用来查询 JDBC 元数据,例如 !columns!describe

正如你看见的,系统中有 5 张表: EMPSDEPTSHOBBIES 表在当前 SALES 模式中,COLUMNSTABLES 表在系统 metadata 模式中。系统表始终存在于 Calcite 中,而其他表则由模式的具体实现提供。在这个场景下,EMPSDEPTS 表是基于 resources/sales 目录下的 EMPS.csvDEPTS.csv 文件。

让我们对这些表执行一些查询,来展示 Calcite 提供的 SQL 完整实现。首先,进行表扫描:

1
2
3
4
5
6
7
8
9
10
sqlline> SELECT * FROM emps;
+--------+--------+---------+---------+----------------+--------+-------+---+
| EMPNO | NAME | DEPTNO | GENDER | CITY | EMPID | AGE | S |
+--------+--------+---------+---------+----------------+--------+-------+---+
| 100 | Fred | 10 | | | 30 | 25 | t |
| 110 | Eric | 20 | M | San Francisco | 3 | 80 | n |
| 110 | John | 40 | M | Vancouver | 2 | null | f |
| 120 | Wilma | 20 | F | | 1 | 5 | n |
| 130 | Alice | 40 | F | Vancouver | 2 | null | f |
+--------+--------+---------+---------+----------------+--------+-------+---+

再进行关联和分组查询:

1
2
3
4
5
6
7
8
9
sqlline> SELECT d.name, COUNT(*)
. . . .> FROM emps AS e JOIN depts AS d ON e.deptno = d.deptno
. . . .> GROUP BY d.name;
+------------+---------+
| NAME | EXPR$1 |
+------------+---------+
| Sales | 1 |
| Marketing | 2 |
+------------+---------+

最后,VALUES 运算符会返回一个单行,这是测试表达式和 SQL 内置函数的快捷方法:

1
2
3
4
5
6
sqlline> VALUES CHAR_LENGTH('Hello, ' || 'world!');
+---------+
| EXPR$0 |
+---------+
| 13 |
+---------+

Calcite 有许多其他 SQL 特性。我们没有时间在这里介绍它们。你可以再写一些查询来进行实验。

模式发现

那么,Calcite 是如何发现这些表的呢?记住,Calcite 内核对 CSV 文件一无所知(作为一个没有存储层的数据库,Calcite 不了解任何文件格式)。Calcite 知道这些表,完全是因为我们告诉它去执行 calcite-example-csv 项目中的代码。

发现过程包含了几个步骤。首先,我们基于模型文件中的模式工厂类定义了一个模式。然后,模式工厂创建了一个模式,并且这个模式创建一些表,每个表都知道通过扫描 CSV 文件来获取数据。最后,在 Calcite 解析完查询并生成使用这些表的执行计划后,Calcite 会在执行查询时,调用这些表来读取数据。现在让我们更详细地了解这些步骤。

在 JDBC 连接字符串上,我们以 JSON 格式给出了模型的路径。下面是模型的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"version": "1.0",
"defaultSchema": "SALES",
"schemas": [
{
"name": "SALES",
"type": "custom",
"factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
"operand": {
"directory": "sales"
}
}
]
}

模型定义了一个名为 SALES 的单模式。这个模式由插件类 org.apache.calcite.adapter.csv.CsvSchemaFactory 提供支持,它是 calcite-example-csv 项目的一部分,并实现了 Calcite SchemaFactory 接口。它的 create 方法,通过从模型文件中传入的 directory 参数,实例化了模式:

1
2
3
4
5
6
7
8
9
10
11
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
String directory = (String) operand.get("directory");
String flavorName = (String) operand.get("flavor");
CsvTable.Flavor flavor;
if (flavorName == null) {
flavor = CsvTable.Flavor.SCANNABLE;
} else {
flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase());
}
return new CsvSchema(new File(directory), flavor);
}

在模型的驱动下,模式工厂实例化了一个名为 SALES 的单模式。这个模式是 org.apache.calcite.adapter.csv.CsvSchema 的一个实例, 并实现了 Calcite Schema 接口。

模式的一项工作是生成一系列的表(它还可以生成子模式和表函数,但这些是高级功能,calcite-example-csv 不支持它们)。这些表实现了 Calcite Table 接口。CsvSchema 生成的表是 CsvTable 及其子类的实例。

下面是 CsvSchema 的相关代码,它重写了 AbstractSchema 基类中的 getTableMap() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected Map<String, Table> getTableMap() {
// Look for files in the directory ending in ".csv", ".csv.gz", ".json", ".json.gz".
File[] files = directoryFile.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv") || nameSansGz.endsWith(".json");
}
});
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (File file : files) {
String tableName = trim(file.getName(), ".gz");
final String tableNameSansJson = trimOrNull(tableName, ".json");
if (tableNameSansJson != null) {
JsonTable table = new JsonTable(file);
builder.put(tableNameSansJson, table);
continue;
}
tableName = trim(tableName, ".csv");
final Table table = createTable(file);
builder.put(tableName, table);
}
return builder.build();
}

/**
* Creates different sub-type of table based on the "flavor" attribute.
*/
private Table createTable(File file) {
switch (flavor) {
case TRANSLATABLE:
return new CsvTranslatableTable(file, null);
case SCANNABLE:
return new CsvScannableTable(file, null);
case FILTERABLE:
return new CsvFilterableTable(file, null);
default:
throw new AssertionError("Unknown flavor " + flavor);
}
}

这个模式扫描目录并查找所有名称以 .csv 结尾的文件,并为它们创建表。在这种场景下,目录是 sales ,目录下包含了文件 EMPS.csvDEPTS.csv,这些文件对应表 EMPSDEPTS

模式中的表和视图

注意,我们不需要在模型中定义任何表,模式自动生成了这些表。除了这些自动创建的表之外,你还可以使用模式中的 tables 属性,定义额外的表。让我们看看,如何创建一个重要且有用的表类型,即视图。

当你在写一个查询时,视图看起来就像一个表,但它不存储数据。它通过执行查询获取结果。在查询语句被计划执行时,视图将会被展开,因此查询优化器通常可以执行优化,例如,删除那些在最终结果中未使用的 SELECT 子句表达式。

下面是一个定义视图的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"version": "1.0",
"defaultSchema": "SALES",
"schemas": [
{
"name": "SALES",
"type": "custom",
"factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
"operand": {
"directory": "sales"
},
"tables": [
{
"name": "FEMALE_EMPS",
"type": "view",
"sql": "SELECT * FROM emps WHERE gender = 'F'"
}
]
}
]
}

"type": "view" 这行将 FEMALE_EMPS 标记为视图,而不是常规表或自定义表。JSON 并不能简单地书写长字符串,因此 Calcite 支持另一种可选的语法。如果你的视图有很长的 SQL 语句,你可以将单个字符串改为多行列表:

1
2
3
4
5
6
7
8
{
"name": "FEMALE_EMPS",
"type": "view",
"sql": [
"SELECT * FROM emps",
"WHERE gender = 'F'"
]
}

现在,我们已经定义了一个视图,我们可以像使用表一样,在查询中使用它:

1
2
3
4
5
6
sqlline> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno;
+--------+------------+
| NAME | NAME |
+--------+------------+
| Wilma | Marketing |
+--------+------------+

自定义表

自定义表是那些由用户自定义的代码驱动的表。他们不需要存在于自定义模式中。

model-with-custom-table.json 模型文件中,有一个自定义表的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"version": "1.0",
"defaultSchema": "CUSTOM_TABLE",
"schemas": [
{
"name": "CUSTOM_TABLE",
"tables": [
{
"name": "EMPS",
"type": "custom",
"factory": "org.apache.calcite.adapter.csv.CsvTableFactory",
"operand": {
"file": "sales/EMPS.csv.gz",
"flavor": "scannable"
}
}
]
}
]
}

我们可以使用常规的方式查询自定义表:

1
2
3
4
5
6
7
8
9
10
11
sqlline> !connect jdbc:calcite:model=src/test/resources/model-with-custom-table.json admin admin
sqlline> SELECT empno, name FROM custom_table.emps;
+--------+--------+
| EMPNO | NAME |
+--------+--------+
| 100 | Fred |
| 110 | Eric |
| 110 | John |
| 120 | Wilma |
| 130 | Alice |
+--------+--------+

这个模式是一个常规模式,包含了一个由 org.apache.calcite.adapter.csv.CsvTableFactory 提供支持的自定义表,它实现了 Calcite TableFactory 接口。它的 create 方法,根据从模型文件中传入的 file 参数,实例化了 CsvScannableTable

1
2
3
4
5
6
public CsvTable create(SchemaPlus schema, String name, Map<String, Object> map, RelDataType rowType) {
String fileName = (String) map.get("file");
final File file = new File(fileName);
final RelProtoDataType protoRowType = rowType != null ? RelDataTypeImpl.proto(rowType) : null;
return new CsvScannableTable(file, protoRowType);
}

实现自定义表,通常是实现自定义模式的一个更简单方法。这两种方法可能最终都会创建类似的 Table 接口实现,但对于自定义表,你不需要实现元数据发现。CsvTableFactory 创建一个 CsvScannableTable,就像 CsvSchema 所做的那样,但表的实现不会扫描文件系统来查找 .csv 文件。

自定义表需要模型的开发者做更多的工作,需要明确指定每个表及其文件,但也给开发者提供了更多的控制权,例如,为每个表提供不同的参数。

模型中的注释

模型可以使用 /* ... */// 语法来包含注释:

1
2
3
4
5
6
7
8
9
10
{
"version":"1.0",
/* Multi-line
comment. */
"defaultSchema":"CUSTOM_TABLE",
// Single-line comment.
"schemas":[
..
]
}

注释不是标准的 JSON,而是一种无害的扩展。

使用优化器规则优化查询

到目前为止,我们看到的表实现都是可以接受的,只要表不包含大量数据。但是,如果你的客户的表有一百列以及一百万行,你肯定更愿意看到系统在每个查询时,不要检索出所有的数据。你可能希望 Calcite 与适配器协商,并找到一种更有效的数据访问方式。

这种协商就是查询优化的一种简单形式。Calcite 通过添加 优化器规则 来支持查询优化。优化器规则在查询解析树中查找模式(例如某种表解析树顶部的投影),并使用一组新的优化节点来替换树中匹配的节点。

优化器规则像模式和表一样,也是可扩展的。因此,如果你有一个想要通过 SQL 访问的数据存储,你可以首先定义自定义表或模式,然后定义一些规则来提高访问的效率。

让我们通过一个实战来加深理解,使用优化器规则访问 CSV 文件中的部分列。下面有两个非常相似的模式,我们执行相同的查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sqlline> !connect jdbc:calcite:model=src/test/resources/model.json admin admin
sqlline> explain plan for select name from emps;
+-----------------------------------------------------+
| PLAN |
+-----------------------------------------------------+
| EnumerableCalcRel(expr#0..9=[{inputs}], NAME=[$t1]) |
| EnumerableTableScan(table=[[SALES, EMPS]]) |
+-----------------------------------------------------+
sqlline> !connect jdbc:calcite:model=src/test/resources/smart.json admin admin
sqlline> explain plan for select name from emps;
+-----------------------------------------------------+
| PLAN |
+-----------------------------------------------------+
| EnumerableCalcRel(expr#0..9=[{inputs}], NAME=[$t1]) |
| CsvTableScan(table=[[SALES, EMPS]]) |
+-----------------------------------------------------+

是什么导致了执行计划的差异?让我们跟着证据的线索走。在 smart.json 模型文件中,只有一行:

1
flavor: "translatable"

这个配置会使用 flavor = TRANSLATABLE 来创建 CsvSchema,它的 createTable 方法创建了 CsvTranslatableTable 而不是 CsvScannableTable

CsvTranslatableTable 实现了 TranslatableTable.toRel() 方法,用来创建 CsvTableScan。表扫描是查询操作树的叶子节点。通常实现是 EnumerableTableScan,但我们创建了一个独特的子类型,它将导致规则触发。

下面是完整的规则实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CsvProjectTableScanRule extends RelRule<CsvProjectTableScanRule.Config> {

/**
* Creates a CsvProjectTableScanRule.
*/
protected CsvProjectTableScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
final LogicalProject project = call.rel(0);
final CsvTableScan scan = call.rel(1);
int[] fields = getProjectFields(project.getProjects());
if (fields == null) {
// Project contains expressions more complex than just field references.
return;
}
call.transformTo(new CsvTableScan(scan.getCluster(), scan.getTable(), scan.csvTable, fields));
}

private int[] getProjectFields(List<RexNode> exps) {
final int[] fields = new int[exps.size()];
for (int i = 0; i < exps.size(); i++) {
final RexNode exp = exps.get(i);
if (exp instanceof RexInputRef) {
fields[i] = ((RexInputRef) exp).getIndex();
} else {
return null; // not a simple projection
}
}
return fields;
}

/**
* Rule configuration.
*/
public interface Config extends RelRule.Config {

Config DEFAULT = EMPTY.withOperandSupplier(b0 -> b0.operand(LogicalProject.class)
.oneInput(b1 -> b1.operand(CsvTableScan.class).noInputs())).as(Config.class);

@Override
default CsvProjectTableScanRule toRule() {
return new CsvProjectTableScanRule(this);
}
}
}

规则的默认实例驻留在 CsvRules 的持有类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Planner rules relating to the CSV adapter.
*/
public abstract class CsvRules {

private CsvRules() {
}

/**
* Rule that matches a {@link org.apache.calcite.rel.core.Project} on
* a {@link CsvTableScan} and pushes down projects if possible.
*/
public static final CsvProjectTableScanRule PROJECT_SCAN = CsvProjectTableScanRule.Config.DEFAULT.toRule();
}

在默认配置类中(Config 接口中的 DEFAULT 字段),对 withOperandSupplier 方法的调用声明了关系表达式的匹配模式,这个匹配模式会导致规则的触发。如果优化器发现 LogicalProject 的唯一输入是一个没有输入的 CsvTableScan,它将调用这个规则。

规则的变体是可能存在的。例如,不同的规则实例可能会在 CsvTableScan 上匹配到 EnumerableProject

onMatch 方法生成一个新的关系表达式,并调用 RelOptRuleCall.transformTo() 来表明规则已经成功触发。

查询优化过程

有很多关于 Calcite 查询优化器是多么巧妙的说法,但是我们不会在这里谈论它。巧妙是设计用来减轻你的负担——优化器规则的开发者。

首先,Calcite 不会按照指定的顺序触发规则。查询优化过程按照分支树的众多分支执行,就像下棋程序检查许多可能的位移顺序一样。如果规则 A 和 B 都匹配了查询操作树的给定部分,则 Calcite 可以同时触发。

其次,Calcite 基于成本在多个计划中进行选择,但成本模型并不能阻止规则的触发,这个操作在短期内看起来似乎代价更大。

许多优化器都有一个线性优化方案。如上所述,在面对规则 A 和规则 B 这样的选择时,线性优化器需要立即选择。它可能有诸如 将规则 A 应用于整棵树,然后将规则 B 应用于整棵树 之类的策略,或者使用基于成本的策略,应用代价最小的规则。

Calcite 不需要进行这样的妥协。这使得组合各种规则集合变得简单。如果你想要将 识别物化视图的规则从 CSV 和 JDBC 源系统读取数据的规则 结合起来,你只要将所有规则的集合提供给 Calcite 并告诉它去执行即可。

Calcite 确实使用了成本模型。成本模型决定最终使用哪个计划,有时会修剪搜索树以防止搜索空间爆炸,但它从不强迫你在规则 A 和规则 B 之间进行选择。这点很重要,因为它避免了陷入在搜索空间中不是全局最佳的局部最小值。

此外,如你所想,成本模型是可插拔的,它所依赖的表和查询操作统计也是可插拔的,但那些都是后面的主题。

JDBC 适配器

JDBC 适配器将 JDBC 数据源中的模式映射为 Calcite 模式。

例如,下面这个模式从 MySQL foodmart 数据库中读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"version": "1.0",
"defaultSchema": "FOODMART",
"schemas": [
{
"name": "FOODMART",
"type": "custom",
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
"jdbcDriver": "com.mysql.jdbc.Driver",
"jdbcUrl": "jdbc:mysql://localhost/foodmart",
"jdbcUser": "foodmart",
"jdbcPassword": "foodmart"
}
}
]
}

FoodMart 数据库,使用过 Mondrian OLAP 引擎的人应该比较熟悉,因为它是 Mondrian 的主要测试数据集。要加载数据集,请按照 Mondrian 安装说明 进行操作。

当前限制:JDBC 适配器当前只下推了表扫描操作,所有其他处理(过滤连接聚合 等)都发生在 Calcite 中。我们的目标是将尽可能多的处理下推到源系统,例如:语法转换、数据类型和内置函数,这些都是我们在做的。如果 Calcite 查询是基于单个 JDBC 数据库的表,原则上整个查询应该转到数据库上执行。如果表是来自多个 JDBC 数据源,或者 JDBC 和非 JDBC 的混合数据源,Calcite 将尽可能使用最有效的分布式查询方法。

克隆 JDBC 适配器

克隆 JDBC 适配器会创建一个混合数据库。数据来自 JDBC 数据库,但在第一次访问每个表时会将数据读入内存表。Calcite 基于这些内存表获取查询结果,内存表实际上是数据库的缓存。

例如,以下模型从 MySQL foodmart 数据库读取表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"version": "1.0",
"defaultSchema": "FOODMART_CLONE",
"schemas": [
{
"name": "FOODMART_CLONE",
"type": "custom",
"factory": "org.apache.calcite.adapter.clone.CloneSchema$Factory",
"operand": {
"jdbcDriver": "com.mysql.jdbc.Driver",
"jdbcUrl": "jdbc: mysql: //localhost/foodmart",
"jdbcUser": "foodmart",
"jdbcPassword": "foodmart"
}
}
]
}

另一种技巧是在现有模式之上构建克隆模式。你可以使用 source 属性来引用模型中之前定义的模式,就像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"version": "1.0",
"defaultSchema": "FOODMART_CLONE",
"schemas": [
{
"name": "FOODMART",
"type": "custom",
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
"jdbcDriver": "com.mysql.jdbc.Driver",
"jdbcUrl": "jdbc: mysql: //localhost/foodmart",
"jdbcUser": "foodmart",
"jdbcPassword": "foodmart"
}
},
{
"name": "FOODMART_CLONE",
"type": "custom",
"factory": "org.apache.calcite.adapter.clone.CloneSchema$Factory",
"operand": {
"source": "FOODMART"
}
}
]
}

你可以使用这种方法在任何类型的模式基础上创建克隆模式,不仅仅是 JDBC。

克隆适配器并不是万能的。我们计划开发更复杂的缓存策略,以及更完整和更高效的内存表实现,但现在克隆 JDBC 适配器展示了什么是可行的,并允许我们去尝试初始实现。

更多主题

还有很多其他方法来扩展 Calcite,但是这些在教程中没有涉及。适配器规范 描述了所有涉及到的 API

写在最后

笔者因为工作原因接触到 Calcite,前期学习过程中,深感 Calcite 学习资料之匮乏,因此创建了 Calcite 从入门到精通知识星球,希望能够将学习过程中的资料和经验沉淀下来,为更多想要学习 Calcite 的朋友提供一些帮助。

Calcite 从入门到精通