Storm-kafka项目
2021-01-17 本文已影响0人
小左伯爵
1.项目架构
未命名文件.jpg
- 1.生产数据发送到kafka,数据格式为:
100 29448-000005 2021-01-17 21:05:21 2
序号 小区编号 时间 掉话代码(0正常,1掉话,2断话)
- 2.storm作为消费者从kafka拉取数据作为spout的数据源
- 3.经过storm处理后存放到hbase中
2.项目文件结构
2021-01-17_212322.jpg
2.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itbin</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- 跳过测试 -->
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
</project>
hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>node02,node03,node04</value>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2,nn3</value>
</property>
<!-- rpc通讯 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>node01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>node02:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn3</name>
<value>node03:8020</value>
</property>
<!-- http通讯 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>node01:9870</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>node02:9870</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn3</name>
<value>node03:9870</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node01:8485;node02:8485;node03:8485/mycluster</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/hadoop/full/data</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
3.生产数据并发送到kafka
3.1启动kafka集群
#1.首先启动zookeeper集群
[root@node02 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node02 ~]# jps
15724 Jps
15677 QuorumPeerMain
[root@node03 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node03 ~]# jps
11398 Jps
11351 QuorumPeerMain
[root@node04 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node04 ~]# jps
6138 QuorumPeerMain
6191 Jps
#2.然后启动kafka集群
[root@node02 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
[root@node03 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
[root@node04 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
[root@node02 ~]# jps
16164 Jps
15739 Kafka
15677 QuorumPeerMain
[root@node03 ~]# jps
11414 Kafka
11351 QuorumPeerMain
11835 Jps
[root@node04 ~]# jps
6629 Jps
6138 QuorumPeerMain
6207 Kafka
3.2生产数据的代码
package cn.itbin.kafka;
import cn.itbin.constants.Constant;
import cn.itbin.tools.DateFmt;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import java.util.Properties;
import java.util.Random;
/**
 * Test-data generator that publishes simulated call records to a Kafka topic.
 * <p>
 * Every record is one tab-separated line:
 * {@code <sequence>\t<cell id>\t<yyyy-MM-dd HH:mm:ss>\t<drop code>}, where the drop code is
 * 0 (normal), 1 (dropped call, intermittent signal) or 2 (call fully disconnected).
 *
 * @author chenxiaogao
 * @className CellProducer
 * @date 2021/1/15
 **/
public class CellProducer extends Thread {
    private final Producer<String, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    /**
     * Creates a producer with String key/value serializers that connects to
     * {@link Constant#BROKER_LIST}.
     *
     * @param topic name of the Kafka topic the records are sent to
     */
    public CellProducer(String topic) {
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.BROKER_LIST);
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    /**
     * Sends one record every five seconds until the thread is interrupted, then closes the
     * producer so that its network resources are released.
     */
    @Override
    public void run() {
        Random random = new Random();
        // 0 = normal; 1 = dropped call (intermittent signal); 2 = disconnected
        String[] dropCodes = { "0", "1", "2" };
        int sequence = 0;
        try {
            // Utils.sleep() restores the interrupt flag instead of throwing, so the flag has to be
            // checked here; otherwise an interrupted thread would spin without ever sleeping.
            while (!Thread.currentThread().isInterrupted()) {
                sequence++;
                String cellSuffix = String.format("%06d", random.nextInt(10) + 1);
                // messageStr example: 2494 29448-000003 2016-01-05 10:25:17 1
                String messageStr = sequence + "\t" + ("29448-" + cellSuffix) + "\t"
                        + DateFmt.getCountDate(null, DateFmt.date_long)
                        + "\t" + dropCodes[random.nextInt(dropCodes.length)];
                System.out.println("product:" + messageStr);
                producer.send(new ProducerRecord<String, String>(topic, messageStr));
                producer.flush();
                Utils.sleep(5000);
            }
        } finally {
            producer.close();
        }
    }

    /** Starts a single producer thread that writes to {@link Constant#TOPIC_NAME}. */
    public static void main(String[] args) {
        CellProducer test = new CellProducer(Constant.TOPIC_NAME);
        test.start();
    }
}
3.2.1Constant
package cn.itbin.constants;
/**
 * Shared connection constants used by the Kafka producer and the Storm topology.
 *
 * @author chenxiaogao
 * @className Constant
 * @date 2021/1/17
 **/
public class Constant {
    // Comma-separated host:port list of the Kafka brokers (passed as bootstrap servers).
    public static final String BROKER_LIST = "node02:9092,node03:9092,node04:9092";
    // Kafka topic that CellProducer writes to and the Kafka spout reads from.
    public static final String TOPIC_NAME = "test";
}
3.3测试生产数据是否正确
#1.启动一个kafka消费者,监听test主题
[root@node02 ~]# kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --from-beginning --topic test
#2.运行3.2CellProducer的代码
#3.kafka消费端显示,表示正常
1 29448-000010 2021-01-17 21:45:48 2
2 29448-000008 2021-01-17 21:45:54 0
3 29448-000008 2021-01-17 21:45:59 1
4 29448-000010 2021-01-17 21:46:04 2
5 29448-000009 2021-01-17 21:46:09 0
6 29448-000007 2021-01-17 21:46:14 1
7 29448-000003 2021-01-17 21:46:19 1
8 29448-000001 2021-01-17 21:46:24 2
4.storm从kafka拉取数据
SkcTopo
package cn.itbin.skc;
import cn.itbin.blot.CellDaoltBolt;
import cn.itbin.blot.CellFilterBolt;
import cn.itbin.constants.Constant;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
* @author chenxiaogao
* @className SkcTopo
* @description TODO
* @date 2021/1/16
**/
public class SkcTopo {
public static void main(String[] args) throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(Constant.BROKER_LIST, Constant.TOPIC_NAME)), 3);
builder.setBolt("cell_filter", new CellFilterBolt(),3).shuffleGrouping("kafka_spout");
builder.setBolt("cell_dao", new CellDaoltBolt(),5).fieldsGrouping("cell_filter", new Fields("cell_num"));
// 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalReadingFromKafkaApp",
new Config(), builder.createTopology());
}
}
private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
return KafkaSpoutConfig.builder(bootstrapServers, topic)
// 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
// 定时提交偏移量的时间间隔,默认是 15s
.setOffsetCommitPeriodMs(10_000)
.build();
}
}
CellFilterBolt
package cn.itbin.blot;
import cn.itbin.tools.DateFmt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Parses the raw record coming from the Kafka spout and emits
 * {@code (date, cell_num, drop_num)} tuples.
 * <p>
 * The record is expected to be tab-separated:
 * {@code <message no>\t<cell number>\t<yyyy-MM-dd HH:mm:ss>\t<drop state>}.
 */
