HBase
 sql >> Cơ Sở Dữ Liệu >  >> NoSQL >> HBase

Tuần tự hóa thư mạnh mẽ trong Apache Kafka bằng Apache Avro, Phần 1

Trong Apache Kafka, các ứng dụng Java được gọi là nhà sản xuất viết các thông điệp có cấu trúc tới một cụm Kafka (được tạo thành từ các nhà môi giới). Tương tự, các ứng dụng Java được gọi là người tiêu dùng đọc các thông báo này từ cùng một cụm. Trong một số tổ chức, có các nhóm khác nhau chịu trách nhiệm viết và quản lý người sản xuất và người tiêu dùng. Trong những trường hợp như vậy, một điểm khó khăn lớn có thể nằm ở việc phối hợp định dạng thông điệp đã thống nhất giữa người sản xuất và người tiêu dùng.

Ví dụ này trình bày cách sử dụng Apache Avro để tuần tự hóa các bản ghi được tạo cho Apache Kafka trong khi cho phép phát triển các lược đồ và cập nhật không đồng bộ các ứng dụng của nhà sản xuất và người tiêu dùng.

Serialization và Deserialization

Một bản ghi Kafka (trước đây được gọi là tin nhắn) bao gồm một khóa, một giá trị và các tiêu đề. Kafka không biết về cấu trúc của dữ liệu trong khóa và giá trị của bản ghi. Nó xử lý chúng dưới dạng mảng byte. Nhưng các hệ thống đọc bản ghi từ Kafka thực sự quan tâm đến dữ liệu trong các bản ghi đó. Vì vậy, bạn cần tạo ra dữ liệu ở định dạng có thể đọc được. Định dạng dữ liệu bạn sử dụng nên

  • Nhỏ gọn
  • Hãy nhanh chóng mã hóa và giải mã
  • Cho phép tiến hóa
  • Cho phép các hệ thống ngược dòng (những hệ thống ghi vào một cụm Kafka) và các hệ thống hạ nguồn (những hệ thống đọc từ cùng một cụm Kafka) nâng cấp lên các lược đồ mới hơn vào các thời điểm khác nhau

JSON, chẳng hạn, là tự giải thích nhưng không phải là một định dạng dữ liệu nhỏ gọn và phân tích cú pháp chậm. Avro là một khuôn khổ tuần tự hóa nhanh chóng tạo ra đầu ra tương đối nhỏ gọn. Nhưng để đọc các bản ghi Avro, bạn yêu cầu lược đồ mà dữ liệu đã được tuần tự hóa.

Một tùy chọn là lưu trữ và chuyển lược đồ với chính bản ghi. Điều này là tốt trong một tệp nơi bạn lưu trữ lược đồ một lần và sử dụng nó cho một số lượng lớn các bản ghi. Tuy nhiên, việc lưu trữ lược đồ trong mỗi và mọi bản ghi Kafka sẽ bổ sung thêm chi phí đáng kể về không gian lưu trữ và sử dụng mạng. Một tùy chọn khác là có một tập hợp các ánh xạ lược đồ-định danh đã được thỏa thuận và tham chiếu đến các lược đồ bởi các số nhận dạng của chúng trong bản ghi.

Từ Vật thể đến Bản ghi Kafka và Quay lại

Các ứng dụng của nhà sản xuất không cần phải chuyển đổi dữ liệu trực tiếp sang mảng byte. KafkaProduction là một lớp chung cần người dùng của nó chỉ định các loại khóa và giá trị. Sau đó, nhà sản xuất chấp nhận các bản sao của ProducerRecord có cùng loại tham số. Việc chuyển đổi từ đối tượng sang mảng byte được thực hiện bởi bộ nối tiếp. Kafka cung cấp một số trình tuần tự nguyên thủy:ví dụ:IntegerSerializer , ByteArraySerializer , StringSerializer . Về phía người tiêu dùng, các Deserializers tương tự chuyển đổi mảng byte thành một đối tượng mà ứng dụng có thể xử lý.

