日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

背景

在大數(shù)據(jù)時(shí)代,存在大量基于數(shù)據(jù)的業(yè)務(wù)。數(shù)據(jù)需要在不同的系統(tǒng)之間流動(dòng)、整合。通常,核心業(yè)務(wù)系統(tǒng)的數(shù)據(jù)存在OLTP數(shù)據(jù)庫(kù)系統(tǒng)中,其它業(yè)務(wù)系統(tǒng)需要獲取OLTP系統(tǒng)中的數(shù)據(jù)。傳統(tǒng)的數(shù)倉(cāng)通過批量數(shù)據(jù)同步的方式,定期從OLTP系統(tǒng)中抽取數(shù)據(jù)。但是隨著業(yè)務(wù)需求的升級(jí),批量同步無論從實(shí)時(shí)性,還是對(duì)在線OLTP系統(tǒng)的抽取壓力,都無法滿足要求。需要實(shí)時(shí)從OLTP系統(tǒng)中獲取數(shù)據(jù)變更,實(shí)時(shí)同步到下游業(yè)務(wù)系統(tǒng)。

本文基于Oracle OGG,介紹一種將Oracle數(shù)據(jù)庫(kù)的數(shù)據(jù)實(shí)時(shí)同步到Kafka消息隊(duì)列的方法。

Kafka是一種高效的消息隊(duì)列實(shí)現(xiàn),通過訂閱kafka的消息隊(duì)列,下游系統(tǒng)可以實(shí)時(shí)獲取在線Oracle系統(tǒng)的數(shù)據(jù)變更情況,實(shí)現(xiàn)業(yè)務(wù)系統(tǒng)。

環(huán)境介紹

組件版本

基于OGG 實(shí)現(xiàn)Oracle到Kafka增量數(shù)據(jù)實(shí)時(shí)同步

 


整體架構(gòu)圖

基于OGG 實(shí)現(xiàn)Oracle到Kafka增量數(shù)據(jù)實(shí)時(shí)同步

 

名詞解釋

1.OGG Manager

OGG Manager用于配置和管理其它OGG組件,配置數(shù)據(jù)抽取、數(shù)據(jù)推送、數(shù)據(jù)復(fù)制,啟動(dòng)和停止相關(guān)組件,查看相關(guān)組件的運(yùn)行情況。

2.數(shù)據(jù)抽取(Extract)

抽取源端數(shù)據(jù)庫(kù)的變更(DML, DDL)。數(shù)據(jù)抽取主要分如下幾種類型:

本地抽取
從本地?cái)?shù)據(jù)庫(kù)捕獲增量變更數(shù)據(jù),寫入到本地Trail文件

數(shù)據(jù)推送(Data Pump)
從本地Trail文件讀取數(shù)據(jù),推送到目標(biāo)端。

初始數(shù)據(jù)抽取
從數(shù)據(jù)庫(kù)表中導(dǎo)出全量數(shù)據(jù),用于初次數(shù)據(jù)加載

3.數(shù)據(jù)推送(Data Pump)

Data Pump是一種特殊的數(shù)據(jù)抽取(Extract)類型,從本地Trail文件中讀取數(shù)據(jù),并通過網(wǎng)絡(luò)將數(shù)據(jù)發(fā)送到目標(biāo)端OGG

4.Trail文件

數(shù)據(jù)抽取從源端數(shù)據(jù)庫(kù)抓取到的事物變更信息會(huì)寫入到Trail文件。

5.數(shù)據(jù)接收(Collector)

數(shù)據(jù)接收程序運(yùn)行在目標(biāo)端機(jī)器,用于接收Data Pump發(fā)送過來的Trail日志,并將數(shù)據(jù)寫入到本地Trail文件。

6.數(shù)據(jù)復(fù)制(Replicat)

數(shù)據(jù)復(fù)制運(yùn)行在目標(biāo)端機(jī)器,從Trail文件讀取數(shù)據(jù)變更,并將變更數(shù)據(jù)應(yīng)用到目標(biāo)端數(shù)據(jù)存儲(chǔ)系統(tǒng)。本案例中,數(shù)據(jù)復(fù)制將數(shù)據(jù)推送到kafka消息隊(duì)列。

7.檢查點(diǎn)(Checkpoint)

檢查點(diǎn)用于記錄數(shù)據(jù)庫(kù)事物變更。

 

操作步驟

源端Oracle配置

1.檢查歸檔

使用OGG,需要在源端開啟歸檔日志

SQL> archive log list;
 
    Database log mode              Archive Mode
 
    Automatic archival             Enabled
 
    Archive destination            /u01/App/oracle/product/12.2.0/db_1/dbs/arch
 
    Oldest online log sequence     2576
 
    Next log sequence to archive   2577
 
    Current log sequence           2577

2.檢查數(shù)據(jù)庫(kù)配置

SQL> select force_logging, supplemental_log_data_min from v$database;
 
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
 
---------- ------------------------
 
