当前位置:网站首页>Sqoop命令

Sqoop命令

2022-07-05 02:41:00 一个正在努力的菜鸡

数据导入

  • 这只是方便接下来的命令测试

1.project组数据

  • 数据库
1.create database sqooptest1
2.use sqooptest1
3.create table project(
	id int not null auto_increment primary key,
	name varchar(100) not null,
	type tinyint(4) not null default 0,
	description varchar(500) default null,
	create_at date default null,
	update_at timestamp not null default current_timestamp on update current_timestamp,
	status tinyint(4) not null default 0
);
4.insert into project( name,type,description,create_at,status)
values( 'project1',1,'project1 zy','2019-07-27',0);
insert into project( name,type,description,create_at,status)
values( 'project2',1,'project2 zy','2019-07-26',0);
insert into project( name,type,description,create_at,status)
values( 'project2',2,'project2 zy','2019-07-25',0);
  • sqoop命令
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1 --username root --password a --table project
  • 结果
    在这里插入图片描述

2.students组数据

  • 数据库
1.create database sqooptest1
2.use sqooptest1
3.create table students(
	id int not null primary key,
	name varchar(100) not null,
	age varchar(100) not null
);
  • 数据位置
E:\JAVA课程\...\11.Hadoop\12.Sqoop\a.txt
  • 向数据库中插入数据
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class AddBatchMysql {
    
	public static void main(String[] args) {
    
		//1.用户输入文件位置
		Scanner sc = new Scanner(System.in);
		System.out.println("文件位置:");
		String path = sc.nextLine();
		//2.以流的形式读取文件中所有数据,按行读,按\t切,分出id,name,age
		List<String> list = new ArrayList<String>();
		try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(path))))){
    
			String str;
			while((str=br.readLine())!=null){
    
				list.add(str);
			}
		} catch (Exception e) {
    
			e.printStackTrace();
		}
		//3.批量插入数据
		System.out.println("数据总条数:"+list.size());
		
		String sql = "insert into students values(?,?,?)";
		
		Connection con=null;
		PreparedStatement pstmt=null;
		try {
    
			con = DriverManager.getConnection("jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC","root","a");
			con.setAutoCommit(false);//设置成手动提交事务
			pstmt = con.prepareStatement(sql);
			
			int total = 0;
			String s;
			String[] ss;
			for(int i=0;i<list.size();i++){
    
				s = list.get(i);
				ss = s.split("\t");
				pstmt.setString(1, ss[0]);
				pstmt.setString(2, ss[1]);
				pstmt.setString(3, ss[2]);
				
				//加入批处理操作
				//将当前要执行的操作添加到批缓存
				pstmt.addBatch();
				if((i+1)%1000==0){
    //1000条数据处理一次
					int[] res = pstmt.executeBatch();
					total+=sum(res);
					con.commit();
					pstmt.clearBatch();
				}
			}
			
			int[] res = pstmt.executeBatch();
			System.out.println(res);
			
			total+=sum(res);
			con.commit();
			pstmt.clearBatch();
			System.out.println("实际插入数据条数:"+total);
		} catch (Exception e) {
    
			e.printStackTrace();
			try {
    
				con.rollback();
			} catch (SQLException e1) {
    
				e1.printStackTrace();
			}
		}finally {
    
			if(con!=null){
    
				try {
    
					con.setAutoCommit(true);
				} catch (SQLException e) {
    
					e.printStackTrace();
				}
				try {
    
					con.close();
				} catch (SQLException e) {
    
					e.printStackTrace();
				}
			}
		}
	}
	
	private static int sum(int[] res){
    
		int total = 0;
		if(res==null&&res.length<=0){
    
			return 0;
		}
		for(int i=0;i<res.length;i++){
    
			total+=res[i];
		}
		return total;
	}
}
  • 等在数据插入完成。。。

命令官网

sqoop-list-

1.列库

  • 命令
sqoop-list-databases --connect jdbc:mysql://localhost:3306/mysql?serverTimezone=UTC --username root --password a --verbose
	--verbose:工作时打印更多信息

2.列表

sqoop-list-tables --connect jdbc:mysql://localhost:3306/mysql?serverTimezone=UTC --username root --password a --verbose

sqoop import-

1.指定路径:–target-dir

  • –target-dir
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --target-dir /sqooptest/input1/project
	指定目录:/sqooptest/input1/project
	实际目录:/sqooptest/input1/project
  • 结果
    在这里插入图片描述

2.表名当成数据仓库名:–warehouse-dir

  • –warehouse-dir
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input2
	指定目录:/sqooptest/input2
	实际目录:/sqooptest/input2/project
	分析:指定目录下创建一个名字为表名的目录,此时这个表名就当成了一个数据仓库名(warehouse)
  • 结果
    在这里插入图片描述

3.指定要查询的列与查询条件

sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input3 --columns  'id,name,type'  --where 'id>2'  -m 1
	--table
	--columns
	--where
	-m:表示只用到一个mapper,一个mapper对应一个切片,对应一个输出文件
	因为用了--table, 所以以上会自动地拼装sql 语句. , 不能与-e or -query合用

在这里插入图片描述

4.指定sql语句

sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --target-dir /sqooptest/input4/project --query 'select id,name,type from project where id>2 and  $CONDITIONS'  --split-by project.id -m 1
	--query:不能与--table, --columns合用
	$CONDITIONS:表明分区列
	--split-by:用于拆分工作单元的表格列,不能与 --autoreset-to-one-mapper选项一起使用
	-m:表示只用到一个mapper,  一个mapper对应一个切片,对应一个输出文件

在这里插入图片描述

5.–direct

  • 失败,有坑!!!!!
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input5  --direct   -m 1
	--direct使用mysqldump命令完成导入工作,因为是集群,map任务是分配到每个节点运行,所以每个节点都要有mysqldump命令

