如何理解flink1.11中的JDBCCatalog

今天就跟大家聊聊有关如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

为方正等地区用户提供了全套网页设计制作服务,及方正网站建设行业解决方案。主营业务为网站设计制作、网站设计、方正网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!

 

背景

1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。

实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。

 

示例

目前对于jdbc catalog,flink仅提供了postgres catalog,我们基于postgres的catalog讲解一下如何使用flink的catalog ,

  • 引入pom
   
            org.postgresql
            postgresql
            42.2.5
        


 
  • 新建PostgresCatalog    
    目前flink通过一个静态类来创建相相应的jdbc  catalog,对于PostgresCatalog,没有提供public类型的构造方法。

通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时这五个参数都是必填项,其中baseUrl要求是不能带有数据库名的

  String catalogName = "mycatalog";
  String defaultDatabase = "postgres";
  String username = "postgres";
  String pwd = "postgres";
  String baseUrl = "jdbc:postgresql://localhost:5432/";

  PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
    catalogName,
    defaultDatabase,
    username,
    pwd,
    baseUrl);
 

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

..``
 

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
 

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
 
  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());
 
  1. 列出来所有的数据库:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);
 
  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);
 
  1. 列出所有函数
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);
 
  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);
 
  1. 查询表的数据
  List results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);
 
  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");
 

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源码解析

 

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }
   

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Override
 public List listDatabases() throws CatalogException {
  List pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }
 

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

看完上述内容,你们对如何理解flink 1.11 中的JDBC Catalog有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。


当前标题:如何理解flink1.11中的JDBCCatalog
网页链接:http://myzitong.com/article/pspdgs.html