Python语言学习之使用Python实现HIVE的UDF函数
小职 2021-04-14 来源 :数师兄 阅读 521 评论 0

摘要:本文主要介绍了Python语言学习之使用Python实现HIVE的UDF函数,通过具体的内容向大家展现,希望对大家Python开发的学习有所帮助。

 本文主要介绍了Python语言学习之使用Python实现HIVE的UDF函数,通过具体的内容向大家展现,希望对大家Python开发的学习有所帮助。

Python语言学习之使用Python实现HIVE的UDF函数

在处理一些复杂逻辑时候,python这种面向过程的语言相比于SQL更符合人的思维方式。相信有不少同学曾经感慨,如果能用python处理数据库中的数据就好了。那么今天它来了。

 

首先用python写处理复杂逻辑的自定义的函数(一阳指),再将函数代码嵌入SQL(狮吼功)就能合并成了一整招:UDF

 

下面我用一个栗子来说明一些两者处理数据过程中的差异,在介绍栗子之前,先介绍一些with as。与python 创建函数或者类一样,with as 用于创建中间表

 

简单来做个介绍

 

select

*

from(select * from table where dt='2021-03-30')a

可以写成

 

with a as (select * from table where dt='2021-03-30' )

select * from a

简单的SQL看不出这样的优势(甚至有点多此一举),但是当逻辑复杂了之后我们就能看出这种语法的优势,他能从底层抽取中间表格,让我们只专注于当前使用的表格,进而可以将复杂的处理逻辑分解成简单的步骤。

 

如下面地表格记录了用户适用app过程中每个行为日志地时间戳,我们想统计一下用户今天用了几次app,以及每次的起始时间和结束时间是什么时候,这个问题怎么解呢?

 Python语言学习之使用Python实现HIVE的UDF函数

 

 

SQL实现方式

首先用with as 构建一个中间表(注意看on 和 where条件)

 

with t1 as

(select

x.uid,

case when x.rank=1 then y.timestamp_ms

else x.timestamp_ms

end as start_time,

case when x.rank=1 then x.timestamp_ms

else y.timestamp_ms end as end_time

from

(select

uid,

timestamp_ms,

row_number()over(partition by uid order by timestamp_ms) rank

from tmp.tmpx) x

left outer join

(select

uid,

timestamp_ms,

row_number()over(partition by uid order by timestamp_ms) rank

from tmp.tmpx) y

on x.uid=y.uid and x.rank=y.rank-1

where x.rank=1 or y.rank is null or y.timestamp_ms-x.timestamp_ms>=300)

首先我们用开窗函数错位相减,用where条件筛选出我们需要的列,其中

 

x.rank=1 抽取出第一行

 

y.rank is null 抽取最后一样

 

y.timestamp_ms-x.timestamp_ms>=300抽取满足条件的行,如下:

 Python语言学习之使用Python实现HIVE的UDF函数

 

 

当然这个结果并不是我们要的结果,需要将上述表格中某一行数据的end-time和下一条数据的start-time结合起来起来,构造出时间段

 Python语言学习之使用Python实现HIVE的UDF函数

 

 

好的,按照上面我们所说的那么下面我们不用关心底层的逻辑,将注意力专注于这张中间表t1

 

select

a.uid,end_time as start_time,start_time as end_time

from

(select uid,start_time,row_number()over(partition by uid order by start_time) as rank from t1) a

join

(select uid,end_time,row_number()over(partition by uid order by end_time) as rank from t1)b

on

a.uid=b.uid and a.rank=b.rank+1

同样,排序后错位相减,然后就可以打完收工了~

 

UDF实现方式

首先我们假设上述数据存储在csv中,

 

用python 处理本地文件data.csv,按照python的处理方式写代码(这里就不一句句解释了,会python的同学可以跳过,不会的同学不妨自己动手写一下)

 

def life_cut(files):

f=open(files)

act_list=[]

act_dict={}

for line in f:

    line_list=line.strip().split()

    key=tuple(line_list[0:1])

    if key not in act_dict:

        act_dict.setdefault(key,[])

        act_dict[key].append(line_list[1])

    else:

        act_dict[key].append(line_list[1])

 

for k,v in act_dict.items():

    k_str=k[0]+"\t"

    start_time = v[0]

    last_time=v[0]

    i=1

    while i<len(v)-1:

        if int(v[i])-int(last_time)>=300:

            print(k_str+"\t"+start_time+"\t"+v[i-1])

            start_time=v[i]

            last_time = v[i]

            i=i+1

        else:

            last_time = v[i]

            i=i+1

    print(k_str+"\t"+start_time+"\t"+v[len(v)-1])

    # print(k_str + "\t" + start_time + "\t" + v[i])

if __name__=="__main__":

life_cut("data.csv")

得到结果如下:

 Python语言学习之使用Python实现HIVE的UDF函数

 

那么下面我们将上述函数写成udf的形式:

 

#!/usr/bin/env python

# -*- encoding:utf-8 -*-

import sys

act_list=[]

act_dict={}

for line in sys.stdin:

line_list=line.strip().split("\t")

key=tuple(line_list[0:1])

if key not in act_dict:

    act_dict.setdefault(key,[])

    act_dict[key].append(line_list[1])

else:

    act_dict[key].append(line_list[1])

 

for k,v in act_dict.items():

k_str=k[0]+"\t"

start_time = v[0]

last_time=v[0]

i=1

while i<len(v)-1:

    if int(v[i])-int(last_time)>=300:

      print(k_str+"\t"+start_time+"\t"+v[i-1])

      start_time=v[i]

      last_time = v[i]

      i=i+1

    else:

      last_time = v[i]

      i=i+1

print(k_str+"\t"+start_time+"\t"+v[len(v)-1])

这个变化过程的关键点是将 for line in f 替换成 for line in sys.stdin,其他基本上没什么变化

 

然后我们再来引用这个函数

 

先add这个函数的路径add file /xxx/life_cut.py 加载udf路径,然后再使用

 

select

TRANSFORM (uid,timestamp_ms) USING "python life_cut.py" as (uid,start_time,end_time)

from tmp.tmpx

总结

从上述案例我们可以看出,

 

UDF和SQL的区别在于,在处理复杂逻辑时候,UDF相比SQL能更高效地组织起来逻辑并落地实现功能。UDF和普通脚本的关键区别所在在于将 for line in f 替换成 for line in sys.stdin,常规函数一般是将文件一行行读入,UDF是从标准输入一行行加载数据。希望大家平时没事的时候好好练练python,切莫书到用时方恨少。 


我是小职,记得找我

✅ 解锁高薪工作

✅ 免费获取基础课程·答疑解惑·职业测评

Python语言学习之使用Python实现HIVE的UDF函数

本文由 @小职 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程