重庆网站整合营销,合肥网络推广公司哪家专业,化妆品营销型网站案例,电商素材网站一、OceanBase 数据库核心配置
1. 环境准备与版本要求
版本要求#xff1a;OceanBase CE 4.0 或 OceanBase EE 2.2组件依赖#xff1a;需部署 LogProxy 服务#xff08;社区版/企业版部署方式不同#xff09;兼容模式#xff1a;支持 MySQL 模式#xff08;默认#x…一、OceanBase 数据库核心配置
1. 环境准备与版本要求
版本要求OceanBase CE 4.0 或 OceanBase EE 2.2组件依赖需部署 LogProxy 服务社区版/企业版部署方式不同兼容模式支持 MySQL 模式默认和 Oracle 模式
2. 创建用户与权限配置
在 sys 租户创建管理用户社区版示例
-- 连接 sys 租户默认端口 2881
mysql -h127.0.0.1 -P2881 -urootsys -p-- 创建用户替换为实际用户名密码
CREATE USER ob_cdc_user IDENTIFIED BY Ob123456;
GRANT ALL PRIVILEGES ON *.* TO ob_cdc_user WITH GRANT OPTION;
FLUSH PRIVILEGES;在业务租户创建 CDC 用户
-- 切换到业务租户如 test_tenant
USE test_tenant;-- 创建 CDC 数据读取用户
CREATE USER flink_user IDENTIFIED BY Flink123;
GRANT SELECT ON test_db.* TO flink_user;
FLUSH PRIVILEGES;3. 获取关键配置信息
社区版获取 rootserver-list
-- 连接业务租户
mysql -h127.0.0.1 -P2881 -uflink_user -p-- 查询 rootserver 列表格式ip:rpc_port:sql_port
SHOW PARAMETERS LIKE rootservice_list;
-- 示例输出rootservice_list | 127.0.0.1:2882:2881企业版获取 config-url
SHOW PARAMETERS LIKE obconfig_url;
-- 示例输出obconfig_url | http://127.0.0.1:8080/services?ActionObRootServiceInfo...4. 部署 LogProxy 服务社区版快速启动
# 下载 LogProxy 二进制社区版
wget https://github.com/oceanbase/oblogproxy/releases/download/v2.2.7/oblogproxy-2.2.7.tar.gz
tar -zxvf oblogproxy-2.2.7.tar.gz# 编辑配置文件 oblogproxy.conf
vi oblogproxy/oblogproxy.conf
# 添加以下配置根据实际情况修改
[common]
rootservice_list 127.0.0.1:2882:2881
logproxy_port 2983
working_mode memory# 启动 LogProxy
cd oblogproxy
./oblogproxy -c oblogproxy.conf二、Flink 环境集成配置
1. 添加Maven依赖
!-- OceanBase CDC 连接器 --
dependencygroupIdcom.ververica/groupIdartifactIdflink-sql-connector-oceanbase-cdc/artifactIdversion3.0.1/versionscopeprovided/scope
/dependency!-- 企业版需添加OceanBase JDBC驱动 --
dependencygroupIdcom.oceanbase/groupIdartifactIdoceanbase-client/artifactIdversion2.4.2/version
/dependency2. SQL Client部署
下载 CDC 连接器 JAR flink-sql-connector-oceanbase-cdc-3.0.1.jar企业版需额外下载 OceanBase JDBC 驱动 oceanbase-client-2.4.2.jar将 JAR 包放入 $FLINK_HOME/lib/ 后重启 Flink 集群。
三、Flink SQL 表定义与参数详解
1. MySQL 模式建表示例含元数据
-- 配置checkpoint
SET execution.checkpointing.interval 5s;-- 创建OceanBase CDC表MySQL模式
CREATE TABLE ob_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),-- 元数据列tenant_name STRING METADATA FROM tenant_name VIRTUAL,db_name STRING METADATA FROM database_name VIRTUAL,table_name STRING METADATA FROM table_name VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM op_ts VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH (connector oceanbase-cdc,scan.startup.mode initial,username flink_usertest_tenant#ob_cluster,password Flink123,tenant-name test_tenant,database-name test_db,table-name orders,hostname 127.0.0.1,port 2881,rootserver-list 127.0.0.1:2882:2881, -- 社区版必填logproxy.host 127.0.0.1,logproxy.port 2983,working-mode memory
);2. Oracle 模式建表示例
CREATE TABLE ob_orders_oracle (order_id INT,order_date TIMESTAMP(0),customer_name STRING,-- 元数据列tenant_name STRING METADATA FROM tenant_name VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM op_ts VIRTUAL
) WITH (connector oceanbase-cdc,scan.startup.mode initial,username flink_usertest_tenant#ob_cluster,password Flink123,tenant-name test_tenant,database-name test_db,table-name orders,hostname 127.0.0.1,port 2881,compatible-mode oracle, -- 关键设置Oracle兼容模式jdbc.driver com.oceanbase.jdbc.Driver, -- 企业版JDBC驱动config-url http://127.0.0.1:8080/..., -- 企业版必填logproxy.host 127.0.0.1,logproxy.port 2983
);3. 核心参数详解
参数名必选默认值类型说明connector是无String固定为oceanbase-cdcscan.startup.mode是无String启动模式initial快照日志、latest-offset仅最新、timestamp指定时间tenant-name是无String目标租户名称如test_tenantlogproxy.host是无StringLogProxy 服务IPlogproxy.port是无IntegerLogProxy 服务端口默认2983rootserver-list社区版是无String社区版rootserver列表格式ip:rpc_port:sql_portconfig-url企业版是无String企业版配置服务URLcompatible-mode否mysqlString兼容模式mysql默认、oraclejdbc.driver企业版是com.mysql.jdbc.DriverString企业版JDBC驱动类com.oceanbase.jdbc.Driver
四、环境验证与测试
1. 准备测试数据OceanBase MySQL模式
-- 连接业务租户
mysql -h127.0.0.1 -P2881 -uflink_user -p test_db-- 创建测试表
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2)
);-- 插入数据
INSERT INTO orders VALUES
(1, 2023-01-01 10:00:00, Alice, 100.50),
(2, 2023-01-02 11:00:00, Bob, 200.75);
COMMIT;2. Flink SQL 验证
-- 查询OceanBase CDC表首次触发快照
SELECT * FROM ob_orders;-- 在OceanBase中更新数据
UPDATE orders SET price 150.00 WHERE order_id 1;
COMMIT;-- 观察Flink输出应显示变更记录op_ts为变更时间3. DataStream API 验证社区版
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarcharType;public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {// 定义表结构RowType physicalType RowType.of(RowType.Field.of(order_id, BigIntType.INSTANCE),RowType.Field.of(customer_name, VarcharType.of(100)));InternalTypeInfoRowData typeInfo InternalTypeInfo.of(physicalType);// 配置OceanBase SourceOceanBaseSourceRowData source OceanBaseSource.RowDatabuilder().rsList(127.0.0.1:2882:2881) // 社区版rootserver-list.startupMode(StartupMode.INITIAL).username(flink_usertest_tenant#ob_cluster).password(Flink123).tenantName(test_tenant).databaseName(test_db).tableName(orders).hostname(127.0.0.1).port(2881).logProxyHost(127.0.0.1).logProxyPort(2983).deserializer(RowDataOceanBaseDeserializationSchema.newBuilder().setPhysicalRowType(physicalType).setResultTypeInfo(typeInfo).build()).build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(source, null, OceanBase CDC Source).print();env.execute(OceanBase CDC Test);}
}五、常见问题与解决方案 LogProxy连接失败 ERROR: Failed to connect to LogProxy at 127.0.0.1:2983解决方案 确认LogProxy服务已启动且端口正确netstat -an | grep 2983检查logproxy.host和logproxy.port配置是否与LogProxy一致 权限不足社区版 ERROR: Access denied for user flink_user127.0.0.1解决方案 确认用户在业务租户有SELECT权限检查用户名格式是否正确usertenant#cluster 企业版Oracle模式配置错误 ERROR: incompatible-mode must be set for Oracle mode解决方案 显式设置compatible-mode oracle确保已添加oceanbase-client依赖并部署JDBC驱动 时间戳转换异常 解决方案显式设置时区server-time-zone Asia/Shanghai六、生产环境优化建议 LogProxy性能调优 设置working-mode memory内存模式适合高频变更调整obcdc.properties.batch_size如1024优化批量处理 高可用配置 部署多节点LogProxyFlink配置多个logproxy.host逗号分隔企业版使用config-url自动发现OB集群节点 监控与清理 定期清理LogProxy内存数据working-mode memory时# 重启LogProxy或通过API清理通过以上步骤可完成Flink OceanBase CDC的全流程配置与验证。生产环境中需特别注意社区版与企业版的配置差异、LogProxy服务稳定性及兼容模式的正确设置以确保数据一致性和系统稳定性。