public class CellFilterBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    /** Index of the record payload inside the incoming tuple. */
    // NOTE(review): presumably the "value" field of storm-kafka-client's default record
    // translator (topic, partition, offset, key, value) - confirm against the spout config.
    private static final int PAYLOAD_INDEX = 4;

    /** Number of tab-separated columns a well-formed record must have. */
    private static final int EXPECTED_COLUMNS = 4;

    /**
     * Splits the payload and emits the day, the cell number and the drop state.
     * Tuples without a String payload and records with too few columns are skipped.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        List<Object> values = input.getValues();
        if (values.size() <= PAYLOAD_INDEX) {
            return;
        }
        Object payload = values.get(PAYLOAD_INDEX);
        if (!(payload instanceof String)) {
            // null or unexpected payload type: nothing to parse
            return;
        }
        String value = (String) payload;
        try {
            String[] arr = value.split("\\t");
            System.out.println("arr = " + arr.length);
            String s = Arrays.toString(arr);
            System.out.println("s = " + s);
            if (arr.length < EXPECTED_COLUMNS) {
                // Reject short records explicitly instead of relying on an
                // ArrayIndexOutOfBoundsException being caught below.
                System.err.println("CellFilterBolt: skipping malformed record: " + value);
                return;
            }
            // Example record: 2494 29448-000003 2016-01-05 10:25:17 1
            // DateFmt.date_short is yyyy-MM-dd, so "2016-01-05 10:25:17" becomes "2016-01-05".
            // Emitted tuple: date, cell number, drop state.
            collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
        } catch (Exception e) {
            // Best effort: a bad record must not stop the bolt.
            e.printStackTrace();
        }
    }

    /** Declares the output fields {@code date}, {@code cell_num} and {@code drop_num}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "cell_num", "drop_num"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Nothing to initialise. */
    @Override
    public void prepare(Map map, TopologyContext arg1) {
    }
}
CellDaoltBolt
package cn.itbin.blot;
import cn.itbin.hbase.HBaseDAO;
import cn.itbin.hbase.impl.HBaseDAOImpl;
import cn.itbin.tools.DateFmt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * Counts calls and dropped calls per cell in memory and writes the running totals to the
 * HBase table {@code cell_monitor_table} at most once every {@link #FLUSH_INTERVAL_MS} ms.
 * <p>
 * Input tuple layout (positional): date ({@code yyyy-MM-dd}), cell number, drop state,
 * e.g. {@code 2016-01-05, 29448-000003, 1}.
 */
public class CellDaoltBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    /** Minimum time between two writes to HBase, in milliseconds. */
    private static final long FLUSH_INTERVAL_MS = 5000;

    HBaseDAO dao = null;
    long beginTime = System.currentTimeMillis();
    long endTime = 0;
    // total number of calls per cell
    Map<String, Long> cellCountMap = new HashMap<String, Long>();
    // number of dropped calls per cell
    Map<String, Long> cellDropCountMap = new HashMap<String, Long>();
    String todayStr = null;

    /**
     * Updates the counters for the tuple's cell and, when the flush interval has elapsed,
     * writes all counters to HBase.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (input == null) {
            return;
        }
        String dateStr = input.getString(0);
        String cellNum = input.getString(1);
        String dropNum = input.getString(2);

        // Day roll-over: when a tuple dated after the current day arrives, both maps are
        // cleared so they do not grow without bound.
        // Known weakness (from the original author): a periodic cleanup would be more robust
        // than checking on every tuple.
        todayStr = DateFmt.getCountDate(null, DateFmt.date_short);
        // Compare by content; comparing String references with != is not meaningful.
        if (todayStr.compareTo(dateStr) < 0) {
            cellCountMap.clear();
            cellDropCountMap.clear();
        }

        // total calls of this cell
        cellCountMap.merge(cellNum, 1L, Long::sum);

        // any drop state greater than 0 counts as a dropped call
        if (Integer.parseInt(dropNum) > 0) {
            cellDropCountMap.merge(cellNum, 1L, Long::sum);
        }

        // Writes are throttled so HBase is not hit on every tuple.
        endTime = System.currentTimeMillis();
        if (endTime - beginTime >= FLUSH_INTERVAL_MS) {
            flushToHBase();
            // restart the interval
            beginTime = System.currentTimeMillis();
        }
    }

    /**
     * Writes one cell per counted cell number: row key {@code <cell>_<yyyy-MM-dd>}, column
     * family {@code cf}, qualifier {@code yyyyMMddHHmm}, value e.g.
     * {@code {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91}}.
     * Cells without any dropped call are written with {@code call_drop_num:0}.
     */
    private void flushToHBase() {
        if (cellCountMap.isEmpty()) {
            return;
        }
        // [0] = "hour:minute" title, [1] = x-axis value (hours since midnight)
        String[] axis = this.getAxsi();
        String today = DateFmt.getCountDate(null, DateFmt.date_short);
        String todayMinute = DateFmt.getCountDate(null, DateFmt.date_minute);

        for (Map.Entry<String, Long> entry : cellCountMap.entrySet()) {
            String cellNum = entry.getKey();
            Long callCount = entry.getValue();
            // A cell that never dropped a call has no entry in the drop map; report 0, not null.
            Long dropCount = cellDropCountMap.getOrDefault(cellNum, 0L);

            System.out.println("key_cellnum: " + cellNum + "***"
                    + axis[0] + "---"
                    + axis[1] + "---"
                    + callCount + "----"
                    + dropCount);

            dao.insert("cell_monitor_table",
                    cellNum + "_" + today,
                    "cf",
                    new String[] { todayMinute },
                    new String[] { "{" + "time_title:\"" + axis[0] + "\",xAxis:" + axis[1] + ",call_num:"
                            + callCount + ",call_drop_num:" + dropCount + "}" }
            );
        }
    }

    /** Creates the HBase DAO used for writing the counters. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        dao = new HBaseDAOImpl();
    }

    /** This bolt emits nothing, so no output fields are declared. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Returns the chart coordinates of the current time.
     *
     * @return a two-element array: {@code [0]} is {@code hour:minute} (not zero-padded) and
     *         {@code [1]} is the time of day in hours as a decimal, e.g. 12:30:00 gives 12.5
     */
    public String[] getAxsi() {
        Calendar now = Calendar.getInstance();
        int hour = now.get(Calendar.HOUR_OF_DAY);
        int minute = now.get(Calendar.MINUTE);
        int sec = now.get(Calendar.SECOND);
        // seconds since midnight
        int curSecNum = hour * 3600 + minute * 60 + sec;
        // (12*3600 + 30*60 + 0) / 3600 = 12.5
        Double xValue = (double) curSecNum / 3600;
        return new String[] { hour + ":" + minute, xValue.toString() };
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }
}
5.保存数据到hbase
#1.启动hadoop集群
[root@node01 ~]# start-all.sh
WARNING: HADOOP_SECURE_DN_USER has been replaced by HDFS_DATANODE_SECURE_USER. Using value of HADOOP_SECURE_DN_USER.
Starting namenodes on [node01 node02 node03]
Starting datanodes
Starting journal nodes [node01 node02 node03]
Starting ZK Failover Controllers on NN hosts [node01 node02 node03]
Starting resourcemanagers on [ node01 node02 node03]
Starting nodemanagers
[root@node01 ~]# jps
26368 NameNode
26854 DFSZKFailoverController
27639 Jps
27272 ResourceManager
26652 JournalNode
#2.启动hbase集群
[root@node01 ~]# start-hbase.sh
node04: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node04.out
node03: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node03.out
node02: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node02.out
node02: running master, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-master-node02.out
[root@node01 ~]# hbase shell
Version 2.3.3, r3e4bf4bee3a08b25591b9c22fea0518686a7e834, Wed Oct 28 06:36:25 UTC 2020
Took 0.0005 seconds
hbase(main):001:0> status
1 active master, 1 backup masters, 1 servers, 0 dead, 0.0000 average load
Took 1.3308 seconds
正常启动
package cn.itbin.hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import java.util.List;
/**
 * Data-access operations against HBase tables.
 * NOTE(review): of the methods below, HBaseDAOImpl implements only the two
 * {@code insert} overloads and {@code createTable}; the others are empty stubs there.
 */
