安装 python 依赖
pip install pymysql
https://mirrors.aliyun.com/pypi/packages/e5/30/20467e39523d0cfc2b6227902d3687a16364307260c75e6a1cb4422b0c62/PyMySQL-1.1.0-py3-none-any.whl
ods 层建表语句
-- Landing table for rows synced from the business database.
-- Row storage is enabled ("store_row_column") and the table is created
-- without sort keys ("enable_duplicate_without_keys_by_default").
DROP TABLE IF EXISTS ods_stu_for;
CREATE TABLE IF NOT EXISTS ods_stu_for
(
    id   INT         COMMENT '',
    name VARCHAR(23) COMMENT '',
    cost VARCHAR(12) COMMENT ''
)
ENGINE = OLAP
COMMENT 'olap'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
    "replication_num" = "1",
    "enable_duplicate_without_keys_by_default" = "true",
    "light_schema_change" = "true",
    "store_row_column" = "true"
);
-- ODS-layer history table.
-- A row version is identified by (md5, id, name, cost, start_dt);
-- start_dt defaults to the current date and end_dt to '9999-12-31'.
DROP TABLE IF EXISTS ods_stu_his;
CREATE TABLE IF NOT EXISTS ods_stu_his
(
    md5      VARCHAR(32),
    id       INT,
    name     VARCHAR(23),
    cost     VARCHAR(12),
    start_dt DATE DEFAULT CURRENT_DATE,
    end_dt   DATE DEFAULT '9999-12-31'
)
ENGINE = OLAP
UNIQUE KEY (md5, id, name, cost, start_dt)
COMMENT 'olap'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
    "replication_num" = "1"
);
-- Holds the changed rows (inserts / updates) captured from the business
-- database. The date columns are stored as strings in this table.
DROP TABLE IF EXISTS ods_stu_inc;
CREATE TABLE IF NOT EXISTS ods_stu_inc
(
    md5      VARCHAR(32),
    id       INT,
    name     VARCHAR(23),
    cost     VARCHAR(12),
    start_dt VARCHAR(12),
    end_dt   VARCHAR(12)
)
ENGINE = OLAP
UNIQUE KEY (md5, id, name, cost, start_dt)
COMMENT 'olap'
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true",
    "light_schema_change" = "true",
    "store_row_column" = "true"
);
python 脚本
会生成一条 insert 语句,将从业务库获取的数据同步到 ods 层历史表中;
脚本需要两个参数: 表名(不包含后缀)和关联条件(关联条件中 a 表示新数据, osh 表示历史表),例如:
python3 doris.py ods_stu "a.id = osh.id"
# coding: utf-8
import pymysql
import sys
# Usage text printed when the wrong number of arguments is given.
# It asks for two arguments: the table name without its suffix, and the
# join condition that identifies a row (quote it if it contains spaces).
USAGE = '''
请传递两参数
第一个参数: 表名(不包含后缀)
第二个参数: 关联条件,判断数据唯一性,业务表中主键或唯一建(如果包含空格使用双引号包裹)
'''

# Doris connection settings.
DORIS_HOST = 'cdh002'
DORIS_PORT = 9030  # must be an integer
DORIS_USER = 'root'
DORIS_PASSWORD = ''
DORIS_DB = 'test'

# Metadata lookup for the "<table>_for" table. Result columns:
#   [0] table name
#   [1] comma-joined column names, in ordinal order
#   [2] "nvl(col,'---') col" expressions (not used by this script)
#   [3] "nvl(col,'---') " expressions, used as the md5sum() argument list
# Schema and table name are bound as parameters instead of being spliced
# into the SQL text.
COLUMNS_QUERY = '''select TABLE_NAME , GROUP_CONCAT(COLUMN_NAME ORDER BY ORDINAL_POSITION ),
GROUP_CONCAT(CONCAT('nvl(',COLUMN_NAME,',''---'') ',COLUMN_NAME) ORDER BY ORDINAL_POSITION ) COLUMN_NAME,
GROUP_CONCAT(CONCAT('nvl(',COLUMN_NAME,',''---'') ') ORDER BY ORDINAL_POSITION ) COLUMN_NAME1
from information_schema.columns where TABLE_SCHEMA = %s and TABLE_NAME = %s GROUP by TABLE_NAME ;'''


