HBase
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> HBase

Spark-on-HBase:Đầu nối HBase dựa trên DataFrame

Bài đăng trên blog này đã được xuất bản trên Hortonworks.com trước khi hợp nhất với Cloudera. Một số liên kết, tài nguyên hoặc tham chiếu có thể không còn chính xác.

Chúng tôi tự hào thông báo bản xem trước kỹ thuật của Trình kết nối Spark-HBase, do Hortonworks hợp tác với Bloomberg phát triển.

Trình kết nối Spark-HBase tận dụng API nguồn dữ liệu (SPARK-3247) được giới thiệu trong Spark-1.2.0. Nó thu hẹp khoảng cách giữa kho lưu trữ Giá trị khóa HBase đơn giản và các truy vấn SQL quan hệ phức tạp và cho phép người dùng thực hiện phân tích dữ liệu phức tạp trên HBase bằng Spark. HBase DataFrame là một Spark DataFrame tiêu chuẩn và có thể tương tác với bất kỳ nguồn dữ liệu nào khác như Hive, ORC, Parquet, JSON, v.v.

Bối cảnh

Có một số đầu nối Spark HBase nguồn mở có sẵn dưới dạng gói Spark, dưới dạng các dự án độc lập hoặc trong thân HBase.

Spark đã chuyển sang API Dataset / DataFrame, cung cấp khả năng tối ưu hóa kế hoạch truy vấn được tích hợp sẵn. Giờ đây, người dùng cuối thích sử dụng giao diện dựa trên DataFrames / Datasets.

Đầu nối HBase trong thân HBase có hỗ trợ phong phú ở mức RDD, ví dụ:BulkPut, v.v., nhưng hỗ trợ DataFrame của nó không phong phú. Đầu nối trung kế HBase dựa trên HadoopRDD tiêu chuẩn với TableInputFormat tích hợp HBase có một số hạn chế về hiệu suất. Ngoài ra, BulkGet được thực hiện trong trình điều khiển có thể là một điểm lỗi duy nhất.

Có một số triển khai thay thế khác. Lấy Spark-SQL-on-HBase như một ví dụ. Nó áp dụng các kỹ thuật tối ưu hóa tùy chỉnh rất tiên tiến bằng cách nhúng kế hoạch tối ưu hóa truy vấn của riêng mình vào bên trong động cơ Spark Catalyst tiêu chuẩn, gửi RDD tới HBase và thực hiện các tác vụ phức tạp, chẳng hạn như tổng hợp một phần, bên trong bộ đồng xử lý HBase. Cách tiếp cận này có thể đạt được hiệu suất cao, nhưng khó duy trì do tính phức tạp và sự phát triển nhanh chóng của Spark. Ngoài ra, việc cho phép mã tùy ý chạy bên trong bộ đồng xử lý có thể gây ra rủi ro bảo mật.

Spark-on-HBase Connector (SHC) đã được phát triển để khắc phục những điểm yếu và tắc nghẽn tiềm ẩn này. Nó triển khai API Nguồn dữ liệu Spark tiêu chuẩn và thúc đẩy công cụ Spark Catalyst để tối ưu hóa truy vấn. Song song đó, RDD được xây dựng từ đầu thay vì sử dụng TableInputFormat để đạt được hiệu suất cao. Với RDD tùy chỉnh này, tất cả các kỹ thuật quan trọng có thể được áp dụng và thực hiện đầy đủ, chẳng hạn như cắt bỏ phân vùng, cắt bỏ cột, đẩy xuống vị từ và định vị dữ liệu. Thiết kế giúp việc bảo trì trở nên rất dễ dàng, đồng thời đạt được sự cân bằng tốt giữa hiệu suất và sự đơn giản.

Kiến trúc

Chúng tôi giả định Spark và HBase được triển khai trong cùng một cụm và những người thực thi Spark được đặt cùng vị trí với các máy chủ khu vực, như được minh họa trong hình bên dưới.

Hình 1. Kiến trúc trình kết nối Spark-on-HBase