public interface HBaseDAO {

    /** Stores a single prepared {@link Put} in the given table. */
    void save(Put put, String tableName);

    /** Writes one column value into the row {@code rowKey} of {@code tableName}. */
    void insert(String tableName, String rowKey, String family, String quailifer, String value);

    /**
     * Writes several column values into one row; {@code quailifer[i]} is paired with
     * {@code value[i]}.
     */
    void insert(String tableName, String rowKey, String family, String[] quailifer, String[] value);

    /** Stores a batch of prepared {@link Put}s in the given table. */
    void save(List<Put> Put, String tableName);

    /** Reads the row with the given key. */
    Result getOneRow(String tableName, String rowKey);

    /** Reads rows selected by a row-key pattern. */
    List<Result> getRows(String tableName, String rowKey_like);

    /** Reads the given columns of rows selected by a row-key pattern. */
    List<Result> getRows(String tableName, String rowKeyLike, String[] cols);

    /** Reads the rows between {@code startRow} and {@code stopRow}. */
    List<Result> getRows(String tableName, String startRow, String stopRow);

    /** Deletes rows selected by a row-key pattern. */
    void deleteRecords(String tableName, String rowKeyLike);

    /** Deletes the given table. */
    void deleteTable(String tableName);

    /** Creates a table with the given column family. */
    void createTable(String tableName, String columnFamilies);
}
package cn.itbin.hbase.impl;
import cn.itbin.hbase.HBaseDAO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
/**
 * {@link HBaseDAO} implementation backed by a single HBase {@link Connection}.
 * Only the two {@code insert} overloads and {@code createTable} are implemented; the
 * remaining methods are empty stubs.
 *
 * @author chenxiaogao
 * @className HBaseDAOImpl
 * @date 2021/1/17
 **/
