Apache Calcite 简介

在本教程中,我们将了解Apache Calcite。它是一个功能强大的数据管理框架,可用于与数据访问有关的各种用例。
它专注于从任何来源检索数据,而不是存储数据。此外,其查询优化功能可以实现更快、更高效的数据检索。

什么是Calcite
为新数据库构建查询引擎需要数年时间。然而,Calcite 帮助我们立即开始使用开箱即用的可扩展SQL解析器、验证器和优化器。方解石已用于构建HerdDB、Apache Druid、MapD等数据库。

由于 Calcite 能够与多个数据库集成,因此它被广泛用于构建数据仓库和商业智能工具,例如Apache Kyline、Apache Wayang、阿里巴巴 MaxCompute等。

Calcite 是Apache Kafka、Apache Apex和Flink等流媒体平台不可或缺的组件,这些平台有助于构建可以呈现和分析实时源的工具。

Apache Calcite 提供现成的适配器来与Cassandra、Elasticsearch、MongoDB等第三方数据源集成。

CSV 适配器案例
让我们看一个示例,其中我们将使用 SQL 查询从 CSV 文件读取数据。让我们首先导入在pom.xml文件中使用文件适配器所需的必要Maven 依赖项:

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.34</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-file</artifactId>
    <version>1.34</version>
</dependency>


接下来,让我们在model.json中定义模型:

{
  "version": "1.0",
 
"defaultSchema": "TRADES",
 
"schemas": [
    {
     
"name": "TRADES",
     
"type": "custom",
     
"factory": "org.apache.calcite.adapter.file.FileSchemaFactory",
     
"operand": {
       
"directory": "trades"
      }
    }
  ]
}

model.json中指定的FileSchemaFactory会查找CSV 文件的trades目录并创建虚拟TRADES模式jsons并创建一个虚拟。随后, trades目录下的 CSV 文件将被视为表格。

在继续查看文件适配器的运行情况之前,让我们先看一下trade.csvcsv文件,我们将使用方解石适配器查询该文件:文件,我们将使用方解石适配器查询该文件:

tradeid:int,product:string,qty:int
232312123,"RFTXC",100
232312124,
"RFUXC",200
232312125,
"RFSXC",1000

CSV 文件包含三列:tradeid、Product和qty。此外,列标题还指定数据类型。CSV 文件中总共有 3 条交易记录。CSV 文件包含三列:数据类型。CSV 文件中总共有 3 条交易记录。CSV 文件。

最后,让我们看看如何使用 Calcite 适配器获取记录:

@Test
void whenCsvSchema_thenQuerySuccess() throws SQLException {
    Properties info = new Properties();
    info.put("model", getPath("model.json"));
    try (Connection connection = DriverManager.getConnection(
"jdbc:calcite:", info);) {
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(
"select * from trades.trade");
        assertEquals(3, resultSet.getMetaData().getColumnCount());
        List<Integer> tradeIds = new ArrayList<>();
        while (resultSet.next()) {
            tradeIds.add(resultSet.getInt(
"tradeid"));
        }
        assertEquals(3, tradeIds.size());
    }
}

Calcite 适配器采用模型属性来创建模仿文件系统的虚拟模式。然后,它使用通常的 JDBC 语义从trade.csv文件中获取记录。

文件适配器不仅可以读取 CSV 文件,还可以读取HTML 和 JSON 文件。此外,为了处理 CSV 文件,Apache Calcite 还提供了一个特殊的 CSV 适配器,用于处理使用CSVSchemaFactory 的高级用例。

Java 对象的内存中 SQL 操作
与 CSV 适配器示例类似,让我们看另一个示例,在 Apache Calcite 的帮助下,我们将在 Java 对象上运行 SQL 查询。

假设CompanySchema类中 有两个Employee 和Department类数组:

public class CompanySchema {
    public Employee employees;
    public Department departments;
}

现在,让我们看一下Employee类:

public class Employee {
    public String name;
    public String id;
    public String deptId;
    public Employee(String name, String id, String deptId) {
        this.name = name;
        this.id = id;
        this.deptId = deptId;
    }
}

与Employee类类似,我们定义Department类:

public class Department {
    public String deptId;
    public String deptName;
    public Department(String deptId, String deptName) {
        this.deptId = deptId;
        this.deptName = deptName;
    }
}

