MySQL 与 Oracle 数据互导简例(Python)

发布时间:2019-08-26 07:20:45  编辑:auto  阅读(1512)

    因工作需要，简单地写了一个 MySQL 与 Oracle 数据互导的小功能，对大数据量的处理还有待优化。

    话不多说，直接上代码，需要的朋友可以看看代码注释。

    欢迎批评指正 :)


    ##coding=utf8
    #单线程,分批导入
    #Author : Jeen @ 2013-7
    import os,sys,MySQLdb,cx_Oracle,math,time  # python version 2.7.5
    # Oracle client character set (NLS_LANG), set before any cx_Oracle connection below.
    os.environ['NLS_LANG'] = "AMERICAN_AMERICA.AL32UTF8"
    # Python 2 only: reload(sys) re-exposes sys.setdefaultencoding so implicit
    # str/unicode conversions use UTF-8 instead of ASCII.
    reload(sys)
    sys.setdefaultencoding('utf8')

    # Transfer direction:
    #   'mysql2oracle' : export data FROM MySQL TO Oracle
    #   'oracle2mysql' : export data FROM Oracle TO MySQL
    fromto = 'mysql2oracle'
    # Tables to export from.
    from_tables = [
        'P1_WJ_TEST_LANG_SHOP',
        'P1_WJ_TEST_LANG_PRODUCT',
        'P1_WJ_TEST_LANG_PRODUCT_IMAGE',
    ]
    # Tables to import into, matched to from_tables by position. Names may differ,
    # but each pair must have the same structure. get_columns drops the
    # auto-increment ID column; adjust it if ID must be kept.
    to_tables = from_tables
    rows_limit = 1000   # rows per batch; the original author recommends at most 8000
    time_sleep = 1      # seconds to wait after each batch
    if rows_limit < 1:
        # rows_limit is used as the batch size and as a divisor when the
        # number of batches is estimated, so it must be positive.
        raise ValueError('rows_limit must be a positive integer')

    # Connection settings. Each value can be overridden through the environment
    # variable named in the call, so credentials need not be edited into this file;
    # the literal second argument is the default used when the variable is unset.
    # -- MySQL
    mysql_host = os.environ.get('MYSQL_HOST', '127.0.0.1')
    mysql_user = os.environ.get('MYSQL_USER', 'spider')
    mysql_pass = os.environ.get('MYSQL_PASS', 'spider')
    mysql_dbname = os.environ.get('MYSQL_DB', 'spider')
    mysql_port = int(os.environ.get('MYSQL_PORT', 3306))
    # -- Oracle
    oracle_host = os.environ.get('ORACLE_HOST', '192.168.1.222')
    oracle_user = os.environ.get('ORACLE_USER', 'spider')
    oracle_pass = os.environ.get('ORACLE_PASS', 'spider')
    oracle_sid = os.environ.get('ORACLE_SID', 'xe')  # service name, passed to makedsn as the SID
    oracle_port = int(os.environ.get('ORACLE_PORT', 1521))
    def get_columns(results, coltype, exclude_columns=('ID',)):
        """Build a column-list SQL fragment for one side of a MySQL/Oracle copy.

        results: sequence of column names. Each item may be a string or a
            one-element tuple such as a DB row; it is flattened with ''.join.
        coltype: which fragment to build:
            'from_mysql'  -> "`A`, `B`"                          (SELECT list for MySQL)
            'from_oracle' -> "A, B"                              (SELECT list for Oracle)
            'to_mysql'    -> " (`A`, `B`) values (%s,%s) "      (INSERT tail for MySQLdb)
            'to_oracle'   -> " (A, B) values (:0,:1) "          (INSERT tail for cx_Oracle)
        exclude_columns: exact (case-sensitive) names to drop. The default drops
            the auto-increment ID column; pass () to keep every column.

        Returns '' when no columns remain or coltype is unknown.
        """
        names = [''.join(result) for result in results]
        # Filter the list instead of editing a joined string, so the excluded
        # column may sit anywhere (or be absent) without corrupting separators.
        names = [name for name in names if name not in exclude_columns]
        if not names:
            return ''
        mysql_columns = ', '.join('`' + name + '`' for name in names)  # MySQL quotes with backticks
        oracle_columns = ', '.join(names)                              # Oracle: bare identifiers
        if coltype == 'from_mysql':
            return mysql_columns
        elif coltype == 'from_oracle':
            return oracle_columns
        elif coltype == 'to_mysql':
            placeholders = ','.join(['%s'] * len(names))  # MySQLdb "format" paramstyle
            return ' (' + mysql_columns + ') values (' + placeholders + ') '
        elif coltype == 'to_oracle':
            placeholders = ','.join(':' + str(i) for i in range(len(names)))  # positional binds
            return ' (' + oracle_columns + ') values (' + placeholders + ') '
        return ''
    # Engine-specific settings used by _copy_table.
    #   label / name        : how the database is named in progress messages
    #   probe               : query returning no rows, used only to read column names
    #   from_type / to_type : get_columns() fragment kinds for the SELECT / INSERT side
    #   batch_msg           : per-batch progress message when this engine is the SOURCE
    _ENGINES = {
        'mysql': {
            'label': 'Mysql',
            'name': mysql_dbname,
            'probe': 'select * from %s limit 0',
            'from_type': 'from_mysql',
            'to_type': 'to_mysql',
            'batch_msg': 'The (%s)th import end. (%s)rows..',
        },
        'oracle': {
            'label': 'Oracle',
            'name': oracle_sid,
            'probe': 'select * from %s where rownum<1',
            'from_type': 'from_oracle',
            'to_type': 'to_oracle',
            'batch_msg': 'The (%s)th import end.(%s)rows ..',
        },
    }

    def _open_read_cursor(conn, engine):
        """Return a cursor for streaming a whole table out of conn."""
        if engine == 'mysql':
            import MySQLdb.cursors
            # SSCursor is MySQLdb's unbuffered cursor: rows are pulled from the
            # server as fetchmany() asks for them instead of all at once.
            return conn.cursor(MySQLdb.cursors.SSCursor)
        return conn.cursor()

    def _copy_table(src_conn, dst_conn, src_engine, dst_engine, from_tb, to_tb):
        """Copy all rows of from_tb (on src_conn) into to_tb (on dst_conn) in batches.

        src_engine / dst_engine are keys of _ENGINES ('mysql' or 'oracle').
        Raises EOFError('columns error') if no columns are left to copy.
        """
        src = _ENGINES[src_engine]
        dst = _ENGINES[dst_engine]

        # Read the column names and the row count.
        cur = src_conn.cursor()
        try:
            cur.execute(src['probe'] % from_tb)
            columns_name = [col[0] for col in cur.description]
            columns = get_columns(columns_name, src['from_type'])
            if columns == '':
                raise EOFError('columns error')
            cur.execute('select count(*) from ' + from_tb)
            rows_count = cur.fetchone()[0]
        finally:
            cur.close()
        print('Total %s rows from Table:%s found in %s:%s .\r\nThis program will select %s time(s)(limit %s rows per time) \r\n ... | Start ...'
              % (rows_count, from_tb, src['label'], src['name'],
                 int(math.ceil(float(rows_count) / rows_limit)), rows_limit))

        # The INSERT statement is the same for every row, so build it once.
        insert_sql = 'insert into ' + to_tb + get_columns(columns_name, dst['to_type'])

        # One streamed SELECT replaces LIMIT/rownum paging: paging without ORDER BY
        # has no stable row order, so pages could repeat or skip rows, and each
        # page rescanned the table from the start.
        read_cur = _open_read_cursor(src_conn, src_engine)
        write_cur = dst_conn.cursor()
        try:
            read_cur.execute('select %s from %s' % (columns, from_tb))
            batch_no = 0
            while True:
                rows = read_cur.fetchmany(rows_limit)
                if not rows:
                    break
                # One round trip and one commit per batch instead of per row.
                # list(): MySQLdb returns a tuple of rows; cx_Oracle's executemany
                # expects a list.
                # NOTE(review): cx_Oracle infers bind types from the rows it is given;
                # if a column is NULL in the first row, older versions may reject
                # later typed values. Confirm against real data.
                write_cur.executemany(insert_sql, list(rows))
                dst_conn.commit()
                batch_no += 1
                print(src['batch_msg'] % (batch_no, len(rows)))
                time.sleep(time_sleep)  # wait for several seconds
        finally:
            write_cur.close()
            read_cur.close()
        print('Data import to table:' + to_tb + ' in ' + dst['label'] + ':' + dst['name'] + '    |  Complete ~!  \r\n ')

    mysql_conn = None
    oracle_conn = None
    try:
        if len(from_tables) != len(to_tables):
            # Tables are paired by position; a mismatch would silently skip tables.
            raise EOFError('from_tables and to_tables must have the same length')
        mysql_conn = MySQLdb.connect(host=mysql_host, user=mysql_user, passwd=mysql_pass,
                                     port=mysql_port, db=mysql_dbname, charset='utf8')
        oracle_dsn = cx_Oracle.makedsn(oracle_host, oracle_port, oracle_sid)
        oracle_conn = cx_Oracle.connect(oracle_user, oracle_pass, oracle_dsn)
        if fromto == 'mysql2oracle':
            for from_tb, to_tb in zip(from_tables, to_tables):
                _copy_table(mysql_conn, oracle_conn, 'mysql', 'oracle', from_tb, to_tb)
            print('\r\nAll Tables Import Complete ~!')
        elif fromto == 'oracle2mysql':
            for from_tb, to_tb in zip(from_tables, to_tables):
                _copy_table(oracle_conn, mysql_conn, 'oracle', 'mysql', from_tb, to_tb)
            print('\r\nAll Tables Import Complete ~!')
        else:
            print("unknow value of 'fromto' \r\nmake sure 'fromto' like 'mysql2oracle' or 'oracle2mysql' ....")
    except EOFError as e:
        print(e)
    finally:
        # Close in finally so connections are released on every error path too.
        for conn in (mysql_conn, oracle_conn):
            if conn is not None:
                conn.close()
        print('Database Export/Import Python Code Excute End...\r\nBye~')


关键字