public class HBaseDAOImpl implements HBaseDAO {
    private static final String TABLE_NAME = "PHONE";
    private static final String CF_DEFAULT = "DEFAULT_COLUMN_PHONE";
    Admin admin = null;
    Connection connection = null;

    /**
     * Opens the HBase connection. The site files are looked up on the classpath, so the
     * class no longer depends on an absolute path of one developer machine, and
     * hdfs-site.xml is now loaded as well (previously hbase-site.xml was added twice).
     * A connection failure is only logged; {@code connection} and {@code admin} stay null.
     */
    public HBaseDAOImpl() {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.addResource("hbase-site.xml");
            conf.addResource("hdfs-site.xml");
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Not implemented. */
    @Override
    public void save(Put put, String tableName) {
    }

    /**
     * Writes a single column value.
     *
     * @param tableName target table
     * @param rowKey    row key
     * @param family    column family
     * @param quailifer column qualifier
     * @param value     cell value
     */
    @Override
    public void insert(String tableName, String rowKey, String family, String quailifer, String value) {
        // try-with-resources closes the table and cannot hit a null reference when
        // getTable() itself fails.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(quailifer), Bytes.toBytes(value));
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes several column values of one row in a single {@link Put};
     * {@code quailifer[i]} is paired with {@code value[i]}.
     *
     * @param tableName target table
     * @param rowKey    row key
     * @param family    column family
     * @param quailifer column qualifiers
     * @param value     cell values, one per qualifier
     */
    @Override
    public void insert(String tableName, String rowKey, String family, String[] quailifer, String[] value) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            byte[] familyBytes = Bytes.toBytes(family);
            // add all columns to the same Put
            for (int i = 0; i < quailifer.length; i++) {
                put.addColumn(familyBytes, Bytes.toBytes(quailifer[i]), Bytes.toBytes(value[i]));
            }
            table.put(put);
        } catch (Exception e) {
            // Best effort: any failure (I/O, mismatched array lengths, missing connection)
            // is logged and the write is dropped.
            e.printStackTrace();
        }
    }

