HBase
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> HBase

Cách thực hiện:Quét các bảng Apache HBase muối với các phạm vi chính theo khu vực cụ thể trong MapReduce

Cảm ơn Pengyu Wang, nhà phát triển phần mềm tại FINRA, đã cho phép xuất bản lại bài đăng này.

Bảng Apache HBase muối với phân tách trước là một giải pháp HBase hiệu quả đã được chứng minh để cung cấp phân phối khối lượng công việc đồng nhất trên các Máy chủ Vùng và ngăn chặn các điểm nóng trong quá trình ghi hàng loạt. Trong thiết kế này, một phím hàng được làm bằng phím hợp lý cộng với muối ở đầu. Một cách để tạo muối là tính toán n (số vùng) mô-đun trên mã băm của khóa hàng logic (ngày, v.v.).

Các phím hàng muối

Ví dụ:một bảng chấp nhận tải dữ liệu hàng ngày có thể sử dụng các khóa hàng logic bắt đầu bằng ngày và chúng tôi muốn chia trước bảng này thành 1.000 vùng. Trong trường hợp này, chúng tôi dự kiến ​​sẽ tạo ra 1.000 loại muối khác nhau. Ví dụ:muối có thể được tạo ra như:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

Kết quả từ hashCode() với modulo cung cấp tính ngẫu nhiên cho giá trị muối từ “000” đến “999”. Với biến đổi chính này, bảng được phân chia trước trên các ranh giới muối khi nó được tạo. Điều này sẽ làm cho khối lượng hàng được phân phối đồng nhất trong khi tải các tệp HF với tải hàng loạt MapReduce. Nó đảm bảo rằng các khóa hàng có cùng muối rơi vào cùng một vùng.

Trong nhiều trường hợp sử dụng, chẳng hạn như lưu trữ dữ liệu, bạn cần quét hoặc sao chép dữ liệu qua một phạm vi khóa logic cụ thể (phạm vi ngày) bằng cách sử dụng công việc MapReduce. Các công việc MapReduce trong bảng tiêu chuẩn được thiết lập bằng cách cung cấp Scan ví dụ với các thuộc tính phạm vi chính.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Tuy nhiên, việc thiết lập một công việc như vậy sẽ trở nên khó khăn đối với các bảng đã tách sẵn muối. Các phím hàng bắt đầu và dừng sẽ khác nhau đối với từng vùng vì mỗi vùng có một muối riêng. Và chúng tôi không thể chỉ định nhiều phạm vi cho một Scan ví dụ.

Để giải quyết vấn đề này, chúng ta cần xem xét cách hoạt động của bảng MapReduce. Nói chung, khung công tác MapReduce tạo ra một tác vụ bản đồ để đọc và xử lý từng phần tách đầu vào. Mỗi phần tách được tạo trong InputFormat cơ sở lớp, theo phương thức getSplits() .

Trong công việc MapReduce bảng HBase, TableInputFormat được sử dụng làm InputFormat . Bên trong triển khai, getSplits() phương thức được ghi đè để truy xuất các phím hàng bắt đầu và dừng từ Scan ví dụ. Khi các phím hàng bắt đầu và dừng kéo dài trên nhiều vùng, phạm vi được chia theo ranh giới vùng và trả về danh sách TableSplit các đối tượng bao phủ phạm vi phím quét. Thay vì dựa trên khối HDFS, TableSplit s được dựa trên khu vực. Bằng cách ghi đè getSplits() , chúng tôi có thể kiểm soát TableSplit .

Xây dựng bảng tùy chỉnhInputFormat

Để thay đổi hành vi của getSplits() phương thức, một lớp tùy chỉnh mở rộng TableInputFormat bắt buộc. Mục đích của getSplits() ở đây là bao gồm phạm vi khóa hợp lý trong mỗi khu vực, xây dựng phạm vi khóa hàng của chúng bằng muối duy nhất của chúng. Lớp HTable cung cấp phương thức getStartEndKeys() trả về các phím hàng bắt đầu và kết thúc cho mỗi vùng. Từ mỗi phím bắt đầu, hãy phân tích cú pháp muối tương ứng cho vùng.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

Cấu hình công việc vượt qua phạm vi khóa logic

TableInputFormat truy xuất phím bắt đầu và dừng từ Scan ví dụ. Vì chúng tôi không thể sử dụng Scan trong công việc MapReduce của chúng tôi, chúng tôi có thể sử dụng Configuration thay vào đó, để chuyển hai biến này và chỉ có khóa bắt đầu và dừng hợp lý là đủ tốt (một biến có thể là ngày tháng hoặc thông tin kinh doanh khác). getSplits() phương thức có JobContext đối số, cá thể cấu hình có thể được đọc là context.getConfiguration() .

Trong trình điều khiển MapReduce:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Trong Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Xây dựng lại phạm vi khóa muối theo khu vực

Bây giờ chúng ta đã có muối và khóa bắt đầu / dừng hợp lý cho từng vùng, chúng ta có thể xây dựng lại phạm vi khóa hàng thực tế.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Tạo TableSplit cho từng khu vực

Với phạm vi khóa hàng, bây giờ chúng ta có thể khởi tạo TableSplit ví dụ cho khu vực.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Một điều nữa cần xem xét là địa phương dữ liệu. Khung sử dụng thông tin vị trí trong mỗi phần tách đầu vào để gán một nhiệm vụ bản đồ trong máy chủ cục bộ của nó. Đối với TableInputFormat của chúng tôi , chúng tôi sử dụng phương thức getTableRegionLocation() để truy xuất vị trí khu vực phục vụ khóa hàng.

Vị trí này sau đó được chuyển đến TableSplit constructor. Điều này sẽ đảm bảo rằng trình ánh xạ xử lý tách bảng nằm trên cùng một máy chủ khu vực. Một phương thức, được gọi là DNS.reverseDns() , yêu cầu địa chỉ cho máy chủ định danh HBase. Thuộc tính này được lưu trữ trong cấu hình “hbase.nameserver.address “.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Mã hoàn chỉnh của getSplits sẽ trông như thế này:

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Sử dụng TableInoutFormat tùy chỉnh trong Trình điều khiển MapReduce

Bây giờ chúng ta cần thay thế TableInputFormat lớp với bản dựng tùy chỉnh mà chúng tôi đã sử dụng để thiết lập công việc MapReduce bảng.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

Cách tiếp cận của TableInputFormat tùy chỉnh cung cấp khả năng quét hiệu quả và có thể mở rộng cho các bảng HBase được thiết kế để sử dụng muối cho tải dữ liệu cân bằng. Vì quá trình quét có thể bỏ qua bất kỳ khóa hàng không liên quan nào, bất kể bảng lớn như thế nào, độ phức tạp của quá trình quét chỉ bị giới hạn ở kích thước của dữ liệu đích. Trong hầu hết các trường hợp sử dụng, điều này có thể đảm bảo thời gian xử lý tương đối nhất quán khi bảng phát triển.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Apache Phoenix dành cho CDH

  2. Cách thực hiện:Quản lý dữ liệu HBase qua Hue

  3. Cloudera Replication Plugin cho phép nhân rộng nền tảng x cho Apache HBase

  4. Hướng dẫn HDFS - Giới thiệu đầy đủ về HDFS cho người mới bắt đầu

  5. Mã tên HDFS Tính khả dụng cao trong Hadoop