如何使用ogg将Oracle数据传输到flume刷到kafka
本篇内容主要讲解“如何使用ogg将Oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!
本篇内容主要讲解“如何使用ogg将Oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!
为确山等地区用户提供了全套网页设计制作服务,及确山网站建设行业解决方案。主营业务为网站设计制作、网站建设、确山网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
源端测试:
服务器环境部署:
命令步骤如下:
[root@test ~]# groupadd oinstall
[root@test ~]# groupadd dba
[root@test ~]# useradd -g oinstall -G dba oracle
[root@test ~]#
修改权限:
[root@test ~]# chown -R oracle:oinstall /data
[root@test ~]#
2. 设置全局java环境变量
[root@test ~]# cat /etc/redhat-release
CentOS release 6.4 (Final)
[root@test ~]#
[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz
在root下执行配置:
设置java环境变量:
vi /etc/profile
###jdk
export JAVA_HOME=/data/jdk1.8.0_60
export JAVA_BIN=/data/jdk1.8.0_60/bin
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH
切换Oracle用户核对:
[root@test ~]# su - oracle
[oracle@test ~]$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
[oracle@test ~]$
如果不生效:
修改java环境变量:
alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100
alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100
alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
# /usr/sbin/alternatives --config java
[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.
Selection Command
-----------------------------------------------
1 /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*+ 2 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
3 /usr/lib/jvm/jre-1.5.0-gcj/bin/java
4 /data/jdk1.8.0_60/bin/java
Enter to keep the current selection[+], or type selection number: 4
[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.
Selection Command
-----------------------------------------------
1 /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
* 2 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
3 /usr/lib/jvm/jre-1.5.0-gcj/bin/java
+ 4 /data/jdk1.8.0_60/bin/java
Enter to keep the current selection[+], or type selection number:
[root@test1 data]#
[root@test1 data]# java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
[root@test1 data]#
修改flume 参数配置:
[oracle@test1 conf]$ cat flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel
#
agent.sources.r1.type = avro
agent.sources.r1.port = 14141
agent.sources.r1.bind = 192.168.88.66
agent.sources.r1.channels = fileChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = logger
#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
#kafka sink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = my_schema
agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092
agent.sinks.kafkaSink.requiredAcks = 1
agent.sinks.kafkaSink.batchSize = 20
agent.sinks.kafkaSink.channel = fileChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
#File Channel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 20000000
agent.channels.fileChannel.capacity = 50000000
agent.channels.fileChannel.maxFileSize = 2147483648
agent.channels.fileChannel.minimumRequiredSpace = 52428800
agent.channels.fileChannel.keep-alive = 3
agent.channels.fileChannel.checkpointInterval = 20000
agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir
agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir
[oracle@test1 conf]$
############配置OGG
主库在
源库创建新的抽取进程:
dblogin userid goldengate, password goldengate
add extract EXTJMS,tranlog, threads 2,begin now
add exttrail /data/goldengate/dirdat/kf, extract EXTJMS megabytes 200
add schematrandata my_schema
add trandata my_schema.*
原抽取进程:
extract EXTJMS
setenv (ORACLE_SID="testdb")
setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
userid goldengate, password goldengate
TRANLOGOPTIONS DBLOGREADER
exttrail /data/goldengate/dirdat/kf
discardfile /data/goldengate/dirrpt/EXTJMS.dsc,append
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
numfiles 3000
CHECKPOINTSECS 20
DISCARDROLLOVER AT 05:30
dynamicresolution
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
ddl &
include mapped &
exclude objtype 'TRIGGER' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude objtype 'TYPE' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"
FETCHOPTIONS NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;
--extract table user
TABLE my_schema.*;
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
Database altered.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;
Database altered.
SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;
SUPPLEME SUP SUP FOR
-------- --- --- ---
YES YES YES YES
SQL>
源端添加新的pump进程:
在my_schema源库测试添加pump进程:
添加pump进程:
添加新的pump:
add extract EDPKF,exttrailsource /data/goldengate/dirdat/kf, begin now
edit param EDPKF
EXTRACT EDPKF
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
RMTHOST 192.168.88.66, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kp
DISCARDFILE ./dirrpt/EDPKF.dsc,APPEND,MEGABYTES 5
TABLE my_schema.* ;
add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF megabytes 200
edit param defgen
userid goldengate, password goldengate
defsfile dirdef/my_schema.def
TABLE my_schema.*;
传递定义文件:
./defgen paramfile ./dirprm/defgen.prm
目标端直接端
mgr:
PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
添加 UE DATA PUMP:
使用版本:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209
ADD EXTRACT LOANFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp
edit param JMSFLM
GGSCI (localhost.localdomain) 18> view param JMSFLM
EXTRACT JMSFLM
SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/my_schema.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE my_schema.MV*;
TABLE my_schema.*;
--alter prodjms extseqno 736, extrba 0
注释: 在目标端完全可以不安装Oracle数据库,可以和flume环境放在一起,最终刷数据到kafka的服务器接收消息。
本案例是 通过flume中转实现的,完全没有问题。
当然也可以直接将数据传输到kafka处理消息,原理都是一样的。
未来更多的大数据融合也是一个不错的方案,无论是,,hdfs等都可以完美结合。
参数文件:
$ cat JMSFLM.props
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler
gg.handler.flumehandler.host=192.168.88.66
gg.handler.flumehandler.port=14141
gg.handler.flumehandler.rpcType=avro
gg.handler.flumehandler.delimiter=\u0001
gg.handler.flumehandler.mode=op
gg.handler.flumehandler.includeOpType=true
# Indicates if the operation timestamp should be included as part of output in the delimited separated values
# true - Operation timestamp will be included in the output
# false - Operation timestamp will not be included in the output
# Default :- true
#gg.handler.flumehandler.includeOpTimestamp=true
#gg.handler.name.deleteOpKey=D
#gg.handler.name.updateOpKey=U
#gg.handler.name.insertOpKey=I
#gg.handler.name.pKUpdateOpKey=P
#gg.handler.name.includeOpType=true
# Optional properties to use the transaction grouping functionality
#gg.handler.flumehandler.maxGroupSize=1000
#gg.handler.flumehandler.minGroupSize=1000
### native library config ###
goldengate.userexit.nochkpt=TRUE
goldengate.userexit.timestamp=utc
goldengate.log.logname=cuserexit
goldengate.log.level=DEBUG
goldengate.log.tofile=true
goldengate.userexit.writers=javawriter
goldengate.log.level.JAVAUSEREXIT=DEBUG
#gg.brokentrail=true
gg.report.time=30sec
gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*
javawriter.stats.full=TRUE
javawriter.stats.display=TRUE
javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
本文名称:如何使用ogg将Oracle数据传输到flume刷到kafka
转载来源:http://myzitong.com/article/cgddpc.html