PostgreSQL
 sql >> Cơ Sở Dữ Liệu >  >> RDS >> PostgreSQL

Các khóa chính với Apache Spark

Scala :

Nếu tất cả những gì bạn cần là các số duy nhất, bạn có thể sử dụng zipWithUniqueId và tạo lại DataFrame. Đầu tiên, một số dữ liệu nhập và dữ liệu giả:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Trích xuất lược đồ để sử dụng thêm:

val schema = df.schema

Thêm trường id:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Tạo DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Điều tương tự trong Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Nếu bạn thích số liên tiếp, bạn có thể thay thế zipWithUniqueId với zipWithIndex nhưng nó đắt hơn một chút.

Trực tiếp với DataFrame API :

(phổ quát Scala, Python, Java, R với khá nhiều cú pháp giống nhau)

Trước đây, tôi đã bỏ lỡ monotonicallyIncreasingId hàm này sẽ hoạt động tốt miễn là bạn không yêu cầu các số liên tiếp:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Mặc dù hữu ích monotonicallyIncreasingId là không xác định. Không chỉ id có thể khác từ khi thực thi đến khi thực thi mà không có thủ thuật bổ sung nào không thể được sử dụng để xác định các hàng khi các hoạt động tiếp theo chứa bộ lọc.

Lưu ý :

Cũng có thể sử dụng rowNumber chức năng cửa sổ:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Thật không may:

CẢNH BÁO Cửa sổ:Không có phân vùng nào được xác định cho hoạt động của cửa sổ! Di chuyển tất cả dữ liệu vào một phân vùng duy nhất, điều này có thể làm giảm hiệu suất nghiêm trọng.

Vì vậy, trừ khi bạn có một cách tự nhiên để phân vùng dữ liệu của mình và đảm bảo tính duy nhất không đặc biệt hữu ích vào lúc này.



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Cách thay đổi người dùng thành Superuser trong PostgreSQL

  2. Yêu cầu Giá trị SSLMode Postgres Không hợp lệ khi Hỗ trợ SSL không được biên dịch bằng Trình gói dữ liệu nước ngoài

  3. Postgresql thay đổi loại cột từ int thành UUID

  4. Cách triển khai các ưu tiên trong SQL (postgres)

  5. Cách triển khai PostgreSQL vào Docker Container bằng ClusterControl