    /** Not implemented. */
    @Override
    public void save(List<Put> Put, String tableName) {
    }

    /** Not implemented; always returns null. */
    @Override
    public Result getOneRow(String tableName, String rowKey) {
        return null;
    }

    /** Not implemented; always returns null. */
    @Override
    public List<Result> getRows(String tableName, String rowKey_like) {
        return null;
    }

    /** Not implemented; always returns null. */
    @Override
    public List<Result> getRows(String tableName, String rowKeyLike, String[] cols) {
        return null;
    }

    /** Not implemented; always returns null. */
    @Override
    public List<Result> getRows(String tableName, String startRow, String stopRow) {
        return null;
    }

    /** Not implemented. */
    @Override
    public void deleteRecords(String tableName, String rowKeyLike) {
    }

    /** Not implemented. */
    @Override
    public void deleteTable(String tableName) {
    }

    /**
     * Creates a table with one column family. An I/O failure is only logged.
     *
     * @param tableName      name of the table to create
     * @param columnFamilies name of the single column family
     */
    @Override
    public void createTable(String tableName, String columnFamilies) {
        TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamilies))
                .build();
        try {
            admin.createTable(td);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.print("Creating table. ");
    }
}
package cn.itbin.tools;
import cn.itbin.hbase.impl.HBaseDAOImpl;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class DateFmt {
public static final String date_long = "yyyy-MM-dd HH:mm:ss";
public static final String date_short = "yyyy-MM-dd";
public static final String date_minute = "yyyyMMddHHmm";
public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
public static String getCountDate(String date, String pattern) {
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
Calendar cal = Calendar.getInstance();
if (date != null) {
try {
// 2016-01-05 10:25:17
cal.setTime(sdf.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}
}
return sdf.format(cal.getTime());
}
public static String getCountDate(String date, String pattern, int step) {
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
Calendar cal = Calendar.getInstance();
if (date != null) {
try {
cal.setTime(sdf.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}
}
cal.add(Calendar.DAY_OF_MONTH, step);
return sdf.format(cal.getTime());
}
public static Date parseDate(String dateStr) throws Exception {
return sdf.parse(dateStr);
}
public static void main(String[] args) throws Exception {
}
}
6.测试
#创建hbase 表
hbase(main):002:0> create 'cell_monitor_table','cf'
#启动程序查看数据
hbase(main):004:0> scan 'cell_monitor_table'
ROW COLUMN+CELL
01 column=cf:cf01, timestamp=2021-01-18T00:34:46.235, value=test_value
29448-000001_2021-01-17 column=cf:202101171737, timestamp=2021-01-18T01:37:35.925, value={time_title:"17:37",xAxis:17.6
2638888888889,call_num:4,call_drop_num:4}