1 Key Spark packages (Maven dependencies)
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware.kryo</groupId>
    <artifactId>kryo</artifactId>
    <version>2.21</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
</dependency>
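The kryo artifact above backs the Kryo serializer that the job in section 2 enables; that job also points spark.kryo.registrator at a custom class, com.xinyi.spark.analysis.utils.MyRegistrator, which is not shown in this post. A minimal sketch of what such a registrator might look like; the concrete classes registered below are assumptions based on the HBase types the job reads, register whatever types actually travel through your RDDs:

package com.xinyi.spark.analysis.utils;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyRegistrator implements KryoRegistrator {
    // Register the non-JDK types that end up inside RDDs so Kryo does not have
    // to write full class names. The two HBase classes below are assumptions,
    // chosen because the job scans HBase rows into (ImmutableBytesWritable, Result) pairs.
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(org.apache.hadoop.hbase.io.ImmutableBytesWritable.class);
        kryo.register(org.apache.hadoop.hbase.client.Result.class);
    }
}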
2 Analysis model: "active at night, idle by day" (昼伏夜出), Spark + Java
package com.xinyi.spark.analysis.tasks;
import com.google.common.base.Optional;
import com.xinyi.spark.analysis.utils.dbhelper.DBHelper;
import com.xinyi.xframe.base.utils.StringUtils;
import com.xinyi.xframe.base.utils.UUIDGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

public class RecordInfoSparkAnalsis {
    // Task list query
    private static DBHelper dbHelper = new DBHelper("xinyidb");
    // Rowkeys are salted with a single leading digit 0-9, so every time range is scanned once per prefix
    private final static String endNum = "9";

    public static void main(String[] args) {
        String sql = "select id,to_char(starttime,'yyyymmddhh24miss') starttime,"
                + "to_char(endtime,'yyyymmddhh24miss') endtime,starthour,endhour,"
                + "to_char(createtime,'yyyymmddhh24miss') createtime from recordinfo_task where status='0'";
        List<Map<String, Object>> taskList = dbHelper.query(sql);
        System.out.println(taskList);
        if (taskList.isEmpty()) {
            System.out.println("Task list is empty!");
            return;
        }
        // Flag each task as picked up (status '2') before processing
        for (Map<String, Object> task : taskList) {
            String taskid = String.valueOf(task.get("ID"));
            updateRecordTask(taskid, "2");
        }

        // Initialize the Spark environment
        SparkConf conf = new SparkConf().setAppName("RecordInfoSparkAnalsis");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "com.xinyi.spark.analysis.utils.MyRegistrator");
        conf.set("spark.kryoserializer.buffer.max", "256");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Build the Spark-HBase configuration
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Iterate over the salt prefixes of the rowkey storage design
        int endInt = Integer.valueOf(endNum);
        for (Map<String, Object> task : taskList) {
            Object startObj = task.get("STARTTIME");
            Object endObj = task.get("ENDTIME");
            if (!StringUtils.isEmpty(startObj) && !StringUtils.isEmpty(endObj)) {
                long s = System.currentTimeMillis();
                String startTime = String.valueOf(startObj);
                String endTime = String.valueOf(endObj);
                String blackStartHour = String.valueOf(task.get("STARTHOUR"));
                String blackEndHour = String.valueOf(task.get("ENDHOUR"));
                System.out.println(blackStartHour + "---" + blackEndHour);

                // RDDs accumulated across all salt prefixes
                JavaPairRDD<String, Long> white = null;   // day-time records
                JavaPairRDD<String, Long> black = null;   // night-time records
                for (int i = 0; i <= endInt; i++) {
                    // Build the start and end rowkeys for this salt prefix and time range
                    String startkey = String.valueOf(i) + startTime;
                    String endkey = String.valueOf(i) + endTime;
                    System.out.println(startkey);
                    System.out.println(endkey);
                    // RDD of night-time records
                    JavaPairRDD<String, Long> reduceRdd2 =
                            getStringLongJavaPairRDD(jsc, hbConf, startkey, endkey, blackStartHour, blackEndHour);
                    if (black == null) {
                        black = reduceRdd2;
                    } else {
                        black = black.union(reduceRdd2);
                    }
                    // RDD of day-time records
                    JavaPairRDD<String, Long> whiteReduceRdd =
                            getStringLongJavaPairRDD(jsc, hbConf, startkey, endkey, blackEndHour, "235959");
                    if (white == null) {
                        white = whiteReduceRdd;
                    } else {
                        white = white.union(whiteReduceRdd);
                    }
                }
                System.out.println(black.collectAsMap());
                // Sum the per-record counts per tenant
                black = black.reduceByKey(new Function2<Long, Long, Long>() {
                    public Long call(Long a1, Long a2) throws Exception {
                        return a1 + a2;
                    }
                });
                white = white.reduceByKey(new Function2<Long, Long, Long>() {
                    public Long call(Long a1, Long a2) throws Exception {
                        return a1 + a2;
                    }
                });
                // Left outer join on key: every tenant seen at night, paired with its optional day-time count
                JavaPairRDD<String, Tuple2<Long, Optional<Long>>> joinRdd = black.leftOuterJoin(white);
                // Keep tenants with more than 3 night-time records and no day-time records
                joinRdd = joinRdd.filter(new Function<Tuple2<String, Tuple2<Long, Optional<Long>>>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Tuple2<Long, Optional<Long>>> stringTuple2Tuple2) throws Exception {
                        Long val1 = stringTuple2Tuple2._2._1;
                        Long val2 = 0L;
                        Set valSet = stringTuple2Tuple2._2._2.asSet();
                        for (Object val : valSet) {
                            val2 = Long.valueOf(val.toString());
                        }
                        if (valSet.isEmpty() && val1 > 3) {
                            return true;
                        } else if (val2 < 1 && val1 > 3) {
                            return true;
                        }
                        return false;
                    }
                });
                Map<String, Tuple2<Long, Optional<Long>>> collectMap = joinRdd.collectAsMap();
                System.out.println(collectMap);
                String taskid = String.valueOf(task.get("ID"));
                // Persist the result to the database and mark the task as finished (status '1')
                insert2RecordResult(taskid, collectMap);
                updateRecordTask(taskid, "1");
                long se = System.currentTimeMillis();
                System.out.println("Total time: " + (se - s));
            }
        }
        jsc.stop();
    }

    private static void updateRecordTask(String taskid, String status) {
        String sql = "update recordinfo_task set status='" + status + "' where id='" + taskid + "'";
        dbHelper.update(sql);
        System.out.println("Task table status updated!");
    }

    /**
     * Insert the result set into the Oracle result table recordinfo_result
     * @param taskid
     * @param results
     */
    private static void insert2RecordResult(String taskid, Map<String, Tuple2<Long, Optional<Long>>> results) {
        Set<String> keySet = results.keySet();
        for (String key : keySet) {
            Tuple2<Long, Optional<Long>> vals = results.get(key);
            String id = UUIDGenerator.generateOriginnalUUID();
            String sql = "insert into recordinfo_result (id,taskid,tenementid,num) values ('" + id + "','" + taskid + "','" + key + "','" + vals._1 + "')";
            dbHelper.update(sql);
        }
        System.out.println("Result set inserted into the database");
    }

    /**
     * Sum the values of identical keys, then keep only the entries whose total
     * passes the given comparison against val
     * @param black
     * @param val threshold value to compare against
     * @param compare comparison operator, ">" or "<"
     * @return
     */
    private static JavaPairRDD<String, Long> getStringLongJavaPairRDD(JavaPairRDD<String, Long> black, final int val, final String compare) {
        black = black.reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long a1, Long a2) throws Exception {
                return a1 + a2;
            }
        });
        black = black.filter(new Function<Tuple2<String, Long>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Long> stringLongTuple2) throws Exception {
                if (">".equals(compare)) {
                    if (stringLongTuple2._2 > val) {
                        return true;
                    }
                } else if ("<".equals(compare)) {
                    if (stringLongTuple2._2 < val) {
                        return true;
                    }
                }
                return false;
            }
        });
        return black;
    }
    /**
     * Query HBase by rowkey range and hourlong range and return a JavaPairRDD
     * @param jsc
     * @param hbConf
     * @param startkey
     * @param endkey
     * @param startHour
     * @param endHour
     * @return
     */
    private static JavaPairRDD<String, Long> getStringLongJavaPairRDD(JavaSparkContext jsc, Configuration hbConf,
            String startkey, String endkey, String startHour, String endHour) {
        Scan scan = new Scan(Bytes.toBytes(startkey), Bytes.toBytes(endkey));
        scan.setCacheBlocks(true);
        scan.setCaching(10000);
        scan.setStartRow(Bytes.toBytes(startkey));
        scan.addFamily(Bytes.toBytes("info")); // column family

        // Hour-of-day filter: keep rows whose "hourlong" column lies between startHour and endHour
        FilterList filterList = new FilterList();
        Filter gtfilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("hourlong"),
                CompareFilter.CompareOp.GREATER, Bytes.toBytes(startHour));
        filterList.addFilter(gtfilter);
        Filter ltfilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("hourlong"),
                CompareFilter.CompareOp.LESS, Bytes.toBytes(endHour));
        filterList.addFilter(ltfilter);
        scan.setFilter(filterList);

        // Serialize the scan so it can be handed to TableInputFormat through the configuration
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = null;
        try {
            proto = ProtobufUtil.toScan(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "recordinfo"); // table name
        hbConf.set(TableInputFormat.SCAN, scanToString);
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);

        // Map each HBase row to (tenementid, 1L): keep only the wanted column as the key, set the value to 1
        JavaPairRDD<String, Long> rddmap = rdd.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Long>() {
            public Tuple2<String, Long> call(Tuple2<ImmutableBytesWritable, Result> item) throws Exception {
                Iterator<Cell> it = item._2().listCells().iterator();
                String tenementid = "";
                while (it.hasNext()) {
                    Cell c = it.next();
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                    if (qualifier.equals("tenementid")) {
                        tenementid = Bytes.toString(CellUtil.cloneValue(c)).trim();
                    }
                }
                return new Tuple2<String, Long>(tenementid, 1L);
            }
        });
        // The caller reduces by key to sum the per-tenant counts
        return rddmap;
    }
}
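Note that the reduce-and-threshold overload of getStringLongJavaPairRDD is defined but never called in main; the same thresholds are instead hard-coded in the join filter (more than 3 night-time hits, fewer than 1 day-time hit). A minimal sketch of how that helper could be used inside the task loop once the per-salt unions are built; the placement and thresholds mirror the existing filter, and the fragment is only an illustration, not part of the original job:

                // Hypothetical alternative inside the task loop, after the unions over the salt prefixes:
                // sum per tenant and keep tenants seen more than 3 times at night ...
                black = getStringLongJavaPairRDD(black, 3, ">");
                // ... and drop every tenant seen at all during the day (count < 1 keeps nothing)
                white = getStringLongJavaPairRDD(white, 1, "<");
                // black.leftOuterJoin(white) then only needs the "no day-time match" (Optional absent) check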