Ở cấp độ cao, trình kết nối xử lý cả Quét và Lấy theo cách tương tự và cả hai hành động đều được thực hiện trong trình thực thi. Trình điều khiển xử lý truy vấn, tổng hợp các lần quét / lấy dựa trên siêu dữ liệu của khu vực và tạo nhiệm vụ cho mỗi khu vực. Các tác vụ được gửi đến những người thực thi ưu tiên cùng đặt với máy chủ khu vực và được thực hiện song song trong những người thực thi để đạt được tính đồng thời và cục bộ dữ liệu tốt hơn. Nếu một vùng không chứa dữ liệu cần thiết, máy chủ vùng đó sẽ không được giao bất kỳ nhiệm vụ nào. Một tác vụ có thể bao gồm nhiều Quét và Tổng hợp, và các yêu cầu dữ liệu của một tác vụ chỉ được truy xuất từ ​​một máy chủ vùng và máy chủ vùng này cũng sẽ là tùy chọn cục bộ cho tác vụ. Lưu ý rằng trình điều khiển không tham gia vào việc thực thi công việc thực ngoại trừ các tác vụ lên lịch. Điều này tránh cho trình điều khiển trở thành nút cổ chai.

Danh mục bảng

Để đưa bảng HBase dưới dạng bảng quan hệ vào Spark, chúng tôi xác định ánh xạ giữa bảng HBase và bảng Spark, được gọi là Danh mục bảng. Có hai phần quan trọng của danh mục này. Một là định nghĩa khóa hàng và một là ánh xạ giữa cột bảng trong Spark và họ cột và bộ định nghĩa cột trong HBase. Vui lòng tham khảo phần Sử dụng để biết thêm chi tiết.

Hỗ trợ Avro gốc

Trình kết nối hỗ trợ định dạng Avro nguyên bản, vì nó là một thực tế rất phổ biến để duy trì dữ liệu có cấu trúc thành HBase dưới dạng một mảng byte. Người dùng có thể lưu trực tiếp bản ghi Avro vào HBase. Bên trong, lược đồ Avro được tự động chuyển đổi thành kiểu dữ liệu Spark Catalyst. Lưu ý rằng cả hai phần khóa-giá trị trong bảng HBase đều có thể được xác định ở định dạng Avro. Vui lòng tham khảo các ví dụ / trường hợp thử nghiệm trong repo để biết cách sử dụng chính xác.

Đẩy xuống vị ngữ

Trình kết nối chỉ truy xuất các cột được yêu cầu từ máy chủ khu vực để giảm chi phí mạng và tránh xử lý dư thừa trong công cụ Spark Catalyst. Các bộ lọc HBase tiêu chuẩn hiện tại được sử dụng để thực hiện đẩy xuống vị từ mà không cần tận dụng khả năng của bộ đồng xử lý. Bởi vì HBase không biết về kiểu dữ liệu ngoại trừ mảng byte và sự mâu thuẫn thứ tự giữa các kiểu nguyên thủy Java và mảng byte, chúng tôi phải xử lý trước điều kiện bộ lọc trước khi đặt bộ lọc trong thao tác Quét để tránh mất mát dữ liệu. Bên trong máy chủ khu vực, các bản ghi không phù hợp với điều kiện truy vấn sẽ được lọc ra.

Tỉa phân vùng

Bằng cách trích xuất khóa hàng từ các vị từ, chúng tôi chia Scan / BulkGet thành nhiều phạm vi không chồng chéo, chỉ các máy chủ khu vực có dữ liệu được yêu cầu mới thực hiện Scan / BulkGet. Hiện tại, việc cắt tỉa phân vùng được thực hiện trên chiều đầu tiên của các phím hàng. Ví dụ:nếu khóa hàng là “key1:key2:key3”, việc lược bớt phân vùng sẽ chỉ dựa trên “key1”. Lưu ý rằng các điều kiện WHERE cần được xác định cẩn thận. Nếu không, việc cắt tỉa phân vùng có thể không có hiệu lực. Ví dụ:WHERE rowkey1> “abc” OR column =“xyz” (trong đó rowkey1 là thứ nguyên đầu tiên của rowkey và cột là cột hbase thông thường) sẽ dẫn đến việc quét toàn bộ, vì chúng tôi phải bao gồm tất cả các phạm vi vì trong số HOẶC logic.

Vị trí dữ liệu

Khi trình thực thi Spark được đặt cùng vị trí với máy chủ khu vực HBase, vị trí dữ liệu đạt được bằng cách xác định vị trí máy chủ khu vực và cố gắng tốt nhất để đồng định vị tác vụ với máy chủ khu vực. Mỗi người thực thi thực hiện Scan / BulkGet trên một phần của dữ liệu nằm trên cùng một máy chủ.

Quét và Lấy hàng loạt

