目录
正文(1) 得到 spark dataframe 全局排序ID(2)分组后保留最大值行正文
作为一个算法工程师,日常学习和工作中,不光要 训练模型关注效果,更多的 时间 是在 准备样本数据与分析数据等,而这些过程 都与 大数据 spark和hadoop生态的若干工具息息相关。
(相关资料图)
今天我们就不在更新 机器学习和 算法模型相关的内容,分享两个 spark函数吧,以前也在某种场景中使用过但没有保存收藏,哎!! 事前不搜藏,临时抱佛脚的感觉 真是 痛苦,太耽误干活了。
so,把这 两个函数 记在这里 以备不时之需~
(1) 得到 spark dataframe 全局排序ID
这个函数的 应用场景就是:根据某一列的数值对 spark 的 dataframe 进行排序, 得到全局多分区排序的全局有序ID,新增一列保存这个rank id ,并且保留别的列的数据无变化。
有用户会说,这不是很容易吗,直接用 orderBy 不就可以了吗,但是难点是:orderBy完记录下全局ID 并且 保持原来全部列的DF数据。
多说无益,遇到这个场景 直接copy用起来 就知道 有多爽 了,同类问题 我们可以 用下面 这个函数 解决 ~
scala 写的 spark 版本代码:
def dfZipWithIndex( df: DataFrame, offset: Int = 1, colName: String ="rank_id", inFront: Boolean = true ) : DataFrame = { df.sqlContext.createDataFrame( df.rdd.zipWithIndex.map(ln => Row.fromSeq( (if (inFront) Seq(ln._2 + offset) else Seq()) ++ ln._1.toSeq ++ (if (inFront) Seq() else Seq(ln._2 + offset)) ) ), StructType( (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) ++ df.schema.fields ++ (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) ) ) }
函数调用我们可以用这行代码调用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc))
, 直接复制过去就可以~
python写的 pyspark 版本代码:
from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rank_id"): new_schema = StructType( [StructField(colName,LongType(),True)] # new added field in front + df.schema.fields # previous schema ) zipped_rdd = df.rdd.zipWithIndex() new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) return spark.createDataFrame(new_rdd, new_schema)
调用 同理, 这里我就不在进行赘述了。
(2)分组后保留最大值行
这个函数的 应用场景就是: 当我们使用 spark 或则 sparkSQL 查找某个 dataframe 数据的时候,在某一天里,任意一个用户可能有多条记录,我们需要 对每一个用户,保留dataframe 中 某列值最大 的那行数据。
其中的 关键点在于:一次性求出对每个用户分组后,求得每个用户的多行记录中,某个值最大的行进行数据保留。
当然,经过 简单修改代码,不一定是最大,最小也是可以的,平均都ok。
scala 写的 spark 版本代码:
// 得到一天内一个用户多个记录里面时间最大的那行用户的记录 import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions val w = Window.partitionBy("user_id") val result_df = raw_df .withColumn("max_time",functions.max("time").over(w)) .where($"time" === $"max_time") .drop($"max_time")
python写的 pyspark 版本代码:
# pyspark dataframe 某列值最大的元素所在的那一行 # GroupBy 列并过滤 Pyspark 中某列值最大的行 # 创建一个Window 以按A列进行分区,并使用它来计算每个组的最大值。然后过滤出行,使 B 列中的值等于最大值 from pyspark.sql import Window w = Window.partitionBy("user_id") result_df = spark.sql(raw_df).withColumn("max_time", fun.max("time").over(w))\ .where(fun.col("time") == fun.col("time")) .drop("max_time")
我们可以看到: 这个函数的关键就是运用了 spark 的 window 函数,灵活运用 威力无穷 哦 !
到这里,spark利器2函数之dataframe全局排序id与分组后保留最大值行的全文 就写完了 ,更多关于spark dataframe全局排序的资料请关注脚本之家其它相关文章!
X 关闭
X 关闭
- 15G资费不大降!三大运营商谁提供的5G网速最快?中国信通院给出答案
- 2联想拯救者Y70发布最新预告:售价2970元起 迄今最便宜的骁龙8+旗舰
- 3亚马逊开始大规模推广掌纹支付技术 顾客可使用“挥手付”结账
- 4现代和起亚上半年出口20万辆新能源汽车同比增长30.6%
- 5如何让居民5分钟使用到各种设施?沙特“线性城市”来了
- 6AMD实现连续8个季度的增长 季度营收首次突破60亿美元利润更是翻倍
- 7转转集团发布2022年二季度手机行情报告:二手市场“飘香”
- 8充电宝100Wh等于多少毫安?铁路旅客禁止、限制携带和托运物品目录
- 9好消息!京东与腾讯续签三年战略合作协议 加强技术创新与供应链服务
- 10名创优品拟通过香港IPO全球发售4100万股 全球发售所得款项有什么用处?