创建个人网站的步骤,ui设计师要学什么,网站快速建站,免费logo在线制作u钙网1. hbase连接
首先用hbase shell 命令来进入到hbase数据库#xff0c;然后用list命令来查看hbase下所有表#xff0c;以其中表“DB_level0”为例#xff0c;可以看到库名“baotouyiqi”是拼接的#xff0c;python代码访问时先连接#xff1a; def hbase_connection(hbase…1. hbase连接
首先用hbase shell 命令来进入到hbase数据库然后用list命令来查看hbase下所有表以其中表“DB_level0”为例可以看到库名“baotouyiqi”是拼接的python代码访问时先连接 def hbase_connection(hbase_master, hbase_port, table_prefixNone):connection happybase.Connection(hosthbase_master, porthbase_port, table_prefixtable_prefix)return connection
connection hbase_connection(hbase_master, hbase_port, table_prefix) # 在连接的时候创建项目空间
table connection.table(tablename) # 获取表连接
备注完整代码在最后想运行的直接滑倒最后复制即可
2. 按条件读取hbase数据
然后按照条件来查询表中想要的数据集这里只列举两个条件时间区间和指定列。同样我们在shell下用scan命令来查看表中的数据结构 可以看到第一列是ROW第二列是COLUMNCELLpython代码取数据方法差不多
date_prex_start bytes(dt_ starttime, encodingutf-8) # row_start
date_prex_end bytes(dt_ endtime, encodingutf-8) # row_stop
# 通过设置row key的前缀row_prefix参数来进行局部扫描
outdata dict(table.scan(row_startdate_prex_start, row_stopdate_prex_end,columns[onecolumn]))得到的结果如下是个字典格式 3. 按格式输出hbase数据结果
我们希望输出的结果是dataframe的而且第一列是time第二列是value所以就做个简单格式处理
timesep list(map(lambda x: x.decode(utf-8).replace(dt_, ), outdata.keys()))
tempdata list(outdata.values())
valuelist list(map(lambda x: float(list(x.values())[0]), tempdata))
if len(timesep) 0:db_data2 pd.DataFrame({时间: timesep, onecolumn: valuelist})db_data2.loc[:, 时间2] [i[:16] for i in db_data2[时间]]db_data2 db_data2.drop_duplicates(subset[时间2], keeplast) # 一分钟内多次数值取一个即可
else:db_data2 pd.DataFrame()
if len(db_data2) 1:return pd.DataFrame()
db_data2.loc[:, 时间戳] [time.mktime(time.strptime(i, %Y-%m-%d %H:%M:%S)) for i in db_data2[时间]]
db_data2 db_data2.sort_values(by[时间戳], ascendingFalse) # 将最新的数值放最前面
db_data3 db_data2.drop(columns[时间2, 时间戳])
db_data3.columns [time, value]4. 完整代码code
import happybase
import time
import pandas as pd
from pathlib import Pathos_file_name Path(__file__).namedef hbase_connection(hbase_master, hbase_port, table_prefixNone):connection happybase.Connection(hosthbase_master, porthbase_port, table_prefixtable_prefix)return connectiondef get_data_by_tum(hbase_master, hbase_port, table_prefix, tablename, columnslist, starttime, endtime):columnsid $.join(columnslist)onecolumn TimeSe:dt_ columnsid # columnconnection hbase_connection(hbase_master, hbase_port, table_prefix) # 在连接的时候创建项目空间table connection.table(tablename) # 获取表连接date_prex_start bytes(dt_ starttime, encodingutf-8) # row_startdate_prex_end bytes(dt_ endtime, encodingutf-8) # row_stop# 通过设置row key的前缀row_prefix参数来进行局部扫描outdata dict(table.scan(row_startdate_prex_start, row_stopdate_prex_end,columns[onecolumn]))timesep list(map(lambda x: x.decode(utf-8).replace(dt_, ), outdata.keys()))tempdata list(outdata.values())valuelist list(map(lambda x: float(list(x.values())[0]), tempdata))if len(timesep) 0:db_data2 pd.DataFrame({时间: timesep, onecolumn: valuelist})db_data2.loc[:, 时间2] [i[:16] for i in db_data2[时间]]db_data2 db_data2.drop_duplicates(subset[时间2], keeplast) # 一分钟内多次数值取一个即可else:db_data2 pd.DataFrame()if len(db_data2) 1:return pd.DataFrame()db_data2.loc[:, 时间戳] [time.mktime(time.strptime(i, %Y-%m-%d %H:%M:%S)) for i in db_data2[时间]]db_data2 db_data2.sort_values(by[时间戳], ascendingFalse) # 将最新的数值放最前面db_data3 db_data2.drop(columns[时间2, 时间戳])db_data3.columns [time, value]return db_data3if __name__ __main__:begin_time 2023-08-22 00:00:00end_time 2023-08-23 00:00:00hbase_master 142.21.8.22hbase_port 9097table_prefix baotouyiqitable_name DB_level0onedata [62340, 20, 204]dataget get_data_by_tum(hbase_master, hbase_port, table_prefix, table_name,onedata, begin_time, end_time)print(dataget)