6.增量导入

  • 用于仅检索比某些先前导入的行集更新的行
  • 参数
--check-column:检查的列
--incremental append:如何确定哪些值是最新的
	append:追加
	lastmodified:最后一次修改
--last-value:上次导入检索的最大值

  • 插入数据
insert into project( name,type,description,create_at,status)
values( 'project5',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project6',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project7',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project8',5,'project5 zy','2019-07-25',0);

在这里插入图片描述

  • 命令
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input6 -m 1 --check-column id  --incremental append --last-value 3
  • 结果
    在这里插入图片描述

  • 插入数据
insert into project( name,type,description,create_at,status)
values( 'project9',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project10',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project11',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project12',5,'project5 zy','2019-07-25',0);

在这里插入图片描述

  • 命令:注意输出目录没有变
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input6 -m 1 --check-column id  --incremental append --last-value 7
  • 结果:图中的输出目录正确为input6
    在这里插入图片描述

  • 追加最后一次修改时间
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --warehouse-dir /sqooptest/input8 -m 1 --check-column update_at  --incremental lastmodified --last-value "2022-06-29 15:46:12" --append
	--append:将数据追加到 HDFS 中的现有数据集
  • 结果
    在这里插入图片描述
    在这里插入图片描述

sqoop job-

1.语法格式

sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
	注意--后有空格

2.创建任务,导入sqooptest1库中project表所以内容到hadoop

  • 原来的命令
sqoop import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC --username root --password a --table project --target-dir /sqooptest/input9/project -m 1

在这里插入图片描述


  • 创建任务命令
sqoop job --create yc-job1 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC  --username root --password a --table project --target-dir /sqooptest/input10/project -m 1
  • 可能出现的问题
1.任务已经存在了,请更改任务名或删除掉原任务. 
2.Caused by: java.lang.ClassNotFoundException: org.json.JSONObject   
	缺少jar包(org.json.json),将java-json.jar包上传到sqoop/lib包下
  • 查看创建的任务的两种方法
sqoop job --list
sqoop job --show yc-job1
  • 执行任务
sqoop job --exec yc-job1
  • 查看执行结果
    在这里插入图片描述

3.使用密码文件登录数据库

  • 以上创建任务时,提示MySQL的密码输入, 阻塞了自动化运行
  • 官方7.2.1提示配置密码文件
    在这里插入图片描述
  • 创建密码隐藏文件
echo -n "a" >/root/.mysql.password
chmod 400 /root/.mysql.password
  • 列出所有的文件,包括隐藏
ls -al
  • 创建任务的代码
sqoop job --create yc-job2 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC  --username root --password-file file:////root/.mysql.password --table project --target-dir /sqooptest/input11/project -m 1
  • 查看创建的任务
sqoop job --list
sqoop job --show yc-job2
  • 执行任务
sqoop job --exec yc-job2
  • 查看执行结果
    在这里插入图片描述

4.创建追加导入任务

  • 到mysql中查看一下project表的id最大值在这里插入图片描述
  • 插入一些新数据
insert into project( name,type,description,create_at,status)
values( 'project12',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project13',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project14',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project15',5,'project5 zy','2019-07-25',0);

在这里插入图片描述

  • 创建任务代码
sqoop job --create yc-job3 -- import --connect jdbc:mysql://node3:3306/sqooptest1?serverTimezone=UTC  --username root --password-file file:////root/.mysql.password --table project --target-dir /sqooptest/input12/project -m 1 --check-column id --incremental append --last-value 11
  • 查看创建的任务
sqoop job --list
  • 执行任务
sqoop job --exec yc-job3
  • 查看执行结果
    在这里插入图片描述
  • 再插入一些新数据
insert into project( name,type,description,create_at,status)
values( 'project16',5,'project5 zy','2019-07-25',0);
insert into project( name,type,description,create_at,status)
values( 'project17',5,'project5 zy','2019-07-25',0);	
  • 执行任务
sqoop job --exec yc-job3
  • 查看执行结果
    在这里插入图片描述
  • ** 输出结果中表明,这个job底层有一个叫metastore的元数据库(sqlite, metastore)存储当前 id 的最新值 ,以便下一次从此处导入,这方便了定时任务,不用程序员自己记录更新到那一条数据了**

5.定时作业

  • 三种方案
1.oozie,azkaban框架***  
2.编写定时程序(Thread,java.util.TimerTask,Quartz定时器框架->cron表达式)
3.centos自带的crontab实现****

  • 第三种方案的实现
  • /usr/local/bin下创建sqoop_incremental.sh定时任务脚本文件
cd /usr/local/bin
vim sqoop_incremental.sh
	#! /bin/bash
	/usr/local/sqoop147/bin/sqoop  job --exec yc-job3>>/usr/local/sqoop147/myjob.out 2>&1 &
		#解释
		#/usr/local/sqoop147/bin/sqoop:sqoop命令全路径,防止找不到 
		#/usr/local/sqoop147/myjob.out:命令的结果输出到myjob.out
		#2>&1:错误日志也当成正确日志
		#&:后台进程
  • 创建crontab
crontab -e
	#每5分钟执行一次
	*/5 * * * * /usr/bin/bash /usr/local/sqoop147/bin/sqoop_incremental.sh
	#格式:分 时 日 月 周 命令
  • 再插入一些新数据
insert into project( name,type,description,create_at,status)
values( 'project18',5,'project5 zy','2019-07-25',0);
  • 等待五分钟左右
  • 查看日志文件:/usr/local/sqoop147/myjob.out
    在这里插入图片描述
  • 查看执行结果
    在这里插入图片描述
原网站

版权声明
本文为[一个正在努力的菜鸡]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_51699336/article/details/125588620