YES        YES

如果沒有開啟輔助日志,需要開啟:

SQL> alter database force logging;
 
SQL> alter database add supplemental log data;

3.開啟goldengate復(fù)制參數(shù)

SQL> alter system set enable_goldengate_replication = true;

4.創(chuàng)建源端Oracle賬號(hào)

SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
 
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
 
SQL> grant dba to ggsadmin;

5.創(chuàng)建測(cè)試表

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
 
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
 
SQL> select count(*) from baiyang.ora_to_kfk;
    COUNT(*)
 
----------
 
        436

源端OGG配置

1.檢查源端OGG環(huán)境

cd /oradata/oggorcl/ogg
 
./ggsci
 
GGSCI (dtproxy) 1> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     STOPPED

2.創(chuàng)建相關(guān)文件夾

GGSCI (dtproxy) 2> create subdirs
 
    Creating subdirectories under current directory /oradata/oggorcl/ogg
 
   
    Parameter file                 /oradata/oggorcl/ogg/dirprm: created.
 
    Report file                    /oradata/oggorcl/ogg/dirrpt: created.
 
    Checkpoint file                /oradata/oggorcl/ogg/dirchk: created.
 
    Process status files           /oradata/oggorcl/ogg/dirpcs: created.
 
    SQL script files               /oradata/oggorcl/ogg/dirsql: created.
 
    Database definitions files     /oradata/oggorcl/ogg/dirdef: created.
 
    Extract data files             /oradata/oggorcl/ogg/dirdat: created.
 
    Temporary files                /oradata/oggorcl/ogg/dirtmp: created.
 
    Credential store files         /oradata/oggorcl/ogg/dircrd: created.
 
    Masterkey wallet files         /oradata/oggorcl/ogg/dirwlt: created.
 
    Dump files                     /oradata/oggorcl/ogg/dirdmp: created

3.配置源端Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
 
    Successfully logged into database.
 
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

添加

oggschema ggsadmin
 
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

添加

PORT 7810 --默認(rèn)監(jiān)聽端口
 
DYNAMICPORTLIST  7811-7820 --動(dòng)態(tài)端口列表
 
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --進(jìn)程有問題,每3分鐘重啟一次,一共重啟五次
 
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7  --*/
 
LAGREPORTHOURS 1 --每隔一小時(shí)檢查一次傳輸延遲情況
 
LAGINFOMINUTES 30 --傳輸延時(shí)超過30分鐘將寫入錯(cuò)誤日志
 
LAGCRITICALMINUTES 45 --傳輸延時(shí)超過45分鐘將寫入警告日志
 
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
 
ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW --設(shè)定172網(wǎng)段可連接

添加同步的表

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
 
Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
 
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
 
Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

目標(biāo)端OGG配置
1.目標(biāo)端檢查環(huán)境

GGSCI (172-16-101-242) 1> info all
 
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
    MANAGER     STOPPED 

2.創(chuàng)建目錄

GGSCI (172-16-101-242) 2> create subdirs
 
    Creating subdirectories under current directory /app/ogg
 
    Parameter file                 /app/ogg/dirprm: created.
 
    Report file                    /app/ogg/dirrpt: created.
 
    Checkpoint file                /app/ogg/dirchk: created.
 
    Process status files           /app/ogg/dirpcs: created.
 
    SQL script files               /app/ogg/dirsql: created.
 
    Database definitions files     /app/ogg/dirdef: created.
 
    Extract data files             /app/ogg/dirdat: created.
 
    Temporary files                /app/ogg/dirtmp: created.
 
    Credential store files         /app/ogg/dircrd: created.
 
    Masterkey wallet files         /app/ogg/dirwlt: created.
 
Dump files                     /app/ogg/dirdmp: created.

3.目標(biāo)端Manager配置

GGSCI (172-16-101-242) 3> edit params mgr

添加

PORT 7810
 
    DYNAMICPORTLIST 7811-7820
 
    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
 
    PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
    
    GGSCI (172-16-101-242) 4> edit  param  ./GLOBALS
CHECKPOINTTABLE ggsadmin.checkpoint


全量數(shù)據(jù)同步

1.配置源端數(shù)據(jù)初始化

配置源端初始化進(jìn)程

GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
 

配置源端初始化參數(shù)

GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

添加

EXTRACT initkfk
    SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
    USERID ggsadmin,PASSWORD oracle
    RMTHOST 172.16.101.242, MGRPORT 7810
    RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;


2.源端生成表結(jié)構(gòu)define文件

GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

添加

defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
 
    userid ggsadmin,password oracle
 
    table baiyang.ora_to_kfk;

執(zhí)行

$./defgen paramfile dirprm/define_kfk.prm
 
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

將此文件傳輸?shù)侥繕?biāo)段dirdef文件夾

scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt

3.配置目標(biāo)端數(shù)據(jù)初始化進(jìn)程

配置目標(biāo)端初始化進(jìn)程

GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
 