Vì vậy, thật hợp lý khi kết nối ở cấp độ Serializer và Deserializer và cho phép các nhà phát triển ứng dụng nhà sản xuất và người tiêu dùng sử dụng giao diện thuận tiện do Kafka cung cấp. Mặc dù các phiên bản mới nhất của Kafka cho phép ExtendedSerializersExtendedDeserializers để truy cập các tiêu đề, chúng tôi quyết định đưa mã nhận dạng giản đồ vào khóa và giá trị của bản ghi Kafka thay vì thêm tiêu đề bản ghi.

Avro Essentials

Avro là một khuôn khổ tuần tự hóa dữ liệu (và gọi thủ tục từ xa). Nó sử dụng một tài liệu JSON được gọi là lược đồ để mô tả cấu trúc dữ liệu. Hầu hết việc sử dụng Avro là thông qua GenericRecord hoặc các lớp con của SpecificRecord. Các lớp Java được tạo từ các lược đồ Avro là các lớp con của lược đồ sau, trong khi các lớp trước có thể được sử dụng mà không cần biết trước về cấu trúc dữ liệu được làm việc với.

Khi hai lược đồ thỏa mãn một tập hợp các quy tắc tương thích, dữ liệu được viết bằng một lược đồ (được gọi là lược đồ người viết) có thể được đọc như thể nó được viết bằng lược đồ kia (được gọi là lược đồ người đọc). Các lược đồ có dạng chuẩn có tất cả các chi tiết không liên quan đến quá trình tuần tự hóa, chẳng hạn như nhận xét, được lược bỏ để hỗ trợ kiểm tra tính tương đương.

VersonedSchema và SchemaProvider

Như đã đề cập trước đây, chúng ta cần ánh xạ 1-1 giữa các lược đồ và số nhận dạng của chúng. Đôi khi, việc tham chiếu đến các lược đồ theo tên sẽ dễ dàng hơn. Khi một lược đồ tương thích được tạo, nó có thể được coi là phiên bản tiếp theo của lược đồ. Do đó chúng ta có thể tham chiếu đến các lược đồ với một cặp tên, phiên bản. Chúng ta hãy gọi lược đồ, số nhận dạng, tên và phiên bản của nó cùng nhau là VersionedSchema . Đối tượng này có thể chứa siêu dữ liệu bổ sung mà ứng dụng yêu cầu.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider các đối tượng có thể tra cứu các bản sao của VersionedSchema .

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

Cách triển khai giao diện này được đề cập trong “Triển khai Cửa hàng lược đồ” trong một bài đăng blog trong tương lai.

Sắp xếp thứ tự dữ liệu chung

Khi tuần tự hóa một bản ghi, trước tiên chúng ta cần tìm ra Lược đồ nào sẽ sử dụng. Mỗi bản ghi có một getSchema phương pháp. Nhưng việc tìm ra mã định danh từ lược đồ có thể tốn nhiều thời gian. Nói chung sẽ hiệu quả hơn nếu đặt lược đồ tại thời điểm khởi tạo. Điều này có thể được thực hiện trực tiếp bằng mã định danh hoặc theo tên và phiên bản. Hơn nữa, khi tạo ra nhiều chủ đề, chúng tôi có thể muốn đặt các lược đồ khác nhau cho các chủ đề khác nhau và tìm ra lược đồ từ tên chủ đề được cung cấp dưới dạng tham số cho phương thức serialize(T, String) . Logic này được bỏ qua trong các ví dụ của chúng tôi vì mục đích ngắn gọn và đơn giản.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Với lược đồ trong tay, chúng ta cần lưu trữ nó trong tin nhắn của mình. Việc sắp xếp thứ tự ID như một phần của tin nhắn cung cấp cho chúng tôi một giải pháp nhỏ gọn, vì tất cả điều kỳ diệu xảy ra trong Serializer / Deserializer. Nó cũng cho phép tích hợp rất dễ dàng với các khung và thư viện khác đã hỗ trợ Kafka và cho phép người dùng sử dụng bộ tuần tự của riêng họ (chẳng hạn như Spark).

