In practice, the performance impact is almost the same as if you omitted the `partitionBy` clause altogether. All records are shuffled to a single partition, sorted locally, and iterated sequentially one by one. The only difference is the total number of partitions created.

Let's illustrate this with a simple dataset of 10 partitions and 1000 records:

```python
import pyspark.sql.functions as f
from pyspark.sql import Window

# Assumes an active SparkSession named `spark`, as in the pyspark shell
df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))
```

If you define a frame without a partition by clause

```python
w_unpart = Window.orderBy(f.col("index").asc())
```

and use it with `lag`

```python
df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)
```

there will be only one partition in total:

```python
df_lag_unpart.rdd.glom().map(len).collect()
# [1000]
```

Compared to that, a frame definition with a dummy index (simplified a bit compared to your code)

```python
w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())
```

will use a number of partitions equal to `spark.sql.shuffle.partitions`:

```python
spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

df_lag_part.rdd.glom().count()
# 11
```

with only one non-empty partition:

```python
df_lag_part.rdd.glom().filter(lambda x: x).count()
# 1
```

Unfortunately, there is no universal solution to this problem in PySpark. It is simply an inherent mechanism of the implementation combined with the distributed processing model.

Since the `index` column is sequential, you can generate an artificial partitioning key with a fixed number of records per block:

```python
rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)
```

and use it to define the frame specification:

```python
w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)
```

This will use the expected number of partitions:

```python
df_lag_with_block.rdd.glom().count()
# 11
```

with a roughly uniform data distribution (we cannot avoid hash collisions):

```python
df_lag_with_block.rdd.glom().map(len).collect()
# [0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]
```

but with a number of gaps on the block boundaries:

```python
df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
# 12
```

Since the boundaries are easy to compute:

```python
from itertools import chain

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers.
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block (see the sketches at the end).
    (idx - 1, idx) for idx in
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful information.
```

you can always select the missing rows:

```python
missing = df_with_block.where(f.col("index").isin(boundary_idxs))
```

and fill them separately:

```python
# We use a window without partitions here. Since the number of records
# is small this won't be a performance issue, but it will generate a
# "Moving all data to a single partition" warning.
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))
```

and join:

```python
combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))
```

to get the desired result:

```python
mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0
```
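As the comment in the boundary computation notes, the pairing of `(idx - 1, idx)` assumes consecutive identifiers. A minimal sketch of the generalization it hints at, collecting both the minimum and maximum id per block and pairing each block's minimum with the previous block's maximum, might look like this (`block_ranges` is a name introduced here for illustration, not part of the original answer):

```python
import pyspark.sql.functions as f
from itertools import chain

# Hypothetical generalization: pair each block's minimum id with the previous
# block's maximum id, so no consecutive (idx - 1, idx) assumption is needed.
block_ranges = (df_lag_with_block
    .groupBy("block")
    .agg(f.min("index").alias("min_id"), f.max("index").alias("max_id"))
    .orderBy("block")
    .collect())

boundary_idxs = sorted(chain.from_iterable(
    (prev["max_id"], cur["min_id"])
    for prev, cur in zip(block_ranges, block_ranges[1:])
))
```

Note that the first block contributes no leading pair here, so the `[2:]` trim from the original version is not needed.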
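The block assignment itself (`index / rec_per_block`) also relies on the ids being consecutive. If the id is merely monotonically increasing, one possible workaround, sketched here as an assumption of mine rather than something from the original answer, is to bucket the id's value range instead; this only gives roughly even blocks when ids are spread roughly uniformly over that range:

```python
import pyspark.sql.functions as f

# Hypothetical variant: derive "block" from the value range of a monotonically
# increasing id instead of assuming one record per consecutive integer.
n_blocks = int(spark.conf.get("spark.sql.shuffle.partitions"))
bounds = df.agg(f.min("index").alias("lo"), f.max("index").alias("hi")).first()

# Width of each bucket; the +1 keeps the maximum id inside the last block.
width = (bounds["hi"] - bounds["lo"] + 1) / n_blocks

df_with_block = df.withColumn(
    "block", ((f.col("index") - f.lit(bounds["lo"])) / f.lit(width)).cast("int")
)
```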