当前位置:网站首页>dbswitch数据迁移数据增量时如何不覆盖目标源数据

dbswitch数据迁移数据增量时如何不覆盖目标源数据

2022-07-27 05:11:00 超多多和刘宝宝的代码世界

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

最近项目需要一个数据引接功能,要能实现各数据库之间的数据迁移,数据的全量迁移和增量迁移,并找到开源项目DBSWITCH


一、DBSWITCH是什么?

https://gitee.com/dk_88/dbswitch

官方:

一句话,dbswitch工具提供源端数据库向目的端数据的迁移同步功能,包括全量和增量方式。迁移包括:

结构迁移
字段类型、主键信息、建表语句等的转换,并生成建表SQL语句。

数据迁移。
基于JDBC的分批次读取源端数据库数据,并基于insert/copy方式将数据分批次写入目的数据库。

支持有主键表的 增量变更同步 (变化数据计算Change Data Calculate)功能(千万级以上数据量慎用)

二、使用步骤

1.拉取代码

2.读取代码

3.目标数据库数据覆盖问题

DBSWITCH数据迁移是两种方式,一种全量,一种增量。并且默认执行树迁移时先删除表,在创建表。

同时,如果你修改了目标数据库的数据。会在数据增量时,把目标数据库修改和新增的数据覆盖。也就是说目标数据库不允许有和源数据库不一样的数据。

4.如何让源数据库只新增,不覆盖?

修改底层代码实现

三 源码解析

找到手动执行任务接口

  @TokenCheck
  @LogOperate(name = "手动执行任务", description = "'手动执行任务的ID为:'+#ids")
  @ApiOperation(value = "手动执行")
  @PostMapping(value = "/run", produces = MediaType.APPLICATION_JSON_VALUE)
  public Result runAssignments(@RequestBody List<Long> ids) {
    
    assignmentService.runAssignments(ids);
    return Result.success();
  }

在这个接口的service中,数据库连接等信息被封装到jobDetail被异步扔进scheduler.scheduleJob(jobDetail, simpleTrigger)任务中。这个任务一种有一个任务执行器来处理这个job,找到JobExecutorService

@Override
  public void executeInternal(JobExecutionContext context) throws JobExecutionException {
    
    currentThread = Thread.currentThread();
    JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
    if (interrupted) {
    
      log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
      return;
    }

    JobKey key = context.getJobDetail().getKey();
    Long taskId = jobDataMap.getLongValue(TASK_ID);
    Integer schedule = jobDataMap.getIntValue(SCHEDULE);
    AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
        .newAssignmentJob(taskId, schedule, key.getName());
    try {
    
      ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
      while (!lock.tryLock(1, TimeUnit.SECONDS)) {
    
        TimeUnit.SECONDS.sleep(1);
      }

      try {
    
        log.info("Execute Quartz Job, and task id is : {} , job id is: {}", taskId,
            assignmentJobEntity.getId());

        AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
        AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO
            .getByAssignmentTaskId(task.getId());

        log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}",
            task.getId(),
            task.getName(),
            task.getContent());

        try {
    
          DbswichProperties properties = JsonUtils.toBeanObject(
              task.getContent(), DbswichProperties.class);
          if (!assignmentConfigEntity.getFirstFlag()) {
    
            properties.getTarget().setTargetDrop(false);
            properties.getTarget().setChangeDataSync(true);
          }

          MigrationService mainService = new MigrationService(properties);
          if (interrupted) {
    
            log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
            return;
          }

          // 实际执行JOB
          mainService.run();

          if (assignmentConfigEntity.getFirstFlag()) {
    
            AssignmentConfigEntity config = new AssignmentConfigEntity();
            config.setId(assignmentConfigEntity.getId());
            config.setTargetDropTable(Boolean.FALSE);
            config.setFirstFlag(Boolean.FALSE);
            assignmentConfigDAO.updateSelective(config);
          }

          assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
          log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName());
        } catch (Throwable e) {
    
          assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
          assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
          log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName(), e);
        } finally {
    
          assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
          assignmentJobDAO.updateSelective(assignmentJobEntity);
        }
      } finally {
    
        lock.unlock();
      }
    } catch (ExecutionException | InterruptedException e) {
    
      throw new RuntimeException(e);
    }

这个方法前半部分是取值,后半部分是日志记录,只有 mainService.run(); 是执行job,我们继续往下看。