Hai toán tử này được hiển thị cho người dùng bằng cách chỉ định WHERE CLAUSE, ví dụ:cột WHERE> x và cột để quét và cột WHERE =x quên đi. Các hoạt động được thực hiện trong trình thực thi và trình điều khiển chỉ xây dựng các hoạt động này. Bên trong chúng được chuyển đổi để quét và / hoặc lấy, và Iterator [Hàng] được trả về công cụ xúc tác để xử lý lớp trên.

Cách sử dụng

Sau đây minh họa quy trình cơ bản về cách sử dụng trình kết nối. Để biết thêm chi tiết và trường hợp sử dụng nâng cao, chẳng hạn như hỗ trợ Avro và khóa tổng hợp, vui lòng tham khảo các ví dụ trong kho lưu trữ.

1) Xác định danh mục cho ánh xạ lược đồ:

 [code language ="scala"] def catalog =s "" "{|" table ":{" namespace ":" default "," name ":" table1 "}, |" rowkey ":" key " , | "Cột":{| "col0":{"cf":"rowkey", "col":"key", "type":"string"}, | "col1":{"cf":"cf1 "," col ":" col1 "," type ":" boolean "}, |" col2 ":{" cf ":" cf2 "," col ":" col2 "," type ":" double "}, | "col3":{"cf":"cf3", "col":"col3", "type":"float"}, | "col4":{"cf":"cf4", "col":" col4 "," type ":" int "}, |" col5 ":{" cf ":" cf5 "," col ":" col5 "," type ":" bigint "}, |" col6 ":{" cf ":" cf6 "," col ":" col6 "," type ":" smallint "}, |" col7 ":{" cf ":" cf7 "," col ":" col7 "," type ":"string"}, | "col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |} "" ". dảiMargin [/ code]  

2) Chuẩn bị dữ liệu và điền vào bảng HBase: Trường hợp
lớp HBaseRecord (col0:String, col1:Boolean, col2:Double, col3:Float, col4:Int, col5:Long, col6:Short, col7:String, col8:Byte)

đối tượng HBaseRecord {def apply (i:Int, t:String):HBaseRecord ={val s =s ”” ”row $ {“% 03d ”.format (i)}” ”” HBaseRecord (s, i% 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s ”String $ i:$ t”, i.toByte)}}

val data =(0 to 255) .map {i => HBaseRecord (i, “extra”)}

sc.parallelize (data) .toDF.write.options (
Map (HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
.format (“org.apache.spark. sql.execution.datasources.hbase ”)
.save ()

3) Tải DataFrame:
def withCatalog (cat:String):DataFrame ={
sqlContext
.read
.options (Map (HBaseTableCatalog.tableCatalog-> cat))
.format ( “Org.apache.spark.sql.execution.datasources.hbase”)
.load ()
}

val df =withCatalog (danh mục)

4) Truy vấn tích hợp ngôn ngữ:
val s =df.filter ((($ ”col0 ″ <=“ row050 ″ &&$ ”col0”> “row040”) ||
$ ”col0 ″ ===“ row005 ”||
$ ”col0 ″ ===“ row020 ”||
$” col0 ″ ===“r20” ||
$ ”col0 ″ <=“ row005 ”) &&
($ ”Col4 ″ ===1 ||
$” col4 ″ ===42))
.select (“col0”, “col1”, “col4”)
s .show

5) Truy vấn SQL:
df.registerTempTable (“table”)
sqlContext.sql (“select count (col1) from table”). hiển thị

Định cấu hình Gói Spark

Người dùng có thể sử dụng đầu nối Spark-on-HBase như một gói Spark tiêu chuẩn. Để bao gồm gói trong việc sử dụng ứng dụng Spark của bạn:

spark-shell, pyspark hoặc spark-submit

> $ SPARK_HOME / bin / spark-shell –packages zhzhan:shc:0.0.11-1.6.1-s_2.10

Người dùng cũng có thể bao gồm gói này làm phần phụ thuộc trong tệp SBT của bạn. Định dạng là spark-package-name:version

spDependencies + =“zhzhan / shc:0.0.11-1.6.1-s_2.10”

Đang chạy trong Cụm bảo mật

