Перейти к основному содержанию
Перейти к основному содержанию

CDC из DynamoDB в ClickHouse

На этой странице описано, как настроить CDC из DynamoDB в ClickHouse с использованием ClickPipes. Эта интеграция состоит из двух компонентов:

  1. Начальный снимок данных через S3 ClickPipes
  2. Обновления в реальном времени через Kinesis ClickPipes

Данные будут приниматься в таблицу ReplacingMergeTree. Этот движок таблицы обычно используется в сценариях CDC, чтобы можно было применять операции обновления. Подробнее об этом подходе можно узнать в следующих статьях блога:

1. Настройте поток Kinesis

Сначала необходимо включить поток Kinesis для таблицы DynamoDB, чтобы фиксировать изменения в режиме реального времени. Делайте это до создания снимка, чтобы не пропустить ни одних данных. Подробности см. в руководстве AWS здесь.

Поток DynamoDB Kinesis

2. Создайте снимок

Далее мы создадим снимок таблицы DynamoDB. Это можно сделать, выполнив экспорт AWS в S3. Руководство AWS находится здесь. Необходимо выполнить полную выгрузку («Full export») в формате DynamoDB JSON.

Экспорт DynamoDB в S3

3. Загрузите снимок в ClickHouse

Создайте необходимые таблицы

Данные снимка из DynamoDB будут выглядеть примерно так:

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

Обратите внимание, что данные имеют вложенный формат. Нам нужно будет развернуть (flatten) эти данные перед загрузкой в ClickHouse. Это можно сделать с помощью функции JSONExtract в ClickHouse в materialized view.

Нам нужно создать три таблицы:

  1. Таблица для хранения исходных данных из DynamoDB
  2. Таблица для хранения итоговых развернутых данных (таблица назначения)
  3. materialized view для разворачивания данных

Для приведённого выше примера данных из DynamoDB таблицы в ClickHouse будут выглядеть следующим образом:

/* Таблица снимков */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Таблица для финальных денормализованных данных */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Таблица для финальных денормализованных данных */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

Для целевой таблицы есть несколько требований:

  • Эта таблица должна быть таблицей ReplacingMergeTree
  • В таблице должен быть столбец version
    • На последующих шагах мы будем сопоставлять поле ApproximateCreationDateTime из потока Kinesis со столбцом version.
  • Таблица должна использовать ключ партиционирования в качестве ключа сортировки (задается через ORDER BY)
    • Строки с одним и тем же ключом сортировки будут дедуплироваться на основе столбца version.

Создайте ClickPipe для снимка

Теперь вы можете создать ClickPipe для загрузки данных снимка из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe здесь, но используйте следующие настройки:

  • Ingest path: вам нужно будет найти путь к экспортированным json‑файлам в S3. Путь будет выглядеть примерно так:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Format: JSONEachRow
  • Table: Ваша таблица snapshot (например, default.snapshot в примере выше)

После создания данные начнут поступать в таблицу snapshot и целевую таблицу. Вам не нужно дожидаться завершения загрузки snapshot, чтобы переходить к следующему шагу.

4. Создание Kinesis ClickPipe

Теперь мы можем настроить Kinesis ClickPipe для фиксации изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, при этом используйте следующие настройки:

  • Stream: поток Kinesis, использованный на шаге 1
  • Table: ваша целевая таблица (например, default.destination в примере выше)
  • Flatten object: true
  • Column mappings:
    • ApproximateCreationDateTime: version
    • Сопоставьте остальные поля с соответствующими целевыми столбцами, как показано ниже
Сопоставление столбцов DynamoDB

5. Очистка (необязательно)

После завершения snapshot ClickPipe вы можете удалить snapshot-таблицу и materialized view.

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";