Scala :
Nếu tất cả những gì bạn cần là các số duy nhất, bạn có thể sử dụng zipWithUniqueId
và tạo lại DataFrame. Đầu tiên, một số dữ liệu nhập và dữ liệu giả:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Trích xuất lược đồ để sử dụng thêm:
val schema = df.schema
Thêm trường id:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Tạo DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Điều tương tự trong Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Nếu bạn thích số liên tiếp, bạn có thể thay thế zipWithUniqueId
với zipWithIndex
nhưng nó đắt hơn một chút.
Trực tiếp với DataFrame
API :
(phổ quát Scala, Python, Java, R với khá nhiều cú pháp giống nhau)
Trước đây, tôi đã bỏ lỡ monotonicallyIncreasingId
hàm này sẽ hoạt động tốt miễn là bạn không yêu cầu các số liên tiếp:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Mặc dù hữu ích monotonicallyIncreasingId
là không xác định. Không chỉ id có thể khác từ khi thực thi đến khi thực thi mà không có thủ thuật bổ sung nào không thể được sử dụng để xác định các hàng khi các hoạt động tiếp theo chứa bộ lọc.
Lưu ý :
Cũng có thể sử dụng rowNumber
chức năng cửa sổ:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Thật không may:
CẢNH BÁO Cửa sổ:Không có phân vùng nào được xác định cho hoạt động của cửa sổ! Di chuyển tất cả dữ liệu vào một phân vùng duy nhất, điều này có thể làm giảm hiệu suất nghiêm trọng.
Vì vậy, trừ khi bạn có một cách tự nhiên để phân vùng dữ liệu của mình và đảm bảo tính duy nhất không đặc biệt hữu ích vào lúc này.