在MigrationService 这个数据迁移这个主逻辑类中,主要是判断任务参数中的排除表和包含表的情况,来确定数据迁移是哪几张表。判断最后,将需要迁移的数据库表添加到一个异步执行任务的队列中

             for (TableDescription td : tableList) {
    
                // 当没有配置迁移的表是,默认为所有物理表(不含有视图表)
                if (includes.isEmpty() && DBTableType.VIEW.name().equals(td.getTableType())) {
    
                  continue;
                }

                String tableName = td.getTableName();

                if (useExcludeTables) {
     //使用排除表
                  if (!filters.contains(tableName)) {
    
                    futures.add(
                        makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                            numberOfFailures, totalBytesSize));
                  }
                } else {
     //使用包含表
                  if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0)
                      .contains("?"))) {
    
                    if (Pattern.matches(includes.get(0), tableName)) {
    
                      futures.add(
                          makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                              numberOfFailures, totalBytesSize));
                    }
                  } else if (includes.contains(tableName)) {
    
                    futures.add(makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                            numberOfFailures, totalBytesSize));
                  }
                }
              }
              

makeFutureTask方法是构建一个异步任务,这个方法中还有一个最终要的方法getMigrateHandler 单表迁移方法

CompletableFuture.supplyAsync 异步执行 getMigrateHandler 单表迁移这个方法

  /** * 构造一个异步执行任务 * * @param td 表描述上下文 * @param indexInternal 源端索引号 * @param sds 源端的DataSource数据源 * @param tds 目的端的DataSource数据源 * @param numberOfFailures 失败的数量 * @param totalBytesSize 同步的字节大小 * @return CompletableFuture<Void> */
  private CompletableFuture<Void> makeFutureTask(TableDescription td,Integer indexInternal,
                                                 HikariDataSource sds, HikariDataSource tds,
                                                 AtomicInteger numberOfFailures,
                                                 AtomicLong totalBytesSize) {
    
    return CompletableFuture.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds))
        .exceptionally(getExceptHandler(td, numberOfFailures))
        .thenAccept(totalBytesSize::addAndGet);
  }
  
 /** * 单表迁移处理方法 * * @param td 表描述上下文 * @param indexInternal 源端索引号 * @param sds 源端的DataSource数据源 * @param tds 目的端的DataSource数据源 * @return Supplier<Long> */
  private Supplier<Long> getMigrateHandler(TableDescription td,
                                           Integer indexInternal,
                                           HikariDataSource sds,
                                           HikariDataSource tds) {
    
    return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get();
  }


MigrationHandler中的get方法就是最终数据迁移的实现

    if (properties.getTarget().getTargetDrop()) {
    
      log.error("targetDrop " + properties.getTarget().getChangeDataSync());
      /* 如果配置了dbswitch.target.datasource-target-drop=true时, <p> 先执行drop table语句,然后执行create table语句 */

我们看properties.getTarget().getTargetDrop())为true时,会删除目表原始数据库,在执行createTable语句,我们需要目标数据源有自己独有的数据而不被覆盖,那么这个结果一定要为false。查找到这个类对象将targetDrop字段设置为false,DbswichProperties来自config.yml,建议也将字段设置为false

继续往下看,会看到两个方法:

doFullCoverSynchronize(writer) — 全量覆盖

doIncreaseSynchronize(writer) ---- 增量

我以为到这儿,我只需要将targetDrop设置为false就完成了,但是,实验之后,依然没有解决。

继续查看增量的方法,查看日志后,发现代码执行了 doDelete() 和 doUpdate doInsert 三个方法,并且输出了三个方法执行的次数,查看代码:

@Override
      public void destroy(List<String> fields) {
    

        //TODO 取消删除
        if (cacheDelete.size() > 0) {
    
          doDelete(fields);
        }

        if (cacheInsert.size() > 0) {
    
          doInsert(fields);
        }

        //TODO 取消更新
        if (cacheUpdate.size() > 0) {
    
          doUpdate(fields);
        }
        log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
            tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
      }

原理:在底层设计中,源数据库和目标数据库数据会进行对比,并且标名这个数据状态(修改,新增,未变,删除)。并且将这新增放入cacheInsert,更新放入cacheUpdate,删除放入cacheDelete中。然后首先执行删除,将目标源数据进行删除,在执行新增,最后更新。

所以要想目标源保留独有数据,需要将doDelete和cacheUpdate方法取消,将对应的cache缓存清除以免内存溢出.

@Override
      public void destroy(List<String> fields) {
    

        //TODO 取消删除
/* if (cacheDelete.size() > 0) { doDelete(fields); }*/

        if (cacheInsert.size() > 0) {
    
          doInsert(fields);
        }

        //TODO 取消更新
/* if (cacheUpdate.size() > 0) { doUpdate(fields); }*/

        //TODO 缓存删除
        cacheUpdate.clear();
        cacheDelete.clear();

        log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
            tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
      }
原网站

版权声明
本文为[超多多和刘宝宝的代码世界]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_48329942/article/details/125681781