代码demo(mr hbase hive redis)
# 环境
基于Windows下eclipse的MapReduce开发环境配置 https://www.cnblogs.com/Hephaestus/p/12608456.html
# MapReduce代码
MyMapper
package mrproject;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Word-count mapper: turns every input line into a series of {@code <word, 1>} pairs.
 *
 * <p>The business logic of the map phase lives in {@link #map}; the map task calls it once
 * for every line of input.
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Text is Hadoop's serializable wrapper for strings. The same instance is reused for
    // every emitted pair instead of allocating a new object per word.
    private final Text word = new Text();
    // IntWritable is Hadoop's serializable wrapper for ints; every word is emitted with count 1.
    private final IntWritable one = new IntWritable(1);

    /**
     * Splits one line of text into words and writes {@code <word, 1>} for each of them.
     *
     * @param key     key handed in by the framework for this line (not used here)
     * @param value   the text of the line
     * @param context sink for the emitted key/value pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the line handed over by the map task into a String first.
        String line = value.toString();
        // Split on runs of whitespace so that repeated spaces or tabs do not produce
        // empty "words" that would otherwise be counted.
        for (String w : line.split("\\s+")) {
            if (w.isEmpty()) {
                // A line with leading whitespace (or an empty line) yields one empty token.
                continue;
            }
            word.set(w);
            // The word is the key and 1 the value, so that identical words are routed to
            // the same reduce task.
            context.write(word, one);
        }
    }
}
MyReducer
package mrproject;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Word-count reducer: adds up all counts received for one word and emits {@code <word, total>}.
 *
 * <p>Example of the grouped map output this class consumes:
 * <pre>
 * &lt;angel,1&gt; &lt;angel,1&gt; &lt;angel,1&gt; &lt;angel,1&gt; &lt;angel,1&gt;
 * &lt;hello,1&gt; &lt;hello,1&gt; &lt;hello,1&gt; &lt;hello,1&gt; &lt;hello,1&gt; &lt;hello,1&gt;
 * </pre>
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value holder.
    private IntWritable sum = new IntWritable();

    /**
     * Sums every value belonging to {@code key} and writes the total.
     *
     * @param key     the word shared by all values of this group
     * @param values  all counts emitted for that word (e.g. five 1s for "angel" above)
     * @param context sink for the resulting pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        java.util.Iterator<IntWritable> cursor = values.iterator();
        while (cursor.hasNext()) {
            total += cursor.next().get();
        }
        sum.set(total);
        // Emit <word, total> through the context.
        context.write(key, sum);
    }
}
MRApp
package mrproject;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver of the word-count job: wires {@link MyMapper} and {@link MyReducer} into a
 * {@link Job}, submits it and waits for the result.
 */
public class MRApp {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input directory and {@code args[1]} the output
     *             directory; when fewer than two arguments are given, hard-coded HDFS
     *             defaults are used
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        // Mainly for local runs: a production launch from the shell passes the input and
        // output directories on the command line, a local run usually does not.
        if (args.length < 2) {
            args = new String[] {
                "hdfs://10.20.10.67:9000/input",
                "hdfs://10.20.10.67:9000/output4"
            };
        }
        Configuration conf = new Configuration();
        /*
         * Cluster nodes already carry these settings in their configuration files:
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);
        // Tells the job which jar to ship: the class loader knows where this class was
        // loaded from, hence the local path of the containing jar.
        job.setJarByClass(MRApp.class);
        // Mapper / reducer classes of this job.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Key/value types produced by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Key/value types of the final output. The value class must be set with
        // setOutputValueClass; calling setOutputKeyClass twice would replace the key type
        // with IntWritable and leave the value type unset.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Directory holding the raw input files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory the results are written to.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // job.submit() would hand the job over and return immediately;
        // waitForCompletion blocks until the cluster reports the outcome.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
# hbase代码
表操作(DDL DML)
package com.hrbu.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Collection of static helpers demonstrating the HBase client API.
 *
 * <p>DDL: 1. check whether a table exists, 2. create a table, 3. create a namespace,
 * 4. drop a table.<br>
 * DML: 5. insert data, 6. read data (get), 7. read data (scan), 8. delete data.
 *
 * <p>All helpers share the static {@link #connect} / {@link #admin} objects created in the
 * static initializer; call {@link #close()} when done.
 */
public class TestApi {
    public static Connection connect = null;
    public static Admin admin = null;

    static {
        try {
            // 1. Build the configuration.
            Configuration conf = HBaseConfiguration.create();
            //conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
            conf.set("hbase.zookeeper.quorum", "hadoop1");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            // 2. Create the connection.
            connect = ConnectionFactory.createConnection(conf);
            // 3. Obtain the admin handle.
            admin = connect.getAdmin();
        } catch (IOException e) {
            // On failure the fields stay null; the helpers below will then fail with a
            // NullPointerException.
            System.out.println("创建hbase连接对象异常");
            e.printStackTrace();
        }
    }

    /** Closes the admin handle and the connection; failures are printed, not thrown. */
    public static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                System.out.println("admin关闭异常");
                e.printStackTrace();
            }
        }
        if (connect != null) {
            try {
                connect.close();
            } catch (IOException e) {
                System.out.println("hbase连接对象关闭异常");
                e.printStackTrace();
            }
        }
    }

    /**
     * 1. Prints whether the given table exists.
     *
     * @param tableName table name, optionally prefixed with {@code namespace:}
     * @throws IOException if the lookup fails
     */
    public static void is_Exists(String tableName) throws IOException {
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println(tableName + "表已经存在");
        } else {
            System.out.println(tableName + "表不存在");
        }
    }

    /**
     * 2. Creates the table with the column families {@code info1}, {@code info2} and
     * {@code info3} unless it already exists.
     *
     * @param tableName table name, optionally prefixed with {@code namespace:}
     * @throws IOException if the existence check or the creation fails
     */
    public static void create_Table(String tableName) throws IOException {
        TableName tableNameTemp = TableName.valueOf(tableName);
        if (admin.tableExists(tableNameTemp)) {
            System.out.println(tableName + "表已经存在");
            return;
        }
        TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableNameTemp);
        // Column families.
        List<ColumnFamilyDescriptor> cfList = new ArrayList<ColumnFamilyDescriptor>();
        cfList.add(ColumnFamilyDescriptorBuilder.of("info1"));
        cfList.add(ColumnFamilyDescriptorBuilder.of("info2"));
        cfList.add(ColumnFamilyDescriptorBuilder.of("info3"));
        tableBuilder.setColumnFamilies(cfList);
        // Build the descriptor and create the table.
        TableDescriptor tableDesc = tableBuilder.build();
        admin.createTable(tableDesc);
        System.out.println(tableName + "\t表创建成功");
    }

    /**
     * 3. Creates a namespace; an already existing namespace is reported, other I/O errors
     * are printed.
     *
     * @param nameSpace name of the namespace to create
     */
    public static void create_NameSpace(String nameSpace) {
        NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
        try {
            admin.createNamespace(descriptor);
        } catch (NamespaceExistException e) {
            System.out.println(nameSpace + "命名空间已存在");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Demonstrates that execution continues after the handled exceptions.
        System.out.println("--我会被执行吗--");
    }

    /**
     * 4. Disables and deletes the table if it exists.
     *
     * @param tableName table name, optionally prefixed with {@code namespace:}
     * @throws IOException if any admin call fails
     */
    public static void drop_Table(String tableName) throws IOException {
        TableName tableNameTemp = TableName.valueOf(tableName);
        if (admin.tableExists(tableNameTemp)) {
            // A table has to be disabled before it can be deleted.
            admin.disableTable(tableNameTemp);
            admin.deleteTable(tableNameTemp);
            System.out.println("表" + tableName + "删除成功! ");
        } else {
            System.out.println("表" + tableName + "不存在! ");
        }
    }

    /**
     * 5. Writes a single cell. The success message is printed only when the put succeeded;
     * failures are printed and not rethrown.
     *
     * @param tableName    target table
     * @param rowKey       row key
     * @param columnFamily column family of the cell
     * @param column       column qualifier of the cell
     * @param value        value to store
     */
    public static void put_Data(String tableName, String rowKey, String columnFamily, String column, String value) {
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
            System.out.println("数据插入成功");
        } catch (NoSuchColumnFamilyException e) {
            System.out.println("异常:没有此列族");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 6. Reads one column of one row and prints it.
     *
     * @param tableName    table to read
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @throws IOException if the read fails
     */
    public static void get_Data(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        // try-with-resources closes the table even when the read throws.
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
            Result result = table.get(get);
            System.out.println(tableName + "--" + rowKey + "--" + columnFamily + "--" + column + ":");
            print_Cells2(result.rawCells());
        }
    }

    /**
     * 6. Reads one column family of one row and prints it.
     *
     * @param tableName    table to read
     * @param rowKey       row key
     * @param columnFamily column family
     * @throws IOException if the read fails
     */
    public static void get_Data(String tableName, String rowKey, String columnFamily) throws IOException {
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(columnFamily));
            Result result = table.get(get);
            System.out.println(tableName + "--" + rowKey + "--" + columnFamily + ":");
            print_Cells2(result.rawCells());
        }
    }

    /**
     * 6. Reads a whole row and prints it.
     *
     * @param tableName table to read
     * @param rowKey    row key
     * @throws IOException if the read fails
     */
    public static void get_Data(String tableName, String rowKey) throws IOException {
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            System.out.println(tableName + "--" + rowKey + ":");
            print_Cells2(result.rawCells());
        }
    }

    /** Prints one line per cell: row key, family:qualifier, timestamp and value. */
    private static void print_Cells(Cell[] cells) {
        for (Cell tempCell : cells) {
            System.out.println(Bytes.toString(CellUtil.cloneRow(tempCell))
                    + "\t\tcolumn=" + Bytes.toString(CellUtil.cloneFamily(tempCell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(tempCell))
                    + ",timestamp=" + tempCell.getTimestamp()
                    + ", value=" + Bytes.toString(CellUtil.cloneValue(tempCell)));
        }
    }

    /** Prints all cells on a single line as {@code qualifier:value;} pairs. */
    private static void print_Cells2(Cell[] cells) {
        StringBuilder sb = new StringBuilder();
        for (Cell cell : cells) {
            String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            sb.append(column).append(":").append(value).append(";\t");
        }
        System.out.println(sb.toString());
    }

    /**
     * 7. Scans the whole table and prints every cell; I/O errors are printed, not thrown.
     *
     * @param tableName table to scan
     */
    public static void scan_Data(String tableName) {
        // Both the table and the scanner are closed automatically.
        try (Table table = connect.getTable(TableName.valueOf(tableName));
                ResultScanner results = table.getScanner(new Scan())) {
            System.out.println(tableName + "表\nROW\t\tCOLUMN+CELL");
            for (Result result : results) {
                print_Cells(result.rawCells());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 7.2 Scans the table with a filter and a row limit and prints every returned cell.
     *
     * @param tableName table to scan
     * @param filter    filter applied to the scan
     * @param limit     row limit passed to {@code Scan.setLimit}
     * @throws IOException if the scan fails
     */
    public static void filter_Data(String tableName, Filter filter, int limit) throws IOException {
        Scan scan = new Scan().setFilter(filter).setLimit(limit);
        try (Table table = connect.getTable(TableName.valueOf(tableName));
                ResultScanner resultScan = table.getScanner(scan)) {
            System.out.println(tableName + "表(filter)\nROW\t\tCOLUMN+CELL");
            for (Result result : resultScan) {
                print_Cells(result.rawCells());
            }
        }
    }

    /**
     * 8. Deletes several rows at once.
     *
     * @param tableName table to delete from
     * @param rows      row keys to delete
     * @throws IOException if the delete fails
     */
    public static void delete_Data(String tableName, String... rows) throws IOException {
        List<Delete> deleteList = new ArrayList<Delete>();
        for (String row : rows) {
            deleteList.add(new Delete(Bytes.toBytes(row)));
        }
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            table.delete(deleteList);
        }
        System.out.println("删除成功");
    }

    /**
     * 8.1 Deletes the latest version of a single cell.
     *
     * @param tableName    table to delete from
     * @param rowKey       row key
     * @param columnFamily column family of the cell
     * @param column       column qualifier of the cell
     * @throws IOException if the delete fails
     */
    public static void delete_Cell(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        // addColumn removes only the latest version; addColumns would remove all versions.
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connect.getTable(TableName.valueOf(tableName))) {
            table.delete(delete);
        }
    }

    /**
     * Demo entry point; un-comment the calls of interest.
     *
     * @param args unused
     * @throws IOException if one of the demonstrated calls fails
     */
    public static void main(String[] args) throws IOException {
        System.out.println("============================start=========================");
        try {
            // 1. Check whether a table exists
            //is_Exists("student");
            // 3. Create a namespace
            //create_NameSpace("std");
            // 2. Create a table
            //create_Table("std:stu");
            // 4. Drop a table
            //drop_Table("std:stu");
            // 5. Insert data
            //put_Data("std:stu", "1003", "info1", "addr", "beijing");
            //put_Data("std:stu", "1004", "info1", "sex", "男");
            //put_Data("std:stu", "1005", "info1", "class", "2");
            // 6. Read data (get)
            get_Data("std:stu", "1002");
            get_Data("std:stu", "1002", "info1");
            // 7. Read data (scan)
            scan_Data("std:stu");
            // 7.1 Read data (scan + filter)
            //filter_Data("std:stu", new PrefixFilter(Bytes.toBytes("100")), 10); // rows whose key starts with the prefix
            //filter_Data("std:stu", new RandomRowFilter((float) 0.2), 10); // random subset: <=0 drops every row, >=1 keeps every row
            //filter_Data("std:stu", new KeyOnlyFilter(), 10); // every row, but with empty values
            // Rows whose key sorts before "1002". CompareOperator replaces the deprecated
            // CompareFilter.CompareOp.
            filter_Data("std:stu", new RowFilter(CompareOperator.LESS, new BinaryComparator(Bytes.toBytes("1002"))), 10);
            // 8. Delete several rows
            //delete_Data("std:stu", "1003","1004");
            // 8.1 Delete a single cell
            //delete_Cell("std:stu", "1005", "info1", "name");
            scan_Data("std:stu");
            System.out.println("============================end==========================");
        } finally {
            // Release the admin handle and the connection even when a call above failed.
            close();
        }
    }
}
# MySQL数据插入到HBase
插入HBase
package com.hrbu.hbase;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Writes a list of {@code Staff} records into an HBase table (the "load" half of a small
 * MySQL-to-HBase copy).
 */
public class Sqoop_Hbase {
    public static Connection connect = null;
    public static Admin admin = null;

    /**
     * Creates the shared connection and admin handle. On failure the error is printed and
     * the fields stay {@code null}.
     */
    public static void init() {
        try {
            // 1. Build the configuration.
            Configuration conf = HBaseConfiguration.create();
            //conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
            conf.set("hbase.zookeeper.quorum", "hadoop1");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            // 2. Create the connection.
            connect = ConnectionFactory.createConnection(conf);
            // 3. Obtain the admin handle.
            admin = connect.getAdmin();
        } catch (IOException e) {
            System.out.println("创建hbase连接对象异常");
            e.printStackTrace();
        }
    }

    /** Closes the admin handle and the connection; failures are printed, not thrown. */
    public static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                System.out.println("admin关闭异常");
                e.printStackTrace();
            }
        }
        if (connect != null) {
            try {
                connect.close();
            } catch (IOException e) {
                System.out.println("hbase连接对象关闭异常");
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the table (with the single given column family) when it does not exist yet and
     * then writes one row per staff record: row key = id, columns {@code name} and
     * {@code sex}.
     *
     * <p>Rows are written whether or not the table already existed; previously an existing
     * table caused the whole insert to be skipped silently.
     *
     * @param tableName    target table
     * @param columnFamily column family receiving the columns
     * @param list         records to write
     * @throws IOException if the table check, the creation or a put fails
     */
    public static void put_Data(String tableName, String columnFamily, List<Staff> list) throws IOException {
        TableName target = TableName.valueOf(tableName);
        if (admin.tableExists(target)) {
            System.out.println(tableName + "表已经存在");
        } else {
            TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(target);
            ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.of(columnFamily);
            tableBuilder.setColumnFamily(family);
            TableDescriptor tableDesc = tableBuilder.build();
            admin.createTable(tableDesc);
            System.out.println(tableName + "\t表创建成功");
        }
        byte[] familyBytes = Bytes.toBytes(columnFamily);
        // try-with-resources closes the table even when a put throws.
        try (Table table = connect.getTable(target)) {
            for (Staff staff : list) {
                // Numbers are converted to their decimal string first so the row key is
                // readable text instead of raw binary.
                Put put = new Put(Bytes.toBytes(String.valueOf(staff.getId())));
                addIfPresent(put, familyBytes, "name", staff.getName());
                addIfPresent(put, familyBytes, "sex", staff.getSex());
                // A Put without any column cannot be sent; skip records with no values.
                if (!put.isEmpty()) {
                    table.put(put);
                }
            }
        } catch (NoSuchColumnFamilyException e) {
            System.out.println("异常:没有此列族");
            throw e;
        }
    }

    /** Adds {@code qualifier=value} to the put unless the value is null. */
    private static void addIfPresent(Put put, byte[] family, String qualifier, String value) {
        if (value != null) {
            put.addColumn(family, Bytes.toBytes(qualifier), Bytes.toBytes(value));
        }
    }

    /**
     * Convenience entry point for external callers: connects, writes the records, prints
     * the table and disconnects. The success message is printed only when no I/O error
     * occurred.
     *
     * @param tableName    target table
     * @param columnFamily column family receiving the columns
     * @param list         records to write
     */
    public static void main_Use(String tableName, String columnFamily, List<Staff> list) {
        init();
        try {
            if (connect == null || admin == null) {
                // init() already reported why the connection could not be created.
                return;
            }
            put_Data(tableName, columnFamily, list);
            scan_Data(tableName);
            System.out.println("插入数据成功");
        } catch (IOException e) {
            System.out.println("admin异常");
            e.printStackTrace();
        } finally {
            close();
        }
    }

    /**************************** The code below is auxiliary *****************************/

    /**
     * Scans the whole table and prints every cell; I/O errors are printed, not thrown.
     *
     * @param tableName table to scan
     */
    public static void scan_Data(String tableName) {
        try (Table table = connect.getTable(TableName.valueOf(tableName));
                ResultScanner results = table.getScanner(new Scan())) {
            System.out.println(tableName + "表\nROW\t\tCOLUMN+CELL");
            for (Result result : results) {
                print_Cells(result.rawCells());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints one line per cell: row key, family:qualifier, timestamp and value. */
    private static void print_Cells(Cell[] cells) {
        for (Cell tempCell : cells) {
            System.out.println(Bytes.toString(CellUtil.cloneRow(tempCell))
                    + "\t\tcolumn=" + Bytes.toString(CellUtil.cloneFamily(tempCell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(tempCell))
                    + ",timestamp=" + tempCell.getTimestamp()
                    + ", value=" + Bytes.toString(CellUtil.cloneValue(tempCell)));
        }
    }

    /**
     * Encodes every char of the string as {@code 0xu} followed by its hexadecimal code
     * (not zero-padded), e.g. {@code "ab"} becomes {@code "0xu610xu62"}.
     *
     * @param string text to encode
     * @return the encoded text
     */
    public static String stringToUnicode(String string) {
        StringBuilder unicode = new StringBuilder();
        for (int i = 0; i < string.length(); i++) {
            unicode.append("0xu").append(Integer.toHexString(string.charAt(i)));
        }
        return unicode.toString();
    }

    /**
     * Reverses {@link #stringToUnicode(String)}.
     *
     * @param unicode text in the {@code 0xu<hex>} form
     * @return the decoded text
     */
    public static String unicodeToString(String unicode) {
        String str = unicode.replace("0x", "\\");
        StringBuilder string = new StringBuilder();
        String[] hex = str.split("\\\\u");
        // Element 0 is whatever precedes the first marker and is skipped.
        for (int i = 1; i < hex.length; i++) {
            string.append((char) Integer.parseInt(hex[i], 16));
        }
        return string.toString();
    }
}
读MySQL
package com.hrbu.hbase;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * Reads all rows of the MySQL {@code staff} table into a list of {@code Staff} objects
 * (the "extract" half of a small MySQL-to-HBase copy).
 */
public class Sqoop_Mysql {
    public static Connection connect = null;

    /**
     * Loads the JDBC driver and opens the shared connection. Failures are printed and leave
     * {@link #connect} unset.
     */
    public static void init() {
        // Driver class name.
        String driver = "com.mysql.cj.jdbc.Driver";
        // URL of the database to access.
        String url = "jdbc:mysql://hadoop1:3306/company?useUnicode=true&characterEncoding=utf-8&useSSL=false";
        // NOTE(review): credentials are hard-coded; consider reading them from configuration.
        String username = "root";
        String password = "123456789";
        try {
            Class.forName(driver);
            connect = DriverManager.getConnection(url, username, password);
            if (!connect.isClosed()) {
                System.out.println("数据库连接成功");
            }
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    /** Closes the shared connection if one was opened; failures are printed. */
    public static void close() {
        if (connect != null) {
            try {
                connect.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs {@code select * from staff} and maps every row to a {@code Staff}.
     *
     * @return the rows read so far; empty when there is no connection or the query fails
     */
    public static List<Staff> selectAllFromStaff() {
        List<Staff> list = new ArrayList<Staff>();
        if (connect == null) {
            // init() failed or was never called; avoid a NullPointerException below.
            System.out.println("数据库未连接,无法查询");
            return list;
        }
        String sql = "select * from staff";
        // Statement and result set are closed automatically, also on failure.
        try (Statement stmt = connect.createStatement();
                ResultSet resultSet = stmt.executeQuery(sql)) {
            while (resultSet.next()) {
                Staff staff = new Staff();
                staff.setId(resultSet.getInt("id"));
                staff.setName(resultSet.getString("name"));
                staff.setSex(resultSet.getString("sex"));
                list.add(staff);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * Prints every record as a tab-separated line: id, name, sex.
     *
     * @param list records to print
     */
    public static void print(List<Staff> list) {
        for (Staff staff : list) {
            System.out.println(staff.getId() + "\t" + staff.getName() + "\t" + staff.getSex());
        }
    }

    /**
     * Convenience entry point for external callers: connects, reads all staff rows and
     * disconnects.
     *
     * @return all rows that could be read
     */
    public static List<Staff> main_Use() {
        init();
        try {
            return selectAllFromStaff();
        } finally {
            // Release the connection even if reading failed unexpectedly.
            close();
        }
    }
}
/**
 * Mutable data holder with three properties: {@code id}, {@code name} and {@code sex}.
 */
class Staff {
    int id;
    String name;
    String sex;

    /** Stores the id. */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the stored id (0 until set) */
    public int getId() {
        return this.id;
    }

    /** Stores the name. */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the stored name (null until set) */
    public String getName() {
        return this.name;
    }

    /** Stores the sex. */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the stored sex (null until set) */
    public String getSex() {
        return this.sex;
    }
}
import java.util.List;
/**
 * Runs the MySQL-to-HBase copy end to end: reads the staff rows via {@code Sqoop_Mysql} and
 * hands them to {@code Sqoop_Hbase}.
 */
public class Sqoop_Test {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        // Rows are written to table "staff", column family "info".
        Sqoop_Hbase.main_Use("staff", "info", Sqoop_Mysql.main_Use());
    }
}
package com.hrbu.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
/**
 * Minimal connectivity check: connects to HBase and prints whether table {@code student}
 * exists. Numbered marker lines are printed between the steps to show how far the run got.
 */
public class Test {
    /**
     * @param args unused
     * @throws IOException if connecting or querying fails
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.100");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        System.out.println("11111111111111111111");
        // 2. Create the connection; try-with-resources closes it even when a later step throws.
        try (Connection connect = ConnectionFactory.createConnection(conf)) {
            System.out.println("22222222222222222222");
            // 3. Obtain the admin handle (closed before the connection).
            try (Admin admin = connect.getAdmin()) {
                System.out.println("333333333333333333333" + admin + "333" + connect);
                //System.out.println(admin.tableExists(TableName.valueOf("student")));
                if (admin.tableExists(TableName.valueOf("student"))) {
                    System.out.println("表已经存在");
                } else {
                    System.out.println("表不存在");
                }
                System.out.println("44444444444444444444");
            }
        }
    }
}
# hive代码
表操作
package com.hrbu.hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * JUnit-driven demo of Hive access over JDBC.
 *
 * <p>Each {@code @Test} method can be run on its own without writing a {@code main};
 * {@link #init()} and {@link #destory()} open and release the connection around every test.
 */
public class ClusterTestApi {
    // Driver class name.
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    // Connection URL.
    private static String url = "jdbc:hive2://8.8.8.100:10000/default";
    // User name; the password is left empty.
    private static String user = "banana";
    private static String password = "";
    private static Connection conn = null;
    private static Statement stmt = null;
    private static ResultSet rs = null;

    /**
     * Loads the driver and opens connection and statement. Runs before every {@code @Test}.
     *
     * @throws IllegalStateException if the driver cannot be loaded or the connection fails;
     *         failing here avoids a later NullPointerException on the unset statement
     */
    @Before
    public void init() {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(url, user, password);
            stmt = conn.createStatement();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to open Hive connection to " + url, e);
        }
    }

    /**
     * Releases result set, statement and connection. Runs after every {@code @Test}.
     * Each resource is closed even when closing an earlier one fails.
     *
     * @throws SQLException if closing a resource fails
     */
    @After
    public void destory() throws SQLException {
        try {
            if (rs != null) {
                rs.close();
            }
        } finally {
            rs = null;
            try {
                if (stmt != null) {
                    stmt.close();
                }
            } finally {
                stmt = null;
                try {
                    if (conn != null) {
                        conn.close();
                    }
                } finally {
                    conn = null;
                }
            }
        }
    }

    /** Creates database {@code hive_jdbc_test} if it does not exist. */
    @Test
    public void create_DataBase() throws SQLException {
        String sql = "create database IF NOT EXISTS hive_jdbc_test";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("create database success");
    }

    /** Creates the tab-delimited text table {@code student}. */
    @Test
    public void create_StudentTable() throws SQLException {
        //String sql0 = "use hive_jdbc_test";
        String sql = "create table student(code string,name string,gender string,school string,profession string)\r\n" +
                " comment 'this is a student table'\r\n" +
                " row format delimited fields terminated by '\\t'\r\n" +
                " stored as textfile";
        System.out.println("Running create table student");
        //stmt.execute(sql0);
        stmt.execute(sql);
        System.out.println("create table student success");
    }

    /** Lists all databases. */
    @Test
    public void show_DataBases() throws SQLException {
        String sql = "show Databases";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }

    /** Lists all tables of the current database. */
    @Test
    public void show_Tables() throws SQLException {
        String sql = "show tables";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }

    /** Loads a local file into table {@code student}, overwriting its content. */
    @Test
    public void load_Data() throws SQLException {
        // Path on the Linux host; the surrounding spaces separate it from the keywords.
        String filePath = " '/soft/module/datas/short-student-utf8.txt' ";
        String sql = "load data local inpath" + filePath + "overwrite into table student";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("load data local success");
    }

    /** Prints every row of {@code test_db.student}. */
    @Test
    public void select_Data() throws SQLException {
        String sql = "select * from test_db.student";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        System.out.println("学号\t姓名\t性别\t学校\t专业");
        while (rs.next()) {
            System.out.println(rs.getString("code") + "\t" + rs.getString("name") + "\t" + rs.getString("gender") + "\t" + rs.getString("school") + "\t" + rs.getString("profession"));
        }
    }

    /** Counts the rows of {@code test_db.student} (runs a MapReduce job). */
    @Test
    public void count_Data() throws SQLException {
        String sql = "select count(*) from test_db.student";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        // Only the count is printed; the student column header does not apply here.
        while (rs.next()) {
            System.out.println(rs.getInt(1));
        }
    }

    /** Drops table {@code student}. */
    @Test
    public void drop_table() throws SQLException {
        String sql = "drop table student";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("drop table success");
    }

    /** Drops database {@code hive_jdbc_test} if it exists. */
    @Test
    public void drop_DataBase() throws SQLException {
        // Forced variant:
        //String sql = "DROP DATABASE IF EXISTS hive_jdbc_test CASCADE";
        String sql = "DROP DATABASE IF EXISTS hive_jdbc_test";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("DROP DATABASE success");
    }

    /**
     * Runs {@link #drop_DataBase()} outside of JUnit. The connection has to be opened and
     * released explicitly here because no test runner invokes the lifecycle methods.
     *
     * @param args unused
     * @throws SQLException if the statement or the cleanup fails
     */
    public static void main(String[] args) throws SQLException {
        ClusterTestApi test = new ClusterTestApi();
        test.init();
        try {
            test.drop_DataBase();
        } finally {
            test.destory();
        }
    }
}
package com.hrbu.hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * JUnit-driven demo of Hive access over JDBC.
 *
 * <p>Each {@code @Test} method can be run on its own without writing a {@code main};
 * {@link #init()} and {@link #destory()} open and release the connection around every test.
 */
public class TeatApi {
    // Driver class name.
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    // Connection URL.
    private static String url = "jdbc:hive2://192.168.1.100:10000/default";
    // User name; the password is left empty.
    private static String user = "root";
    private static String password = "";
    private static Connection conn = null;
    private static Statement stmt = null;
    private static ResultSet rs = null;

    /**
     * Loads the driver and opens connection and statement. Runs before every {@code @Test}.
     *
     * @throws IllegalStateException if the driver cannot be loaded or the connection fails;
     *         failing here avoids a later NullPointerException on the unset statement
     */
    @Before
    public void init() {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(url, user, password);
            stmt = conn.createStatement();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to open Hive connection to " + url, e);
        }
    }

    /**
     * Releases result set, statement and connection. Runs after every {@code @Test}.
     * Each resource is closed even when closing an earlier one fails.
     *
     * @throws SQLException if closing a resource fails
     */
    @After
    public void destory() throws SQLException {
        try {
            if (rs != null) {
                rs.close();
            }
        } finally {
            rs = null;
            try {
                if (stmt != null) {
                    stmt.close();
                }
            } finally {
                stmt = null;
                try {
                    if (conn != null) {
                        conn.close();
                    }
                } finally {
                    conn = null;
                }
            }
        }
    }

    /** Creates database {@code hive_jdbc_test} if it does not exist. */
    @Test
    public void create_DataBase() throws SQLException {
        String sql = "create database IF NOT EXISTS hive_jdbc_test";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("create database success");
    }

    /** Creates the tab-delimited text table {@code student}. */
    @Test
    public void create_StudentTable() throws SQLException {
        //String sql0 = "use hive_jdbc_test";
        String sql = "create table student(code string,name string,gender string,school string,profession string)\r\n" +
                " comment 'this is a student table'\r\n" +
                " row format delimited fields terminated by '\\t'\r\n" +
                " stored as textfile";
        System.out.println("Running create table student");
        //stmt.execute(sql0);
        stmt.execute(sql);
        System.out.println("create table student success");
    }

    /** Lists all databases. */
    @Test
    public void show_DataBases() throws SQLException {
        String sql = "show Databases";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }

    /** Lists all tables of the current database. */
    @Test
    public void show_Tables() throws SQLException {
        String sql = "show tables";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }

    /** Loads a local file into table {@code test_db.student}, overwriting its content. */
    @Test
    public void load_Data() throws SQLException {
        // Path on the Linux host.
        String filePath = "'/soft/datas/short-student-utf8.txt'";
        // Keep a space on both sides of the quoted path so the statement does not depend on
        // the quotes alone to separate it from the neighbouring keywords.
        String sql = "load data local inpath " + filePath + " overwrite into table test_db.student";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("load data local success");
    }

    /** Prints every row of {@code test_db.student}. */
    @Test
    public void select_Data() throws SQLException {
        String sql = "select * from test_db.student";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        System.out.println("学号\t\t\t姓名\t性别\t学校\t\t专业");
        while (rs.next()) {
            System.out.println(rs.getString("code") + "\t" + rs.getString("name") + "\t" + rs.getString("gender") + "\t" + rs.getString("school") + "\t" + rs.getString("profession"));
        }
    }

    /** Counts the rows of {@code test_db.student} (runs a MapReduce job). */
    @Test
    public void count_Data() throws SQLException {
        String sql = "select count(*) from test_db.student";
        System.out.println("Running " + sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getInt(1));
        }
    }

    /** Drops table {@code student}. */
    @Test
    public void drop_table() throws SQLException {
        String sql = "drop table student";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("drop table success");
    }

    /** Drops database {@code hive_jdbc_test} if it exists. */
    @Test
    public void drop_DataBase() throws SQLException {
        // Forced variant:
        //String sql = "DROP DATABASE IF EXISTS flume_hive CASCADE";
        String sql = "DROP DATABASE IF EXISTS hive_jdbc_test";
        System.out.println("Running " + sql);
        stmt.execute(sql);
        System.out.println("DROP DATABASE success");
    }

    /**
     * Runs {@link #drop_DataBase()} outside of JUnit. The connection has to be opened and
     * released explicitly here because no test runner invokes the lifecycle methods.
     *
     * @param args unused
     * @throws SQLException if the statement or the cleanup fails
     */
    public static void main(String[] args) throws SQLException {
        TeatApi api = new TeatApi();
        api.init();
        try {
            api.drop_DataBase();
        } finally {
            api.destory();
        }
    }
}
UDF
package com.hrbu.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
/**
 * Hive UDF translating a gender code into a Chinese label.
 */
public class GenderUDF extends UDF {
    /**
     * The method has to be named {@code evaluate}; Hive looks it up by that name.
     *
     * <p>Mapping (case-insensitive): {@code "M"} gives 男, {@code "F"} gives 女, any other
     * non-empty value gives 未知.
     *
     * @param text the gender code; may be {@code null}
     * @return the label, or {@code null} when the input is {@code null} or empty
     */
    public Text evaluate(Text text) {
        // Guard against a null argument before dereferencing it.
        if (text == null) {
            return null;
        }
        String textStr = text.toString();
        if (!StringUtils.isNotEmpty(textStr)) {
            return null;
        }
        if (textStr.equalsIgnoreCase("M")) {
            return new Text("男");
        }
        if (textStr.equalsIgnoreCase("F")) {
            return new Text("女");
        }
        return new Text("未知");
    }
}
# RedisApi
package com.hrbu.RedisApi;
import java.util.HashMap;
import redis.clients.jedis.Jedis;
/**
 * Walk-through of basic Jedis commands for the Redis data types string, hash, list, set and
 * sorted set. Only the sorted-set section is active; the others are kept as commented
 * reference snippets.
 *
 * <p>NOTE: the demo ends with {@code flushAll()}, which removes every key on the server.
 */
public class App
{
    /**
     * @param args unused
     */
    public static void main( String[] args )
    {
        // try-with-resources closes the connection even when a command throws.
        try (Jedis jedis = new Jedis("8.8.8.8", 6379)) {
            //jedis.flushAll();
            //System.out.println( "Hello World!" + jedis.ping());

            /***** 1. String *****/
            // jedis.set("name1", "aaa");
            // System.out.println(jedis.get("name1"));
            // jedis.del("name1");
            //
            // jedis.mset("name1","aaa","name2","bbb","name3","ccc");
            // System.out.println(jedis.mget("name1","name2","name3"));
            //
            // System.out.println(jedis.exists("name1"));
            // System.out.println(jedis.exists("name","name0","name1","name2","name3"));
            // System.out.println(jedis.keys("*"));
            // System.out.println(jedis.type("name1"));
            // System.out.println(jedis.randomKey());
            // System.out.println(jedis.flushDB());
            // System.out.println(jedis.get("name1"));
            // System.out.println(jedis.flushAll());
            // System.out.println(jedis.randomKey());

            /***** 2. Hash *****/
            // HashMap<String,String> hmap = new HashMap<String,String>();
            // hmap.put("name5", "eee");
            // hmap.put("name4", "ddd");
            // hmap.put("id", "666");
            // // set values
            // jedis.hset("user", "name6", "fff");
            // jedis.hmset("user", hmap);
            // // read values
            // System.out.println("hget:\t" + jedis.hget("user", "name4"));
            // System.out.println("hmget:\t" + jedis.hmget("user", "name4","name5","name6"));
            // System.out.println("hgetAll:\t" + jedis.hgetAll("user"));
            // System.out.println("keys:\t" + jedis.keys("*"));
            // System.out.println("hkeys:\t" + jedis.hkeys("user"));
            // System.out.println("hvals:\t" + jedis.hvals("user"));
            // System.out.println("hlen:\t" + jedis.hlen("user"));
            // // delete fields
            // System.out.println("hdel\t" + jedis.hdel("user","name4"));
            // System.out.println("hdel\t" + jedis.hdel("user","name5","name6"));
            // System.out.println(jedis.hgetAll("user"));
            // // clear everything
            // System.out.println(jedis.flushAll());
            // System.out.println(jedis.hgetAll("user"));

            /***** 3. List (similar to a stack?) *****/
            // // push values
            // jedis.lpushx("list", "999");
            // jedis.lpush("list", "hhh","ggg","iii");   // one or more values at the head
            // jedis.rpush("list", "kkk","jjj");         // one or more values at the tail
            // jedis.lpushx("list", "666");              // push to the head of an existing list
            // // read values
            // System.out.println(jedis.lrange("list", 0, 10));   // elements in the given range
            // System.out.println(jedis.blpop("list","5"));       // pop the first element, waiting up to the timeout
            // System.out.println(jedis.brpop("list", "5"));      // pop the last element, waiting up to the timeout
            // System.out.println(jedis.lindex("list", 3));       // element by index
            // System.out.println(jedis.llen("list"));            // list length
            // System.out.println(jedis.lrange("list", 0, 10));
            // // clear everything
            // System.out.println(jedis.flushAll());

            /***** 4. Set *****/
            // // add values
            // jedis.sadd("set", "lll", "nnn", "mmm" ,"ooo");
            // // read values
            // System.out.println(jedis.smembers("set"));           // all members
            // System.out.println(jedis.sismember("set", "ooo"));   // membership test
            // System.out.println(jedis.sismember("sett", "ooo"));
            // System.out.println(jedis.srem("set","ooo","mmm"));   // remove members
            // System.out.println(jedis.smembers("set"));
            // // set algebra
            // jedis.sadd("set", "set");
            // jedis.sadd("sett", "lll", "nnn", "mmm", "ooo", "sett");
            // System.out.println(jedis.sinter("set", "sett"));     // intersection
            // System.out.println(jedis.sdiff("set", "sett"));      // difference
            // System.out.println(jedis.sunion("set", "sett"));     // union
            // System.out.println(jedis.scard("set"));              // number of members
            // System.out.println(jedis.scard("sett"));             // number of members
            // // clear everything
            // System.out.println(jedis.flushAll());

            /***** 5. Sorted set (zset) *****/
            HashMap<String,Double> scoremap = new HashMap<String, Double>();
            scoremap.put("vvv", 0.22);
            scoremap.put("ppp", 0.16);
            scoremap.put("sss", 0.19);
            scoremap.put("qqq", 0.17);
            scoremap.put("www", 0.23);
            scoremap.put("rrr", 0.18);
            scoremap.put("uuu", 0.21);
            scoremap.put("ttt", 0.20);
            // add values
            jedis.zadd("zset", scoremap);        // add members or update their scores
            jedis.zadd("zset", 0.24, "xxx");
            // look up and modify members
            System.out.println(jedis.zrange("zset", 0, 10));            // range by index
            System.out.println(jedis.zcount("zset", 0.18, 0.21));       // number of members within a score range
            System.out.println(jedis.zrem("zset", "sss", "ppp"));       // remove members
            System.out.println(jedis.zcard("zset"));                    // number of members
            System.out.println(jedis.zincrby("zset", 1, "www"));        // increment a score
            System.out.println(jedis.zscore("zset", "www"));            // score of a member
            System.out.println(jedis.zrank("zset", "www"));             // index of a member
            System.out.println(jedis.zrangeByScore("zset", 0, 2));      // range by score
            // clear everything
            System.out.println(jedis.flushAll());
        }
    }
}
编辑 (opens new window)
上次更新: 2022/08/11, 00:48:36