Để chạy trong một cụm hỗ trợ Kerberos, người dùng phải đưa các lọ liên quan đến HBase vào đường dẫn phân vùng vì việc lấy lại và gia hạn mã thông báo HBase được thực hiện bởi Spark và độc lập với trình kết nối. Nói cách khác, người dùng cần khởi tạo môi trường theo cách bình thường, thông qua kinit hoặc bằng cách cung cấp chính / keytab. Các ví dụ sau đây cho thấy cách chạy trong một cụm an toàn với cả chế độ sợi-khách và cụm sợi. Lưu ý rằng SPARK_CLASSPATH phải được đặt cho cả hai chế độ và jar ví dụ chỉ là một trình giữ chỗ cho Spark.

export SPARK_CLASSPATH =/ usr / hdp / current / hbase-client / lib / hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/ usr / hdp / current / hbase- client / lib / hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Giả sử hrt_qa là một tài khoản không có headless, người dùng có thể sử dụng lệnh sau cho kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/ usr / hdp / current / spark-client / bin / spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master fiber-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-Operating 4 –driver-memory 512m –executor-memory 512m –executor-core 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/ usr / hdp / current / spark-client / bin / spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master fiber-cluster –files / etc / hbase / conf / hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-Operating 4 –driver-memory 512m –executor-memory 512m –executor-core 1 / usr / hdp / current / spark- client / lib / spark -amples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Kết hợp tất cả lại với nhau

Chúng tôi vừa đưa ra một cái nhìn tổng quan nhanh về cách HBase hỗ trợ Spark ở cấp DataFrame. Với DataFrame API, các ứng dụng Spark có thể làm việc với dữ liệu được lưu trữ trong bảng HBase dễ dàng như bất kỳ dữ liệu nào được lưu trữ trong các nguồn dữ liệu khác. Với tính năng mới này, dữ liệu trong bảng HBase có thể dễ dàng được sử dụng bởi các ứng dụng Spark và các công cụ tương tác khác, ví dụ:người dùng có thể chạy một truy vấn SQL phức tạp trên đầu bảng HBase bên trong Spark, thực hiện nối bảng với Dataframe hoặc tích hợp với Spark Streaming để triển khai một hệ thống phức tạp hơn.

Tiếp theo là gì?

Hiện tại, trình kết nối được lưu trữ trong repo Hortonworks và được xuất bản dưới dạng gói Spark. Nó đang trong quá trình được chuyển sang thân Apache HBase. Trong quá trình di chuyển, chúng tôi đã xác định được một số lỗi quan trọng trong thân HBase và chúng sẽ được sửa cùng với quá trình hợp nhất. Công việc cộng đồng được theo dõi bởi ô HBase JIRA HBASE-14789, bao gồm HBASE-14795 và HBASE-14796 để tối ưu hóa kiến ​​trúc tính toán cơ bản cho Scan và BulkGet, HBASE-14801 cung cấp giao diện người dùng JSON để dễ sử dụng, HBASE-15336 cho đường dẫn ghi DataFrame, HBASE-15334 để hỗ trợ Avro, HBASE-15333 để hỗ trợ các kiểu nguyên thủy của Java, chẳng hạn như short, int, long, float và double, v.v., HBASE-15335 để hỗ trợ khóa hàng tổng hợp và HBASE-15572 để thêm ngữ nghĩa dấu thời gian tùy chọn. Chúng tôi mong muốn sản xuất một phiên bản tương lai của trình kết nối giúp trình kết nối hoạt động dễ dàng hơn.

Lời cảm ơn

Chúng tôi muốn cảm ơn Hamel Kothari, Sudarshan Kadambi và nhóm Bloomberg đã hướng dẫn chúng tôi trong công việc này và cũng giúp chúng tôi xác thực công việc này. Chúng tôi cũng muốn cảm ơn cộng đồng HBase đã cung cấp phản hồi của họ và làm cho điều này trở nên tốt hơn. Cuối cùng, công việc này đã tận dụng các bài học từ các tích hợp Spark HBase trước đó và chúng tôi muốn cảm ơn các nhà phát triển của họ đã mở đường.

Tham khảo:

SHC:https://github.com/hortonworks/shc-release

Gói Spark:http://spark-packages.org/package/zhzhan/shc

Apache HBase:https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. 10 tính năng hàng đầu của Big Data Hadoop

  2. Sử dụng COD và CML để xây dựng các ứng dụng dự đoán dữ liệu kho

  3. NameNode Automatic Failover trong Hadoop HDFS là gì?

  4. Giao dịch HBase là gì?

  5. Tương lai của Hadoop - Dự đoán về tiền lương và công việc trong phân tích dữ liệu lớn