Code demos (MR, HBase, Hive, Redis)

Ai · 2022-02-27

# Environment


Configuring a MapReduce development environment with Eclipse on Windows: https://www.cnblogs.com/Hephaestus/p/12608456.html

# MapReduce code


MyMapper

package mrproject;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	/**
	 * The business logic of the map phase lives in map().
	 * The map task invokes our map() once per line of input.
	 */
	private Text word = new Text();					// Text is Hadoop's serializable wrapper for strings
	private IntWritable one = new IntWritable(1);	// IntWritable is Hadoop's serializable wrapper for ints

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Convert the line handed to us by the map task into a String
		String line = value.toString();
		// Split the line into words on spaces
		String[] words = line.split(" ");
		// Emit each word as <word, 1>
		for (String w : words) {
			word.set(w);
			// Using the word as the key lets the shuffle route identical
			// words to the same reduce task
			context.write(word, one);
		}
	}
}

MyReducer

package mrproject;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	/**
	 * The core of the reducer is reduce(), which processes the key-value
	 * pairs produced by the map phase, e.g.:
	 * <angel,1> <angel,1> <angel,1> <angel,1> <angel,1>
	 * <hello,1> <hello,1> <hello,1> <hello,1> <hello,1> <hello,1>
	 * <banana,1> <banana,1> <banana,1> <banana,1> <banana,1> <banana,1>
	 * The key parameter identifies one group of identical words; if key is
	 * "hello", the second parameter iterates over that group's values.
	 */
	private IntWritable sum = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		// Equivalent explicit-iterator form:
		// Iterator<IntWritable> iterator = values.iterator();
		// while (iterator.hasNext()) {
		//     count += iterator.next().get();
		// }

		// values holds every value for this key; in the sample above,
		// "angel" maps to five 1s
		for (IntWritable value : values) {
			count += value.get();
		}
		sum.set(count);
		// Write the result to the output context
		context.write(key, sum);
	}
}

MRApp

package mrproject;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class MRApp {
	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		// These defaults are for local runs; in production the job is
		// submitted via a shell command that supplies the input and
		// output directories
		if (args.length < 2) {
			args = new String[] {
					"hdfs://10.20.10.67:9000/input",
					"hdfs://10.20.10.67:9000/output4"
			};
		}
		Configuration conf = new Configuration();
		/*
		 * Every node in the cluster already carries these settings in its config files:
		 * conf.set("mapreduce.framework.name", "yarn");
		 * conf.set("yarn.resourcemanager.hostname", "mini1");
		 */
		Job job = Job.getInstance(conf);
		// Tell the framework where the jar is: the class loader knows this
		// class's path, from which the local path of the jar can be derived
		job.setJarByClass(MRApp.class);
		// The Mapper/Reducer classes this job uses
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		// KV types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// KV types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Directory of the job's raw input files
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// Directory of the job's output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// job.submit() hands the job's parameters and the jar containing its
		// classes to YARN and returns immediately, ending the client process
		// job.submit();
		// waitForCompletion() instead blocks until the cluster reports a result
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}
}
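Because word-count aggregation is commutative and associative, and MyReducer consumes and produces the same KV types, the reducer can also be registered as a combiner to pre-aggregate <word, 1> pairs on the map side and shrink the shuffle. A minimal sketch of the extra setup line (same classes as above):

Job job = Job.getInstance(new Configuration());
job.setMapperClass(MyMapper.class);
// Combine <word, 1> pairs on each map task before the shuffle; safe here
// because summation is commutative and associative and MyReducer's input
// and output KV types are identical.
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);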

# HBase code


Table operations (DDL & DML)

package com.hrbu.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * DDL: 1. check whether a table exists  2. create a table  3. create a namespace  4. drop a table
 * DML: 5. put data  6. read data (get)  7. read data (scan)  8. delete data
 */
public class TestApi {

	public static Connection connect = null;
	public static Admin admin = null;
	static {
		try {
			// 1. Load configuration (HBaseConfiguration.create() is the factory method)
			Configuration conf = HBaseConfiguration.create();
			//conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");

			conf.set("hbase.zookeeper.quorum", "hadoop1");
			conf.set("hbase.zookeeper.property.clientPort", "2181");

			// 2. Create the connection object
			connect = ConnectionFactory.createConnection(conf);

			// 3. Get the Admin object
			admin = connect.getAdmin();
		} catch (IOException e) {
			System.out.println("failed to create the HBase connection");
			e.printStackTrace();
		}

	}

	// Release the Admin object and the connection
	public static void close() {

		if (admin != null) {
			try {
				admin.close();
			} catch (IOException e) {
				System.out.println("failed to close admin");
				e.printStackTrace();
			}
		}

		if (connect != null) {
			try {
				connect.close();
			} catch (IOException e) {
				System.out.println("failed to close the HBase connection");
				e.printStackTrace();
			}
		}

	}

	// 1. Check whether a table exists
	public static void is_Exists(String tableName) throws IOException {
		if (admin.tableExists(TableName.valueOf(tableName))) {
			System.out.println("table " + tableName + " already exists");
		} else {
			System.out.println("table " + tableName + " does not exist");
		}
	}

	// 2. Create a table
	public static void create_Table(String tableName) throws IOException {
		TableName tableNameTemp = TableName.valueOf(tableName);
		if (admin.tableExists(tableNameTemp)) {
			System.out.println("table " + tableName + " already exists");
		} else {
			// Describe the table structure through a builder
			TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableNameTemp);
			// Column families
			ColumnFamilyDescriptor info1 = ColumnFamilyDescriptorBuilder.of("info1");
			ColumnFamilyDescriptor info2 = ColumnFamilyDescriptorBuilder.of("info2");
			ColumnFamilyDescriptor info3 = ColumnFamilyDescriptorBuilder.of("info3");
			List<ColumnFamilyDescriptor> cfList = new ArrayList<ColumnFamilyDescriptor>();
			cfList.add(info1);
			cfList.add(info2);
			cfList.add(info3);
			tableBuilder.setColumnFamilies(cfList);
			// Build the table descriptor and create the table
			TableDescriptor tableDesc = tableBuilder.build();
			admin.createTable(tableDesc);
			System.out.println("table " + tableName + "\tcreated");
		}
	}

	// 3. Create a namespace
	public static void create_NameSpace(String nameSpace) {
		// Build the namespace descriptor
		NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
		// Create the namespace
		try {
			admin.createNamespace(descriptor);
		} catch (NamespaceExistException e) {
			System.out.println("namespace " + nameSpace + " already exists");
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("-- this line still runs after a caught exception --");
	}
	// 4. Drop a table
	public static void drop_Table(String tableName) throws IOException {
		TableName tableNameTemp = TableName.valueOf(tableName);
		if (admin.tableExists(tableNameTemp)) {
			admin.disableTable(tableNameTemp);
			admin.deleteTable(tableNameTemp);
			System.out.println("table " + tableName + " dropped!");
		} else {
			System.out.println("table " + tableName + " does not exist!");
		}
	}
	// 5. Put data
	public static void put_Data(String tableName, String rowKey, String columnFamily, String column, String value) {
		Table table = null;
		try {
			table = connect.getTable(TableName.valueOf(tableName));
			Put put = new Put(Bytes.toBytes(rowKey));
			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
			table.put(put);
			System.out.println("data inserted");	// only printed when the put succeeded
		} catch (NoSuchColumnFamilyException e) {
			System.out.println("error: no such column family");
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				if (table != null) {
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	// 6. Read data (get)
	public static void get_Data(String tableName, String rowKey, String columnFamily, String column) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		
		Get get = new Get(Bytes.toBytes(rowKey));
		//get.addFamily(Bytes.toBytes(columnFamily));
		get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
		Result result = table.get(get);
		Cell[] cells = result.rawCells();
		System.out.println(tableName + "--" + rowKey + "--" + columnFamily + "--" + column + ":");
		print_Cells2(cells);
		table.close();
	}
	public static void get_Data(String tableName, String rowKey, String columnFamily) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		
		Get get = new Get(Bytes.toBytes(rowKey));
		get.addFamily(Bytes.toBytes(columnFamily));
		Result result = table.get(get);
		Cell[] cells = result.rawCells();
		System.out.println(tableName + "--" + rowKey + "--" + columnFamily + ":");
		print_Cells2(cells);
		table.close();
	}
	public static void get_Data(String tableName, String rowKey) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		
		Get get = new Get(Bytes.toBytes(rowKey));
		Result result = table.get(get);
		Cell[] cells = result.rawCells();
		System.out.println(tableName + "--" + rowKey + ":");
		print_Cells2(cells);
		table.close();
	}
	// Print cell info (full row format)
	private static void print_Cells(Cell[] cells) {
		for (Cell tempCell : cells) {
			System.out.println(Bytes.toString(CellUtil.cloneRow(tempCell)) 
					+ "\t\tcolumn=" + Bytes.toString(CellUtil.cloneFamily(tempCell)) + ":" +  Bytes.toString(CellUtil.cloneQualifier(tempCell))
					+ ",timestamp=" + tempCell.getTimestamp() 
					+ ", value=" + Bytes.toString(CellUtil.cloneValue(tempCell)) );
		}
	}
	// Print cell info (column:value pairs)
	private static void print_Cells2(Cell[] cells) {
		StringBuilder sb = new StringBuilder();
		for (Cell cell : cells) {
			String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
			String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
			sb.append(column).append(":").append(value).append(";\t");
		}
		System.out.println(sb.toString());
	}
	// 7. Read data (scan)
	public static void scan_Data(String tableName) {
		Table table = null;
		try {
			table = connect.getTable(TableName.valueOf(tableName));
			ResultScanner results = table.getScanner(new Scan());
			System.out.println(tableName + "表\nROW\t\tCOLUMN+CELL");
			for (Result result : results) {
				Cell[] cells = result.rawCells();
				print_Cells(cells);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				if(table!=null) {
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	// 7.2 Read data (scan + filter)
	public static void filter_Data(String tableName, Filter filter, int limit) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		Scan scan = new Scan().setFilter(filter).setLimit(limit);
		ResultScanner resultScan = table.getScanner(scan);
		System.out.println(tableName + "表(filter)\nROW\t\tCOLUMN+CELL");
		for (Result result : resultScan) {
			Cell[] cells = result.rawCells();
			print_Cells(cells);
		}
		table.close();
	}
	// 8. Delete multiple rows
	public static void delete_Data(String tableName, String... rows) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		//Delete delete = new Delete(Bytes.toBytes(rowKey));
		List<Delete> deleteList = new ArrayList<Delete>();
		for (String row:rows) {
			Delete delete = new Delete(Bytes.toBytes(row));
			deleteList.add(delete);
		}
		table.delete(deleteList);
		table.close();
		System.out.println("删除成功");
	}
	// 8.1 Delete a single cell
	public static void delete_Cell(String tableName,  String rowKey, String columnFamily, String column ) throws IOException {
		Table table = connect.getTable(TableName.valueOf(tableName));
		Delete delete = new Delete(Bytes.toBytes(rowKey));
		delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));		// deletes only the latest version
		//delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column));	// deletes all versions
		table.delete(delete);
		table.close();
	}

	public static void main(String[] args) throws IOException {
		System.out.println("============================start=========================");
		// 1. Check whether a table exists
		//is_Exists("student");
		
		// 3. Create a namespace
		//create_NameSpace("std");
		
		// 2. Create a table
		//create_Table("std:stu");
		
		// 4. Drop a table
		//drop_Table("std:stu");
		
		// 5. Put data
		//put_Data("std:stu", "1003", "info1", "addr", "beijing");
		//put_Data("std:stu", "1004", "info1", "sex", "男");
		//put_Data("std:stu", "1005", "info1", "class", "2");
		
		// 6. Read data (get)
		get_Data("std:stu", "1002");
		get_Data("std:stu", "1002", "info1");
		
		// 7. Read data (scan)
		scan_Data("std:stu");
		
		// 7.1 Read data (scan + filter)
		//filter_Data("std:stu", new PrefixFilter(Bytes.toBytes("100")), 10);	// rows whose row key starts with the given prefix
		//filter_Data("std:stu", new RandomRowFilter((float) 0.2), 10);	// random sample of rows (<=0 filters out every row, >=1 keeps every row)
		//filter_Data("std:stu", new KeyOnlyFilter(), 10); // returns every row, but with empty values
		// Rows whose row key compares less than "1002"
		filter_Data("std:stu", new RowFilter(CompareOperator.LESS, new BinaryComparator(Bytes.toBytes("1002"))), 10);
		
		// 8. Delete multiple rows
		//delete_Data("std:stu", "1003","1004");
		
		// 8.1 Delete a single cell
		//delete_Cell("std:stu", "1005", "info1", "name");
		
		scan_Data("std:stu");
		
		System.out.println("============================end==========================");
		// Finally, release the Admin object and the connection
		close();
	}

}
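Table implements AutoCloseable, so the getTable/finally/close pairs above can be collapsed with try-with-resources; a minimal sketch of the same put flow under that pattern (connect is the class field defined above):

// try-with-resources variant of put_Data: the Table is closed automatically
// even when table.put() throws.
public static void put_Data2(String tableName, String rowKey, String columnFamily,
		String column, String value) throws IOException {
	try (Table table = connect.getTable(TableName.valueOf(tableName))) {
		Put put = new Put(Bytes.toBytes(rowKey));
		put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
		table.put(put);
	}
}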

# Inserting MySQL data into HBase


Writing to HBase

package com.hrbu.hbase;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;

public class Sqoop_Hbase {
	public static Connection connect = null;
	public static Admin admin = null;
	public static void init() {
		try {
			// 1. Load configuration (HBaseConfiguration.create() is the factory method)
			Configuration conf = HBaseConfiguration.create();
			//conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");

			conf.set("hbase.zookeeper.quorum", "hadoop1");
			conf.set("hbase.zookeeper.property.clientPort", "2181");

			// 2. Create the connection object
			connect = ConnectionFactory.createConnection(conf);

			// 3. Get the Admin object
			admin = connect.getAdmin();
		} catch (IOException e) {
			System.out.println("failed to create the HBase connection");
			e.printStackTrace();
		}

	}

	// Release the Admin object and the connection
	public static void close() {

		if (admin != null) {
			try {
				admin.close();
			} catch (IOException e) {
				System.out.println("failed to close admin");
				e.printStackTrace();
			}
		}

		if (connect != null) {
			try {
				connect.close();
			} catch (IOException e) {
				System.out.println("failed to close the HBase connection");
				e.printStackTrace();
			}
		}

	}
	
	// Create the table and insert the rows
	public static void put_Data(String tableName, String columnFamily, List<Staff> list) throws IOException {
		
		if (admin.tableExists(TableName.valueOf(tableName))) {
			System.out.println("table " + tableName + " already exists");
		} else {
			// Describe the table structure through a builder
			TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
			// Column family
			ColumnFamilyDescriptor info1 = ColumnFamilyDescriptorBuilder.of(columnFamily);
			tableBuilder.setColumnFamily(info1);
			// Build the table descriptor and create the table
			TableDescriptor tableDesc = tableBuilder.build();
			admin.createTable(tableDesc);
			System.out.println("table " + tableName + "\tcreated");
			
			Table table = null;
			try {
				table = connect.getTable(TableName.valueOf(tableName));
				for (int i = 0; i < list.size(); i++) {
					// Convert numeric ids (int/double) to String first so the row key
					// bytes display as readable text rather than raw binary
					Put put = new Put(Bytes.toBytes(String.valueOf(list.get(i).getId())));
					// Chinese characters display as escaped bytes in the HBase shell;
					// converting UTF-8 to unicode escapes did not help
					put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("name"), Bytes.toBytes(list.get(i).getName()));
					put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("sex"), Bytes.toBytes(list.get(i).getSex()));
					table.put(put);
				}
			} catch (NoSuchColumnFamilyException e) {
				System.out.println("error: no such column family");
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				try {
					if (table != null) {
						table.close();
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	// Entry point for external callers (a rough wrapper)
	public static void main_Use(String tableName, String columnFamily, List<Staff> list) {
		init();
		try {
			put_Data(tableName, columnFamily, list);
			scan_Data(tableName);
		} catch (IOException e) {
			//e.printStackTrace();
			System.out.println("admin error");
		} finally {
			close();
		}
		System.out.println("insert flow finished");
	}
	
	
	/**************** the helpers below duplicate those in TestApi *****************/
	// 7. Read data (scan)
	public static void scan_Data(String tableName) {
		Table table = null;
		try {
			table = connect.getTable(TableName.valueOf(tableName));
			ResultScanner results = table.getScanner(new Scan());
			System.out.println(tableName + "表\nROW\t\tCOLUMN+CELL");
			for (Result result : results) {
				Cell[] cells = result.rawCells();
				print_Cells(cells);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				if(table!=null) {
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	// Print cell info
	private static void print_Cells(Cell[] cells) {
		for (Cell tempCell : cells) {
			System.out.println(Bytes.toString(CellUtil.cloneRow(tempCell)) 
					+ "\t\tcolumn=" + Bytes.toString(CellUtil.cloneFamily(tempCell)) + ":" +  Bytes.toString(CellUtil.cloneQualifier(tempCell))
					+ ",timestamp=" + tempCell.getTimestamp() 
					+ ", value=" + Bytes.toString(CellUtil.cloneValue(tempCell)) );
		}
	}
	/**
	 * 1. Convert a UTF-8 string into unicode-escape codes
	 * @param string
	 * @return
	 */
    public static String stringToUnicode(String string) {

        StringBuffer unicode = new StringBuffer();
        for (int i = 0; i < string.length(); i++) {
            char c = string.charAt(i);
            unicode.append("\\u" + Integer.toHexString(c));
        }
        String str = unicode.toString();

        return str.replaceAll("\\\\", "0x");
    }

    /**
     * 2. Convert unicode-escape codes back into a UTF-8 string
     * @param unicode
     * @return
     */
    public static String unicodeToString(String unicode) {

        String str = unicode.replace("0x", "\\");

        StringBuffer string = new StringBuffer();
        String[] hex = str.split("\\\\u");
        for (int i = 1; i < hex.length; i++) {
            int data = Integer.parseInt(hex[i], 16);
            string.append((char) data);
        }
        return string.toString();
    }

}
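The insert loop above issues one RPC per row. Table.put also accepts a List<Put>, so the rows can be buffered and flushed in a single call; a minimal sketch of the batched loop (same Staff list, columnFamily, and table as above):

// Batched variant: buffer every Put and send them together, one RPC for the
// whole list instead of one per row.
List<Put> puts = new ArrayList<Put>();
for (Staff staff : list) {
	Put put = new Put(Bytes.toBytes(String.valueOf(staff.getId())));
	put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("name"), Bytes.toBytes(staff.getName()));
	put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("sex"), Bytes.toBytes(staff.getSex()));
	puts.add(put);
}
table.put(puts);	// Table.put(List<Put>) is part of the HBase client API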

Reading from MySQL

package com.hrbu.hbase;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;


public class Sqoop_Mysql {
	public static Connection connect = null;
	
	public static void init() {
		// JDBC driver class name
		String driver = "com.mysql.cj.jdbc.Driver";
		// URL of the target database
		String url = "jdbc:mysql://hadoop1:3306/company?useUnicode=true&characterEncoding=utf-8&useSSL=false";
		// MySQL user name
		String username = "root";
		// Password
		String password = "123456789";
		
		try {
			// Load the driver
			Class.forName(driver);
			connect = DriverManager.getConnection(url, username, password);
			if (!connect.isClosed()) {
				System.out.println("database connected");
			}
		} catch (ClassNotFoundException | SQLException e) {
			e.printStackTrace();
		} 
	}
	
	public static void close(){
		if(connect!=null) {
			try {
				connect.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static List<Staff> selectAllFromStaff() {
		// Instantiate the Statement object
		List<Staff> list = new ArrayList<Staff>();
		Statement stmt = null;
		try {
			stmt = connect.createStatement();
			String sql = "select * from staff";
			ResultSet resultSet = stmt.executeQuery(sql);
			while(resultSet.next()) {
				Staff staff = new Staff();
				staff.setId(resultSet.getInt("id"));
				staff.setName(resultSet.getString("name"));
				staff.setSex(resultSet.getString("sex"));
				list.add(staff);
			}
			
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			try {
				if(stmt!=null) {
					stmt.close();
				}
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		return list;
	}
	
	public static void print(List<Staff> list) {
		for (int i = 0; i < list.size(); i++) {
			System.out.println(list.get(i).getId() + "\t" +list.get(i).getName() + "\t" + list.get(i).getSex());
		}
	}
	// Entry point for external callers (a rough wrapper)
	public static List<Staff> main_Use() {
		init();
		List<Staff> list = selectAllFromStaff();
		close();
		return list;
	}
}

class Staff {
	int id;
	String name;
	String sex;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getSex() {
		return sex;
	}
	public void setSex(String sex) {
		this.sex = sex;
	}
}
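The JDBC resources in selectAllFromStaff can also be managed with try-with-resources, which closes the Statement and ResultSet even when the query throws; a minimal sketch of the same query under that pattern (connect and Staff as defined above):

// try-with-resources variant of selectAllFromStaff.
public static List<Staff> selectAllFromStaff2() throws SQLException {
	List<Staff> list = new ArrayList<Staff>();
	try (Statement stmt = connect.createStatement();
	     ResultSet rs = stmt.executeQuery("select * from staff")) {
		while (rs.next()) {
			Staff staff = new Staff();
			staff.setId(rs.getInt("id"));
			staff.setName(rs.getString("name"));
			staff.setSex(rs.getString("sex"));
			list.add(staff);
		}
	}
	return list;
}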
Test driver

package com.hrbu.hbase;

import java.util.List;

public class Sqoop_Test {
	public static void main(String[] args) {
		List<Staff> list = Sqoop_Mysql.main_Use();
		Sqoop_Hbase.main_Use("staff", "info", list);
	}
}
Connectivity check

package com.hrbu.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Test {

	public static void main(String[] args) throws IOException {
		Configuration conf = null;
		conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.1.100");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		System.out.println("11111111111111111111");
		//2. Create the connection object
		Connection connect = null;
		connect = ConnectionFactory.createConnection(conf);
		System.out.println("22222222222222222222");
		//3. Get the Admin object
		Admin admin = connect.getAdmin();
		System.out.println("333333333333333333333" + admin + "333" + connect);
		//System.out.println(admin.tableExists(TableName.valueOf("student")));
		if (admin.tableExists(TableName.valueOf("student"))) {
			System.out.println("table already exists");
		} else {
			System.out.println("table does not exist");
		}
		System.out.println("44444444444444444444");
		admin.close();
		connect.close();
	}

}

# Hive code


Table operations

package com.hrbu.hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ClusterTestApi {
	
	// Driver class name
	private static String driverName = "org.apache.hive.jdbc.HiveDriver";
	// Connection URL
	private static String url = "jdbc:hive2://8.8.8.100:10000/default";
	// User name; the password can stay empty when HiveServer2 runs without authentication
	private static String user = "banana";
	private static String password = "";
	
	private static Connection conn = null;
	private static Statement stmt = null;
	private static ResultSet rs = null;
	
	/**
	 * JUnit test methods, driven by annotations:
	 * 1. any single method can be run on its own, with no main() or extra scaffolding
	 * 2. the aspect-style @Before/@After hooks keep the code structure clean
	 */
	// Load the driver and open the connection
	@Before	// runs before every public void method annotated with @Test
	public void init() {
		try {
			Class.forName(driverName);
			conn = DriverManager.getConnection(url,user,password);
			stmt = conn.createStatement();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	// Release resources
	@After // runs after every public void method annotated with @Test
	public void destroy() throws SQLException {
		if(rs != null) {
			rs.close();
		}
		if(stmt != null) {
			stmt.close();
		}
		if(conn != null) {
			conn.close();
		}
	}
	// Create a database
	@Test
	public void create_DataBase() throws SQLException {
		String sql = "create database IF NOT EXISTS hive_jdbc_test";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("create database success");
	}
	// Create a table
	@Test
	public void create_StudentTable() throws SQLException {
		//String sql0 = "use hive_jdbc_test";
		String sql = "create table student(code string,name string,gender string,school string,profession string)\r\n" + 
				" comment 'this is a student table'\r\n" + 
				" row format delimited fields terminated by '\\t'\r\n" + 
				" stored as textfile";
		System.out.println("Running create table student");
		//stmt.execute(sql0);
		stmt.execute(sql);
		System.out.println("create table student success");
	}
	
	// List all databases
	@Test
	public void show_DataBases() throws SQLException {
		String sql = "show Databases";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		while(rs.next()) {
			System.out.println(rs.getString(1));
		}
	}
	// List all tables in the current database
	@Test
	public void show_Tables() throws SQLException {
		String sql = "show tables";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		while(rs.next()) {
			System.out.println(rs.getString(1));
		}
	}
	// Load data
	@Test
	public void load_Data() throws SQLException {
		// Path on the Linux host running HiveServer2
		String filePath = "'/soft/module/datas/short-student-utf8.txt'";
		String sql = "load data local inpath " + filePath + " overwrite into table student";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("load data local success");
	}
	// Query data
	@Test
	public void select_Data() throws SQLException {
		String sql = "select * from test_db.student";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		System.out.println("学号\t姓名\t性别\t学校\t专业");
		while(rs.next()) {
			System.out.println(rs.getString("code") + "\t" + rs.getString("name") + "\t" + rs.getString("gender") + "\t" + rs.getString("school") + "\t" + rs.getString("profession"));
		}
	}
	// Aggregate query (launches a MapReduce job)
	@Test
	public void count_Data() throws SQLException {
		String sql = "select count(*) from test_db.student";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		System.out.println("学号\t姓名\t性别\t学校\t专业");
		while(rs.next()) {
			System.out.println(rs.getInt(1));
		}
	}
	// Drop the table
	@Test
	public void drop_table() throws SQLException {
		String sql = "drop table student";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("drop table success");
	}
	
	// Drop the database
	@Test
	public void drop_DataBase() throws SQLException {
		// Forced drop (even when the database is not empty):
		//String sql = "DROP DATABASE IF EXISTS hive_jdbc_test CASCADE";
		String sql = "DROP DATABASE IF EXISTS hive_jdbc_test";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("DROP DATABASE success");
	}
	
	public static void main(String[] args) throws SQLException {
		ClusterTestApi test = new ClusterTestApi();
		test.init();	// @Before/@After only fire under JUnit, so call them explicitly here
		test.drop_DataBase();
		test.destroy();
	}
}
package com.hrbu.hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;



public class TestApi {
	
	// Driver class name
	private static String driverName = "org.apache.hive.jdbc.HiveDriver";
	// Connection URL
	private static String url = "jdbc:hive2://192.168.1.100:10000/default";
	// User name; the password can stay empty when HiveServer2 runs without authentication
	private static String user = "root";
	private static String password = "";
	
	private static Connection conn = null;
	private static Statement stmt = null;
	private static ResultSet rs = null;
	
	/**
	 * JUnit test methods, driven by annotations:
	 * 1. any single method can be run on its own, with no main() or extra scaffolding
	 * 2. the aspect-style @Before/@After hooks keep the code structure clean
	 */
	// Load the driver and open the connection
	@Before	// runs before every public void method annotated with @Test
	public void init() {
		try {
			Class.forName(driverName);
			conn = DriverManager.getConnection(url,user,password);
			stmt = conn.createStatement();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	// Release resources
	@After // runs after every public void method annotated with @Test
	public void destroy() throws SQLException {
		if(rs != null) {
			rs.close();
		}
		if(stmt != null) {
			stmt.close();
		}
		if(conn != null) {
			conn.close();
		}
	}
	// Create a database
	@Test
	public void create_DataBase() throws SQLException {
		String sql = "create database IF NOT EXISTS hive_jdbc_test";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("create database success");
	}
	// Create a table
	@Test
	public void create_StudentTable() throws SQLException {
		//String sql0 = "use hive_jdbc_test";
		String sql = "create table student(code string,name string,gender string,school string,profession string)\r\n" + 
				" comment 'this is a student table'\r\n" + 
				" row format delimited fields terminated by '\\t'\r\n" + 
				" stored as textfile";
		System.out.println("Running create table student");
		//stmt.execute(sql0);
		stmt.execute(sql);
		System.out.println("create table student success");
	}
	
	// List all databases
	@Test
	public void show_DataBases() throws SQLException {
		String sql = "show Databases";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		while(rs.next()) {
			System.out.println(rs.getString(1));
		}
	}
	// List all tables in the current database
	@Test
	public void show_Tables() throws SQLException {
		String sql = "show tables";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		while(rs.next()) {
			System.out.println(rs.getString(1));
		}
	}
	// Load data
	@Test
	public void load_Data() throws SQLException {
		// Path on the Linux host running HiveServer2
		String filePath = "'/soft/datas/short-student-utf8.txt'";
		String sql = "load data local inpath " + filePath + " overwrite into table test_db.student";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("load data local success");
	}
	// Query data
	@Test
	public void select_Data() throws SQLException {
		String sql = "select * from test_db.student";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		System.out.println("学号\t\t\t姓名\t性别\t学校\t\t专业");
		while(rs.next()) {
			System.out.println(rs.getString("code") + "\t" + rs.getString("name") + "\t" + rs.getString("gender") + "\t" + rs.getString("school") + "\t" + rs.getString("profession"));
		}
	}
	// Aggregate query (launches a MapReduce job)
	@Test
	public void count_Data() throws SQLException {
		String sql = "select count(*) from test_db.student";
		System.out.println("Running " + sql);
		rs = stmt.executeQuery(sql);
		while(rs.next()) {
			System.out.println(rs.getInt(1));
		}
	}
	// Drop the table
	@Test
	public void drop_table() throws SQLException {
		String sql = "drop table student";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("drop table success");
	}
	
	// Drop the database
	@Test
	public void drop_DataBase() throws SQLException {
		// Forced drop (even when the database is not empty):
		//String sql = "DROP DATABASE IF EXISTS flume_hive CASCADE";
		String sql = "DROP DATABASE IF EXISTS hive_jdbc_test";
		System.out.println("Running " + sql);
		stmt.execute(sql);
		System.out.println("DROP DATABASE success");
	}
	
	public static void main(String[] args) throws SQLException {
		TestApi test = new TestApi();
		test.init();	// @Before/@After only fire under JUnit, so call them explicitly here
		test.drop_DataBase();
		test.destroy();
	}
}

UDF

package com.hrbu.hive;


import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class GenderUDF extends UDF {
	/**
	 * The method must be named evaluate; Hive looks it up by that name at run time.
	 * Logic: "M" returns 男 (male), "F" returns 女 (female), anything else returns 未知 (unknown).
	 */
	public Text evaluate(Text text) {
		String textStr = text.toString();
		if(StringUtils.isNotEmpty(textStr)) {
			if(textStr.equalsIgnoreCase("M")) {
				return new Text("男");
			}else if(textStr.equalsIgnoreCase("F")){
				return new Text("女");
			}else {
				return new Text("未知");
			}
			
		} else {
			return null;
		}
	}
}
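To use the UDF, package it as a jar, ship it to the HiveServer2 host, and register it; the statements can go through the same JDBC Statement used above. A minimal sketch (the jar path and the function name gender_cn are illustrative placeholders):

// Register GenderUDF and apply it in a query; jar path and function name
// are placeholders, not values from this project.
stmt.execute("add jar /soft/module/udf/gender-udf.jar");
stmt.execute("create temporary function gender_cn as 'com.hrbu.hive.GenderUDF'");
rs = stmt.executeQuery("select code, name, gender_cn(gender) from test_db.student");
while (rs.next()) {
	System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3));
}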

# RedisApi


package com.hrbu.RedisApi;

import java.util.HashMap;

import redis.clients.jedis.Jedis;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args )
    {
    	Jedis jedis = new Jedis("8.8.8.8",6379);
    	//jedis.flushAll();
        //System.out.println( "Hello World!" + jedis.ping());
        /***** 1. Strings *****/
//    	jedis.set("name1", "aaa");
//        System.out.println(jedis.get("name1"));
//    	jedis.del("name1");
//    	
//    	jedis.mset("name1","aaa","name2","bbb","name3","ccc");
//    	System.out.println(jedis.mget("name1","name2","name3"));
//    	
//    	System.out.println(jedis.exists("name1"));
//    	System.out.println(jedis.exists("name","name0","name1","name2","name3"));
//    	System.out.println(jedis.keys("*"));
//    	System.out.println(jedis.type("name1"));
//    	System.out.println(jedis.randomKey());
//    	System.out.println(jedis.flushDB());
//    	System.out.println(jedis.get("name1"));
//    	System.out.println(jedis.flushAll());
//    	System.out.println(jedis.randomKey());
    	
    	/***** 2. Hashes *****/
    	/*
    	HashMap<String,String> hmap = new HashMap<String,String>();
    	hmap.put("name5", "eee");
    	hmap.put("name4", "ddd");
    	hmap.put("id", "666");
    	// set fields
    	jedis.hset("user", "name6", "fff");
    	jedis.hmset("user", hmap);
    	// get fields
    	System.out.println("hget:\t" + jedis.hget("user", "name4"));
    	System.out.println("hmget:\t" + jedis.hmget("user", "name4","name5","name6"));
    	System.out.println("hgetAll:\t" + jedis.hgetAll("user"));
    	System.out.println("keys:\t" + jedis.keys("*"));
    	System.out.println("hkeys:\t" + jedis.hkeys("user"));
    	System.out.println("hvals:\t" + jedis.hvals("user"));
    	System.out.println("hlen:\t" + jedis.hlen("user"));
    	// delete fields
    	System.out.println("hdel\t" + jedis.hdel("user","name4"));
    	System.out.println("hdel\t" + jedis.hdel("user","name5","name6"));
    	System.out.println(jedis.hgetAll("user"));
    	// flush everything
    	System.out.println(jedis.flushAll());
    	System.out.println(jedis.hgetAll("user"));
    	*/

    	
    	/******* 3. Lists (double-ended: push/pop at head or tail) *********/
    	/*
    	// push values
    	jedis.lpushx("list", "999");				// no-op here: lpushx only pushes onto an existing list
    	jedis.lpush("list", "hhh","ggg","iii");		// one or more values onto the head
    	jedis.rpush("list", "kkk","jjj");			// one or more values onto the tail
    	jedis.lpushx("list", "666");				// pushes onto the head now that the list exists
    	// read values
    	System.out.println(jedis.lrange("list", 0, 10));	// elements in an index range
    	System.out.println(jedis.blpop("list","5"));		// pop the first element, waiting up to the timeout
    	System.out.println(jedis.brpop("list", "5"));		// pop the last element, waiting up to the timeout
    	System.out.println(jedis.lindex("list", 3));		// element at an index
    	System.out.println(jedis.llen("list"));				// list length
    	System.out.println(jedis.lrange("list", 0, 10));
    	// flush everything
    	System.out.println(jedis.flushAll());
    	*/
    	
    	
    	/******** 4. Sets ****************/
    	/*
    	// add values
    	jedis.sadd("set", "lll", "nnn", "mmm" ,"ooo");
    	// read values
    	System.out.println(jedis.smembers("set"));		// all members
    	System.out.println(jedis.sismember("set", "ooo"));			// membership test
    	System.out.println(jedis.sismember("sett", "ooo"));
    	System.out.println(jedis.srem("set","ooo","mmm"));		// remove members
    	System.out.println(jedis.smembers("set"));
    	// set algebra
    	jedis.sadd("set", "set");
    	jedis.sadd("sett", "lll", "nnn", "mmm", "ooo", "sett");
    	System.out.println(jedis.sinter("set", "sett"));		// intersection
    	System.out.println(jedis.sdiff("set", "sett"));			// difference
    	System.out.println(jedis.sunion("set", "sett"));		// union
    	
    	System.out.println(jedis.scard("set"));		// member count
    	System.out.println(jedis.scard("sett"));		// member count
    	// flush everything
    	System.out.println(jedis.flushAll());
    	*/
    	
    	/******** 5. Sorted sets (zset) *************/
    	HashMap<String,Double> scoremap = new HashMap<String, Double>(); 
    	scoremap.put("vvv", 0.22);
    	scoremap.put("ppp", 0.16);
    	scoremap.put("sss", 0.19);
    	scoremap.put("qqq", 0.17);
    	scoremap.put("www", 0.23);
    	scoremap.put("rrr", 0.18);
    	scoremap.put("uuu", 0.21);
    	scoremap.put("ttt", 0.20);
    	// add values
    	jedis.zadd("zset", scoremap);		// add members or update their scores
    	jedis.zadd("zset", 0.24, "xxx");
    	// look up and modify members
    	System.out.println(jedis.zrange("zset", 0, 10));		// members in an index range
    	System.out.println(jedis.zcount("zset", 0.18, 0.21));		// member count in a score range
    	System.out.println(jedis.zrem("zset", "sss", "ppp"));		// remove members
    	System.out.println(jedis.zcard("zset"));					// member count
    	System.out.println(jedis.zincrby("zset", 1, "www"));		// increment a member's score
    	System.out.println(jedis.zscore("zset", "www"));		// a member's score
    	System.out.println(jedis.zrank("zset", "www"));		// a member's rank (index)
    	System.out.println(jedis.zrangeByScore("zset", 0, 2));		// members in a score range
    	// flush everything
    	System.out.println(jedis.flushAll());
    	
    	
        jedis.close();
    }
}
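A bare new Jedis(...) per program is fine for a demo, but Jedis instances are not thread-safe; the usual pattern for anything concurrent is a JedisPool plus try-with-resources. A minimal sketch (same host and port as the demo above):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class PooledApp {
	public static void main(String[] args) {
		// One pool per application; each thread borrows a connection and
		// try-with-resources returns it to the pool.
		JedisPool pool = new JedisPool("8.8.8.8", 6379);
		try (Jedis jedis = pool.getResource()) {
			jedis.set("name1", "aaa");
			System.out.println(jedis.get("name1"));
		}
		pool.close();
	}
}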