def build_insert_sql(table_name, columns, nvl_columns, where):
    """Build the INSERT that loads "<table_name>_for" into "<table_name>_his".

    The generated statement has two CTEs:

    * ``a``: rows of ``<table_name>_for`` (with their md5) minus the rows
      already present in ``<table_name>_his`` (``EXCEPT``).
    * ``b``: rows selected through ``a RIGHT SEMI join <table_name>_his osh``
      on ``where`` and ``osh.end_dt = '9999-12-31'``, with ``END_DT`` set to
      ``CURRENT_DATE()``.

    It inserts ``a`` with ``START_DT = CURRENT_DATE()`` and
    ``END_DT = '9999-12-31'``, unioned with ``b``.

    Args:
        table_name: Table name without the ``_for`` / ``_his`` suffix.
        columns: Comma-separated column list of the ``_for`` table.
        nvl_columns: Comma-separated ``nvl(col,'---')`` expressions that are
            passed to ``md5sum``.
        where: Join condition between the aliases ``a`` and ``osh``.

    Returns:
        The INSERT statement as a single-line string.

    NOTE(review): ``table_name`` and ``where`` are inserted verbatim because
    they are identifiers / SQL fragments and cannot be bound as parameters.
    Only pass trusted values and review the statement before running it.
    """
    return (
        f"insert into {table_name}_his select * from ("
        f"with a as(select md5sum({nvl_columns}) md5,{columns}"
        f" from {table_name}_for"
        f" EXCEPT select md5,{columns} from {table_name}_his),"
        f" b as (select md5,{columns},START_DT,CURRENT_DATE() END_DT"
        f" from a RIGHT SEMI join {table_name}_his osh"
        f" on {where} AND osh.end_dt = '9999-12-31' )"
        f" select md5,{columns} ,CURRENT_DATE() START_DT,'9999-12-31' END_DT from a"
        f" union all select md5,{columns}, START_DT,END_DT from b) c;"
    )


def fetch_column_lists(cursor, schema, table_name):
    """Look up the column lists of "<table_name>_for" in ``schema``.

    Args:
        cursor: An open DB-API cursor.
        schema: Value matched against ``TABLE_SCHEMA``.
        table_name: Table name without the ``_for`` suffix.

    Returns:
        ``(columns, nvl_columns)`` or ``None`` when the query returns no row.
    """
    cursor.execute(COLUMNS_QUERY, (schema, table_name + '_for'))
    row = cursor.fetchone()
    if row is None:
        return None
    return row[1], row[3]


def main(argv=None):
    """Print the history-load INSERT statement for the table given on the CLI.

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.

    Exits with status 1 when the argument count is wrong or when no column
    metadata is found for "<table>_for".
    """
    args = sys.argv if argv is None else argv
    if len(args) != 3:
        print(USAGE)
        sys.exit(1)

    table_name = args[1].strip()
    where = args[2].strip()

    conn = pymysql.connect(host=DORIS_HOST,
                           port=DORIS_PORT,
                           user=DORIS_USER,
                           password=DORIS_PASSWORD,
                           database=DORIS_DB)
    try:
        with conn.cursor() as cursor:
            column_lists = fetch_column_lists(cursor, DORIS_DB, table_name)
    finally:
        # Close the connection even when the metadata query fails.
        conn.close()

    if column_lists is None:
        print(f'未找到表 {DORIS_DB}.{table_name}_for 的列信息,请确认表名是否正确',
              file=sys.stderr)
        sys.exit(1)

    columns, nvl_columns = column_lists
    # The statement is only printed, not executed, as in the original script.
    print(build_insert_sql(table_name, columns, nvl_columns, where))


if __name__ == '__main__':
    main()