Skip to main content

· 9 min read
百岁

前言

说到依赖反转原则百度百科是这样介绍的,如下:

在面向对象编程领域中,依赖反转原则(Dependency inversion principle,DIP)是指一种特定的解耦(传统的依赖关系创建在高层次上,而具体的策略设置则应用在低层次的模块上)形式,使得高层次的模块不依赖于低层次的模块的实现细节,依赖关系被颠倒(反转),从而使得低层次模块依赖于高层次模块的需求抽象。

大家应该会马上会联想到Spring Framework,在介绍Spring Framework框架常常会提及依赖反转原则,看上面的介绍估计会云里雾里,说得通俗一点,该原则的初衷要求服务提供者与服务调用者在代码实现层面实现解耦

为了加深理解,经常会提到的一个例子,以前古时候的包办婚姻,假如是男方到了适婚年龄,只要把自己的条件、要求告诉媒婆。接下来找合适对象的过程就交给媒婆就行了,男方只需要负责到时候入洞房就行了。对于男方来说, 婚姻和他是服务提供者和消费者的关系,由于引入了媒婆的角色,男方省去了谈婚论嫁的麻烦过程,只需要专注于核心业务-入洞房。最终使得整个过程显得简单高效,形式格外优雅。

因此,依赖反转原则DIP作为一个朴素的原则存在,可以应用到软件设计领域每一个流程环节当中,而不仅仅适用于Spring Framework当中。

本文就以依赖反转原则DIP在TIS增量实时数据通道的设计、实现过程中如何利用这一原则来优化设计、实现流程进行阐述。

实现实时增量数据管道需求

在TIS中为用户提供了基于Flink端到端的实时增量数据通道功能,市面上已经提供了基于Flink和Flink-CDC的实时流同步工具,从用户反馈来看已经很方便了,那为什么还要通过TIS来 使用Flink-CDC呢?

这是一个非常好的问题,要回答这个问题,首先我们需要从用户的角度了解用户到底需要什么?然后从需求出发设计并且构建出用户体验达到极致的产品。

大数据流计算领域,用户的核心需求是:

  1. 可追溯操作历史的控制系统,这样可以方便回滚历史操作。
  2. 不关心算子实现细节,流计算的使用者往往是对Flink不了解的数据分析人员,所以在产品使用体验上需要屏蔽底层技术细节。
  3. 可扩展的端类型:Flink-CDC从3.0版本支持的Connectors,只支持了有限个数的基于增量监听CDC技术的Source端 ,和少量Sink端实现,如:Doris和StarRocks的Sink端类型。还远远没有达到用户实际生产场景下的端类型。所以,需要提供在更高层次上,通过便捷方式扩展Source和Sink端类型的手段。

TIS正式为了弥补以上三个使用Flink-CDC框架中的不足而开发的。

具体实现

下面具体对以上第2点进行进行说明,配置并且触发执行基于Flink-CDC的数据管道具体通过以下步骤完成

编辑

构建DataStreamSource步骤中,通过调用Flink-CDC提供的API代码,可以方便订阅到如MySQL的增量更新消息,如下代码:

public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint
env.enableCheckpointing(3000);

env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute("Print MySQL Snapshot + Binlog");
}

通过以上流计算的流程中可以使用创建出MySqlSource<String> mySqlSourceSource加入到各种算子中去进行计算。

使用SQL的方式将Stream Source 注册为Flink Table:

CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'scan.startup.mode' = 'earliest-offset', -- Start from earliest offset
'scan.startup.mode' = 'latest-offset', -- Start from latest offset
'scan.startup.mode' = 'specific-offset', -- Start from specific offset
'scan.startup.mode' = 'timestamp', -- Start from timestamp
'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog filename under specific offset startup mode
'scan.startup.specific-offset.pos' = '4', -- Binlog position under specific offset mode
'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific offset startup mode
'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under timestamp startup mode
...
)

以上是Flink-CDC提供的标准化的Demo案例。

在这里我们重新用依赖反转原则来思考一下,这个构建流程是否有违背该原则?确实,从用户的角度来说,用户只关心最终构建出来的MySqlSource<String>实例,至于构建该实例的过程用户并不关心,所以在设计过程需要将 MySqlSource<String>实例构建过程与它的调用者之间进行解耦合。

是时候发挥TIS的作用了,TIS需要发挥实例容器的作用,由TIS根据用户配置的Source端参数自动地创建MySqlSource<String>实例, 在运行时自动注入到执行流程中。

  • 配置Source/Sink Connector
  • 直接引用TIS注入的SourceStream实例
  • 当用户选择Flink SQL类型脚本,直接引用已经注册完成的Table名即可

以上具体提供注入实例的封装工厂是:

以上两段代码的执行逻辑类似Spring FactoryBean 执行逻辑,实现容器预定的扩展工厂接口,运行期由容器负责初始化,继而将实例注入到需要反向依赖的实例中。

总结