Sử dụng cách tiếp cận này, trước tiên chúng tôi viết mã định danh giản đồ trên bốn byte đầu tiên.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Sau đó, chúng ta có thể tạo một DatumWriter và tuần tự hóa đối tượng.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Kết hợp tất cả những điều này lại với nhau, chúng tôi đã triển khai một trình tuần tự hóa dữ liệu chung.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Hủy số hóa dữ liệu chung

Deserialization có thể hoạt động với một giản đồ (dữ liệu giản đồ được viết bằng) nhưng bạn có thể chỉ định một lược đồ đọc khác. Lược đồ trình đọc phải tương thích với lược đồ mà dữ liệu đã được tuần tự hóa, nhưng không cần phải tương đương. Vì lý do này, chúng tôi đã giới thiệu tên lược đồ. Bây giờ chúng tôi có thể chỉ định rằng chúng tôi muốn đọc dữ liệu với phiên bản cụ thể của một lược đồ. Tại thời điểm khởi tạo, chúng tôi đọc các phiên bản lược đồ mong muốn cho mỗi tên lược đồ và lưu trữ siêu dữ liệu trong readerSchemasByName để truy cập nhanh. Giờ đây, chúng tôi có thể đọc mọi bản ghi được viết bằng phiên bản tương thích của lược đồ như thể nó được viết với phiên bản được chỉ định.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Khi một bản ghi cần được giải mã hóa, trước tiên chúng ta đọc mã định danh của lược đồ người viết. Điều này cho phép tra cứu lược đồ người đọc theo tên. Với cả hai lược đồ có sẵn, chúng ta có thể tạo một GeneralDatumReader và đọc hồ sơ.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Xử lý các từ cụ thể

Thường xuyên có một lớp chúng tôi muốn sử dụng cho các bản ghi của mình. Sau đó, lớp này thường được tạo từ một lược đồ Avro. Apache Avro cung cấp các công cụ để tạo mã Java từ các lược đồ. Một trong những công cụ như vậy là plugin Avro Maven. Các lớp đã tạo có lược đồ mà chúng được tạo từ có sẵn trong thời gian chạy. Điều này làm cho việc tuần tự hóa và giải mã hóa đơn giản và hiệu quả hơn. Để tuần tự hóa, chúng ta có thể sử dụng lớp để tìm hiểu về mã định danh giản đồ sẽ sử dụng.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Vì vậy, chúng ta không cần logic để xác định lược đồ từ chủ đề và dữ liệu. Chúng tôi sử dụng lược đồ có sẵn trong lớp bản ghi để ghi bản ghi.

Tương tự như vậy, đối với quá trình deserialization, lược đồ người đọc có thể được tìm ra từ chính lớp đó. Logic hủy số liệu hóa trở nên đơn giản hơn vì lược đồ trình đọc được cố định tại thời điểm cấu hình và không cần phải tra cứu theo tên lược đồ.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Đọc bổ sung

Để biết thêm thông tin về khả năng tương thích của lược đồ, hãy tham khảo thông số kỹ thuật Avro cho Độ phân giải lược đồ.

Để biết thêm thông tin về các biểu mẫu chuẩn, hãy tham khảo thông số kỹ thuật Avro để phân tích cú pháp Biểu mẫu chuẩn cho các lược đồ.

Lần tới…

Phần 2 sẽ trình bày việc triển khai một hệ thống để lưu trữ các định nghĩa lược đồ Avro.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. tạo nhanh một bảng hbase mẫu

  2. Điều chỉnh Bộ sưu tập rác Java cho HBase

  3. Đường dẫn ghi Apache HBase

  4. Kho dữ liệu thế hệ tiếp theo tại Santander Vương quốc Anh

  5. Apache Phoenix dành cho CDH