假设有三个部门:财务、营销和人力资源。我们将对CompanySchema对象运行查询以查找每个部门的员工人数:

@Test
void whenQueryEmployeesObject_thenSuccess() throws SQLException {
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection = DriverManager.getConnection(
"jdbc:calcite:", info);
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    SchemaPlus rootSchema = calciteConnection.getRootSchema();
    Schema schema = new ReflectiveSchema(companySchema);
    rootSchema.add(
"company", schema);
    Statement statement = calciteConnection.createStatement();
    String query =
"select dept.deptName, count(emp.id) "
      +
"from company.employees as emp "
      +
"join company.departments as dept "
      +
"on (emp.deptId = dept.deptId) "
      +
"group by dept.deptName";
    assertDoesNotThrow(() -> {
        ResultSet resultSet = statement.executeQuery(query);
        while (resultSet.next()) {
            logger.info(
"Dept Name:" + resultSet.getString(1)
              +
" No. of employees:" + resultSet.getInt(2));
        }
    });
}

有趣的是,该方法运行良好并且也获取结果。在该方法中,Apache Calcite 类ReflectiveSchema帮助创建CompanySchema对象的架构。然后,它运行 SQL 查询并使用标准 JDBC 语义获取记录。

此外,这个例子证明,无论来源如何,Calcite 都可以使用 SQL 语句从任何地方获取数据。

查询处理
查询处理是 Apache calcite 的核心功能。
标准 JDBC 驱动程序或 SQL 客户端对数据库执行查询。而Apache Calcite 在解析和验证查询后,会智能地优化它们以实现高效执行、节省资源并提高性能。

SQL 解析器和验证器
作为解析过程的一部分,解析器将 SQL 查询转换为称为 AST(抽象语法树)的树状结构。

假设对教师和部门两个表进行 SQL 查询:

Select Teacher.name, Department.name 
From Teacher join 
Department On (Department.deptid = Teacher.deptid)
Where Department.name = 'Science'

关系表达式生成器
随后,在验证步骤之后,关系表达式生成器使用一些常见的关系运算符转换语法树:

LogicalTableScan:从表中读取数据
LogicalFilter:根据条件选择行
LogicalProject:选择要包含的特定列
LogicalJoin:根据匹配值组合两个表中的行
考虑前面显示的 AST,从中导出的相应逻辑关系表达式将是:

LogicalProject(
    projects=[
        $0.name AS name0,
        $1.name AS name1
    ],
    input=LogicalFilter(
        condition=[
            ($1.name = 'Science')
        ],
        input=LogicalJoin(
            condition=[
                ($0.deptid = $1.deptid)
            ],
            left=LogicalTableScan(table=[[Teacher]]),
            right=LogicalTableScan(table=[[Department]])
        )
    )
)

在关系表达式中,$0和$1代表表Teacher和Department。本质上,它是一个数学表达式,有助于理解将执行哪些操作来获得结果。但是,它没有与执行相关的信息。

查询优化器
然后,Calcite 优化器对关系表达式应用优化。一些常见的优化包括:

  • 谓词下推:将过滤器推到尽可能靠近数据源的位置,以减少获取的数据量
  • 连接重新排序:重新排列连接顺序以最小化中间结果并提高效率
  • 投影下推:下推投影以避免处理不必要的列
  • 索引使用:识别和利用索引来加速数据检索

 查询计划器、生成器和执行器

  • 优化后,Calcite 查询规划器创建一个执行计划来执行优化的查询。执行计划指定查询引擎获取和处理数据所采取的确切步骤。这也称为特定于后端查询引擎的物理计划。
  • 然后,Calcite 查询生成器以特定于所选执行引擎的语言生成代码。
  • 最后,Executor连接数据库执行最终的查询。

结论
在本文中,我们探讨了 Apache Calcite 的功能,它可以快速为数据库配备标准化 SQL 解析器、验证器和优化器。因此,Calcite 使供应商无需开发长达数年的查询引擎,使他们能够优先考虑后端存储。此外,Calcite 的现成适配器简化了与不同数据库的连接,有助于开发统一的集成接口。

此外,通过利用 Calcite,数据库开发人员可以加快上市时间并提供强大、多功能的 SQL 功能。