本文介绍了利用依赖反转原则在TIS中实现实时增量通道的优化方法,可使最终用户最大限度地关注流式计算核心业务本身,其他的琐碎的与实例初始化相关的工作都交给TIS来完成即可。

与此类似的功能优化,在TIS实现过程中还有很多,会在日后的博客分享中陆续发表。

· 8 min read
百岁

前言

MongoDB是一个基于分布式文件存储的开源数据库系统,其内容存储格式为BSON(一种类json的二进制形式),其主要特点是高性能、易部署、易使用,存储数据支持高度事务性且支持完全索引,包括地理空间索引、散列索引和全文索引,还有一个比较大的优势是其适应于海量数据的存储,其数据被分散在不同的服务器上,以自动分区数据。

MongoDB在实际生产环境中有很多应用场景,利用器BSON数据结构可以在运行期动态扩展数据Schema。

MongoDB被TIS整合进了数据集成方案,通过TIS中可以方便对MongoDB端进行读取或者写入操作,方便实现对MongoDB的数据迁移、实时容灾备份、异构数据端(如:Doris)实时同步实现复杂OLAP操作。

本文就实际操作过程中,发现从MongoDB中不能预先读取表Schema,对此进行了优化,并且对此优化过程作以详细介绍。

发现瓶颈

MongoDB作为文档型数据库的代表,区别于传统关系型数据库MySQL,MongoDB的表数据结构Schema在运行期是可变的,而不是像MySQL那样通过Create Table DDL预先定义好表Schema。因此,在为MongoDB做数据集成操作时带来一个麻烦事儿,需要通过手工配置的方式 为读取MongoDB的表作为依据。例如,用户通过Alibaba DataX来读取MongoDB需编写DataX Reader任务配置:

https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md
  {
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"dbName": "tag_per_data",
"collectionName": "tag_data12",
"column": [
{
"name": "unique_id",
"type": "string"
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
}
]
}
},
"writer": {
"name": "odpswriter"
}
}
]
}
}

以上配置文件中 reader mongodbreader需要配置对应表的列枚举信息,列中存在BSON类型的列,还存在拆列的问题,会更加复杂,配置过程虽然简单,但还是很容易会出错,特别在配置type属性时。

优化

思路

优化思路,是否可以通过MongoDB的JDBC客户端,通过反射的方式得到表的列信息列表。然后通过模版机制(如:velocity)自动生成DataX配置文件中column配置。

尝试通过MongoDB Client API读取表Schema元数据信息,我们可以尝试从collection中读取一条记录,然后通过解析记录获得Schema记录,如下:

  var schemaObj = db.users.findOne();

遍历记录的所有列

  void printSchema(obj) {
for (var key in obj) {
print(indent, key, typeof obj[key]) ;
}
};

可以将Schema打印出,如下:

这非常酷,而用户自定义Collection往往存在子属性,希望将这些子属性进行拆接打平,可以导入到下游目标端中。

我们可以优化以上代码:

function printSchema(obj, indent) {
for (var key in obj) {
print(indent, key, typeof obj[key]) ;
if (typeof obj[key] == "object") {
printSchema(obj[key], indent + "\t")
}
}
};
printSchema(schemaObj,"");

重新执行以上代码,将会打印:

1_nWI-bWoJHWE1WU2Mgk8z-A.webp

在TIS中具体实现

TIS中读取MongoDB表Schema实现方式沿袭以上思路,另外添加额外的工序:

  1. 在控制台中设置尝试读取的记录数,由于MongoDB Collection中每条记录的列数量和类型不一定相同的,可以尝试读取多条Collection中记录,将每条记录Schema进行Merge最终获得Schema。
  2. 读取每条记录类型为BsonType.DOCUMENT的列类型,将内部子列与父列通过'.'号就行连接,打平成为新的列,例如:"user.name","user.age"
  3. 将获得到的Schema在前台展示,用户可以通过在表单中对Schema结构进行微调,以确认最终的Schema结构。

以下为 MongoColumnMetaData.java GitHub中路径代码中的片段:

/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/MongoColumnMetaData.java#L69
    public static void parseMongoDocTypes(boolean parseChildDoc, List<String> parentKeys //
, Map<String, MongoColumnMetaData> colsSchema, BsonDocument bdoc) {
int index = 0;
BsonValue val;
String key;
MongoColumnMetaData colMeta;
List<String> keys = null;

for (Map.Entry<String, BsonValue> entry : bdoc.entrySet()) {
val = entry.getValue();
keys = ListUtils.union(parentKeys, Collections.singletonList(entry.getKey()));
key = String.join(MongoCMeta.KEY_MONOG_NEST_PROP_SEPERATOR, keys);
colMeta = colsSchema.get(key);
if (colMeta == null) {
colMeta = new MongoColumnMetaData(index++, key, val.getBsonType(), 0,
(val.getBsonType() == BsonType.OBJECT_ID));
colsSchema.put(key, colMeta);
} else {
if (colMeta.getMongoFieldType() != BsonType.STRING //
&& !val.isNull() && colMeta.getMongoFieldType() != val.getBsonType()) {
//TODO: 前后两次类型不同
// 则直接将类型改成String类型
colMeta = new MongoColumnMetaData(index++, key, BsonType.STRING);
colsSchema.put(key, colMeta);
}
}
if (!val.isNull()) {
if (colMeta.getMongoFieldType() == BsonType.DOCUMENT && val.isDocument()) {
parseMongoDocTypes(true, keys, parseChildDoc ? colsSchema : colMeta.docTypeFieldEnum, val.asDocument());
}
if (colMeta.getMongoFieldType() == BsonType.STRING) {
colMeta.setMaxStrLength(val.asString().getValue().length());
}
colMeta.incrContainValCount();
}

}
}