GGSCI (172-16-101-242) 6> edit params initkfk

添加

SPECIALRUN
 
    end runtime
 
    setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
 
    targetdb libfile libggJAVA.so set property=./dirprm/kafka.props
 
    SOURCEDEFS ./dirdef/define_kfk.txt
 
    EXTFILE ./dirdat/ekfk000000
 
    reportcount every 1 minutes, rate
 
    grouptransops 10000
 
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;


4.配置kafka相關(guān)參數(shù)
vi ./dirprm/kafka.props

添加

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*  --*/

vi custom_kafka_producer.properties

添加

bootstrap.servers=172.16.101.242:9092
 
acks=1
 
compression.type=gzip
 
reconnect.backoff.ms=1000
 
value.serializer=org.Apache.kafka.common.serialization.ByteArraySerializer
 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 
batch.size=102400
 
linger.ms=10000


5.源端開啟全量數(shù)據(jù)抽取
源端

GGSCI (dtproxy) 20>  start mgr
 
GGSCI (dtproxy) 21>  start initkfk

6.目標(biāo)端全量數(shù)據(jù)應(yīng)用

GGSCI (172-16-101-242) 13> start mgr
 
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

7.kafka數(shù)據(jù)驗(yàn)證

使用kafka客戶端工具查看topic的數(shù)據(jù)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}

全量數(shù)據(jù)已經(jīng)同步到目標(biāo)kafka topic
增量數(shù)據(jù)同步

1.源端抽取進(jìn)程配置

GGSCI (dtproxy) 9> edit param extkfk

添加

dynamicresolution
 
SETENV (ORACLE_SID = "dtstack")
 
SETENV (NLS_LANG = "american_america.AL32UTF8")
 
userid ggsadmin,password oracle
 
exttrail ./dirdat/to
 
table baiyang.ora_to_kfk;

添加extract進(jìn)程

GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

添加trail文件的定義與extract進(jìn)程綁定

GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

2.源端數(shù)據(jù)推送進(jìn)程配置

配置源端推送進(jìn)程

GGSCI (dtproxy) 12> edit param pupkfk

添加

extract pupkfk
 
passthru
 
dynamicresolution
 
userid ggsadmin,password oracle
 
rmthost 172.16.101.242 mgrport 7810
 
rmttrail ./dirdat/to
 
table baiyang.ora_to_kfk;

添加extract進(jìn)程

GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to

添加trail文件的定義與extract進(jìn)程綁定

GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

3.配置目標(biāo)端恢復(fù)進(jìn)程

配置目標(biāo)端恢復(fù)進(jìn)程

edit param repkfk

添加

REPLICAT repkfk
 
SOURCEDEFS ./dirdef/define_kfk.txt
 
targetdb libfile libggjava.so set property=./dirprm/kafka.props
 
REPORTCOUNT EVERY 1 MINUTES, RATE
 
GROUPTRANSOPS 10000
 
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

添加trail文件到replicate進(jìn)程

add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint

4.源端開啟實(shí)時(shí)數(shù)據(jù)抓取

./ggsci
 
GGSCI (dtproxy) 5> start extkfk
 
Sending START request to MANAGER ...
 
EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk
 
Sending START request to MANAGER ...
 
EXTRACT PUPKFK starting
  
GGSCI (dtproxy) 7> status all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
 
EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
 
EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00

5.目標(biāo)端開啟實(shí)時(shí)數(shù)據(jù)同步

./ggsci
 
GGSCI (172-16-101-242) 7> start replicat repkfk
 
Sending START request to MANAGER ...
 
REPLICAT REPKFK starting
  
GGSCI (172-16-101-242) 8> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
  
MANAGER     RUNNING
 
REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00

6.測(cè)試增量數(shù)據(jù)同步

Oracle插入增量數(shù)據(jù)

SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000;
 
SQL> commit;
 
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
 
----------
 
    905

查看Kafka消息隊(duì)列消費(fèi)數(shù)據(jù)

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}

源端Oracle刪除數(shù)據(jù)

SQL> delete from baiyang.ora_to_kfk ;
 
906 rows deleted.
 
SQL> commit;

查看kafka消息隊(duì)列消費(fèi)數(shù)據(jù)

{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

源端插入數(shù)據(jù)

SQL> insert into  baiyang.ora_to_kfk values('漢字', 'y1', 'z1', 111000,2000,'x1');
 
1 row created.
 
SQL> commit;

查看kafka消息隊(duì)列消費(fèi)數(shù)據(jù)

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"漢字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

總結(jié)

使用OGG可以方便地將Oracle的數(shù)據(jù)變更情況實(shí)時(shí)同步到Kafka消息隊(duì)列。下游業(yè)務(wù)系統(tǒng)通過訂閱kafka的消息隊(duì)列,能方便地實(shí)現(xiàn)各類實(shí)時(shí)數(shù)據(jù)的應(yīng)用。

分享到:
標(biāo)簽:Oracle
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定