MongoDB Reader 页面设置预读记录数,尝试读取Collection记录条数以分析出Collection的Schema,为确保最终Schema准确,可以适当将预读记录数设置得大一些。

通过TIS解析得到的Schema会在以下页面中展示结果,用户可进行微调,确定是否需要该列,变更字段类型等。

通过以上优化,最终完成数据同步通道定义,可以有效避免用户手动输入MongoDB表 Schema,达到了最大限度地降低了出错概率,且提高了工作效率

总结

数据集成领域,有大部分的端类型是和MongoDB类似属于 Schemaless的,例如Kafka,Redis,基于文件的Hdfs,FTP等等。不像MySQL这样的具有明确预定义Schema的数据源可以通过读取MetaData的方式得到Schema,从而进行自动化操作。

TIS的初衷是构建一款高度傻瓜化的DataOps数据集成软件,面向一线非技术人员,他们精通业务,在具体操作过程中不需要了解具体MongoDB中的字段类型,有哪些字段。 整个操作流程,只需要轻点鼠标,TIS会帮助用户自动生成所需配置。

借鉴这种操作思路,可以扩展到其他Schemaless的数据端读取流程上,例如Kafka,Redis,基于文件的Hdfs,FTP,将会极大地提高执行数据集成的效率。

· 3 min read
百岁

前言

在TIS 4.0.0 版本主要的功能是将原先但节点运行的组件扩展到分布式云环境中,以下类图中有三个组件需要依赖到ServerPortExport组件,

  1. Kubernete Powerjob Server
  2. Kubernete Flink Session
  3. Kubernete Flink Application

ServerPortExport 组件负责在K8S组件(ReplicaSet)发布过程中将目标端口以不同的方式发布(Ingress,LoadBalance,NodePort)

编辑

遇到问题

ServerPortExport 组件聚合到不同的组件中,在具体运行过程中需要根据聚合类的不同有不同的初始值,

例如,聚合在K8SDataXPowerJobServer类中初始值为7700,而当聚合在BasicFlinkK8SClusterCfg中的初始值为8081,当然,直观来说,最简单的办法是根据聚合到不同的类,创建不同的ServerPortExport的子类从而来设置不同的初始值, 但这会创建出大量的冗余代码,所以,并不可取。

解决办法

在运行期,根据所在聚合类的Descriptor来动态设置 ServerPortExport.serverPort 属性的值

编辑

具体需要做以下功能:

  1. 创建 DefaultExportPortProvider接口,get方法返回对应的端口默认值
  2. BasicFlinkK8SClusterCfgK8SDataXPowerJobServer对应的 Descriptor分别实现以上接口
  3. 在运行期将Descriptor序列化成Json步骤中,需要将Descriptor实例与当前运行的线程绑定,这部分功能在Json序列化过程中执行,为此需要添加新类DescriptorsJSONResult
  4. 为类DescriptorsJSONResult注册到Json序列化注册器中
    JsonUtil.java
       ObjectWriter descSerializer = new ObjectWriter() {
    @Override
    public void write(JSONWriter jsonWriter, Object object, Object fieldName, Type fieldType, long features) {
    DescriptorsJSONResult value = (DescriptorsJSONResult) object;
    Objects.requireNonNull(value, "callable of " + fieldName + " can not be null");
    jsonWriter.writeRaw(value.toJSONString());
    }
    };
    com.alibaba.fastjson2.JSON.register(DescriptorsJSONResult.class, descSerializer);
  5. 通过ServerPortExport.json配置描述文件,设置属性serverPort的默认值
    ServerPortExport.json
      {
    "serverPort": {
    "help": "SpringBoot配置,HTTP端口号,默认7700,不建议更改",
    "dftVal": "com.qlangtech.tis.plugin.datax.powerjob.ServerPortExport.dftExportPort():uncache_true"
    }
    }
    ServerPortExport.java
      public static Integer dftExportPort() {
    return ((DefaultExportPortProvider)
    DescriptorsJSONResult.getRootDescInstance()).get();
    }

总结

通过以上步骤,就可以将ServerPortExport根据所在聚合类不同将属性serverPort初始化成不同的默认值。 以此作为一个例子,可以在TIS中相同需求可以推而广之。