Синхронизация преобразованных данных между MongoDB и Elasticsearch с помощью Transporter в Ubuntu 16.04

Transporter – это инструмент с открытым исходным кодом для перемещения данных между разными хранилищами данных. Разработчики часто пишут одноразовые скрипты для выполнения таких задач, как перемещение данных между несколькими базами данных, перемещение данных из файлов в БД или наоборот. Но использование такого инструмента, как Transporter, имеет ряд преимуществ.

Transporter позволяет создавать конвейеры, которые определяют поток из источника (source, откуда читаются данные) в приемник данных (sink, куда записываются данные). Источниками и приемниками могут быть базы данных SQL или NoSQL, плоские файлы и другие ресурсы. Для связи с этими ресурсами Transporter использует адаптеры – подключаемые расширения; проект по умолчанию включает несколько адаптеров для популярных баз данных.

Кроме перемещения данных Transporter также позволяет изменять данные при прохождении через конвейер с помощью трансформаторов. Transporter по умолчанию предоставляет несколько трансформаторов. Также вы можете написать свои собственные трансформаторы, чтобы определить пользовательские настройки данных.

В этом мануале мы рассмотрим пример перемещения и обработки данных из БД MongoDB в Elasticsearch с помощью встроенных адаптеров Transporter и пользовательского трансформатора, написанного на JavaScript.

Требования

Конвейеры Transporter пишутся в JavaScript. Вам не обязательно нужно иметь навыки работы с JavaScript, но лишними они не будут. Больше о JavaScript можно прочитать в этом разделе Информатория.

1: Установка Transporter

Transporter предоставляет двоичные файлы для большинства популярных операционных систем. Процесс установки для Ubuntu состоит из двух этапов: загрузки двоичного файла Linux и создания его исполняемого файла.

Сначала получите ссылку на последнюю версию Transporter на GitHub. Скопируйте ссылку, которая заканчивается на -linux-amd64. В этом руководстве используется v0.5.2 (последняя на момент написания версия).

Загрузите бинарный файл в домашний каталог:

cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

Читайте также: Технические и практические рекомендации к руководствам 8host

Переместите файл в /usr/local/bin/:

mv transporter-*-linux-amd64 /usr/local/bin/transporter

  • Сделайте его исполняемым:

chmod +x /usr/local/bin/transporter

  • Чтобы убедиться, что установка Transporter прошла успешно, запустите бинарный файл:
  • transporter

В выводе вы увидите справочную информацию и номер версии:

USAGE
transporter <command> [flags]
COMMANDS
run       run pipeline loaded from a file
. . .
VERSION
0.5.2

Чтобы использовать Transporter для перемещения данных из MongoDB в Elasticsearch, нужны две вещи: данные MongoDB, которые нужно переместить, и конвейер, который сообщит Transporter, как их перемещать. Создайте тестовые данные. Если у вас уже есть база данных MongoDB, которую вы хотите переместить, вы можете пропустить следующий раздел и перейти прямо к разделу 3.

2: Создание тестовых данных MongoDB

Теперь нужно создать тестовую базу данных MongoDB с одной коллекцией и добавить в нее несколько документов. Затем вы попробуете перенести и преобразовать эти тестовые данные  через конвейер Transporter.

Сначала подключитесь к базе данных MongoDB.

mongo

Командная строка изменится:

mongo>

Выберите БД, с которой нужно работать. Здесь это БД my_application:

use my_application

В MongoDB не нужно явно создавать базу данных или коллекцию. После того, как вы начнете добавлять данные в БД, имя которой вы указали в предыдущей команде, эта база данных будет автоматически создана.

Итак, чтобы создать базу данных my_application, сохраните два документа в своей коллекции users: один из них представляет Sandy Shark, а другой — Gilly Glowfish. Это будут ваши тестовые данные.

db.users.save({"firstName": "Sandy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

Добавив документы, вы можете отправить запрос в коллекцию users:

db.users.find().pretty();

Вы увидите примерно такой вывод (столбцы _id будут отличаться). MongoDB автоматически добавляет уникальные ID объектов для идентификации документов в коллекции.

{
"_id" : ObjectId("59299ac7f80b31254a916456"),
"firstName" : "Sandy",
"lastName" : "Shark"
}
{
"_id" : ObjectId("59299ac7f80b31254a916457"),
"firstName" : "Gilly",
"lastName" : "Glowfish"
}

Нажмите CTRL+C, чтобы закрыть оболочку MongoDB.

Теперь нужно создать конвейер Transporter, чтобы переместить данные из MongoDB в Elasticsearch.

3: Создание базового конвейера

Конвейер в Transporter определяется по умолчанию файлом JavaScript по имени pipeline.js. Встроенная команда init создает базовый файл конфигурации в правильном каталоге, учитывая источник и приемник.

Инициализируйте базовый файл pipeline.js, где MongoDB выступает как источник, а Elasticsearch как приемник.

transporter init mongodb elasticsearch

Вы увидите:

Writing pipeline.js...

Редактировать pipeline.js на данном этапе не нужно, но давайте посмотрим, как он работает.

Файл выглядит так. Вы можете просмотреть содержимое файла, используя команду cat pipeline.js и less pipeline.js (остановить less можно с помощью q) или с помощью текстового редактора.

var source = mongodb({
"uri": "${MONGODB_URI}"
// "timeout": "30s",
// "tail": false,
// "ssl": false,
// "cacerts": ["/path/to/cert.pem"],
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
"uri": "${ELASTICSEARCH_URI}"
// "timeout": "10s", // defaults to 30s
// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

Строки, которые начинаются с var source и var sink, определяют переменные JavaScript для адаптеров MongoDB и Elasticsearch соответственно. Определите переменные среды ODB_URI и ELASTICSEARCH_URI, которые адаптеры смогут использовать позже.

Строки, которые начинаются с //, закомментированы. Они выделяют некоторые общие параметры конфигурации, которые вы можете установить для своего конвейера, но в данном случае они не используются для основного конвейера.

Читайте также: Переменные, области и поднятие переменных в JavaScript

Последняя строка соединяет источник и приемник. Переменная transporter или t позволяет получить доступ к конвейеру. Функции .Source() и .Save() позволяют добавить источник и приемник, используя переменные source и sink, определенные ранее в файле.

Читайте также: Определение функций в JavaScript

Третьим аргументом функций Source() и Save() является namespace. Передавая /.*/ в качестве последнего аргумента, вы указываете, что нужно перенести все данные из MongoDB и сохранить их в том же пространстве имен в Elasticsearch.

Прежде чем запустить этот конвейер, нужно установить переменные среды для MongoDB URI и Elasticsearch URI. В этом примере обе переменные хранятся локально с настройками по умолчанию (обязательно отредактируйте эти параметры, если вы используете существующие экземпляры MongoDB или Elasticsearch).

Читайте также: Чтение и установка переменных среды и оболочки на сервере Linux

export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'

Теперь можно запустить конвейер:

transporter run pipeline.js

Вы увидите вывод, который заканчивается так:

. . .
INFO[0001] metrics source records: 2                     path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2                path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

Во второй и третьей строках этот вывод показывает, что в источнике было 2 записи, и что 2 записи были перенесены в приемник.

Чтобы подтвердить, что обе записи были обработаны, вы можете запросить содержимое базы данных my_application в Elasticsearch.

curl $ELASTICSEARCH_URI/_search?pretty=true

Параметр ?pretty=true делает вывод удобочитаемым:

{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sandy",
"lastName" : "Shark"
}
}
]
}
}

Базы данных и коллекции в MongoDB – это аналоги индексов и типов в Elasticsearch. Имея это в виду, вы должны увидеть:

  • Поле _index имеет значение my_application (имя исходной базы данных MongoDB).
  • Поле _type содержит users, имя коллекции MongoDB.
  • Поля firstName и lastName заполняются значениями «Sandy» «Shark» и «Gilly» «Glowfish» соответственно.

Это подтверждает, что обе записи MongoDB были успешно обработаны в Transporter и загружены в Elasticsearch. Теперь добавьте промежуточный этап обработки, который может преобразовывать входные данные.

4: Создание трансформатора

Как следует из названия, трансформаторы изменяют исходные данные перед загрузкой в приемник. Например, они позволяют добавлять новые поля, удалять или изменять существующие поля. Transporter поставляется с предопределенными трансформаторами по умолчанию. Также Transporter поддерживает пользовательские трансформаторы.

Как правило, пользовательские трансформаторы записываются как функции JavaScript и сохраняются в отдельном файле. Чтобы использовать их, нужно добавить ссылку на файл трансформатора в файл pipeline.js. Transporter включает в себя движки Otto и Goja JavaScript. Поскольку Goja является более новым и, как правило, работает быстрее, мы будем использовать его. Единственным функциональным отличием является синтаксис.

Создайте файл transform.js, в который нужно записать функцию трансформатора:

nano transform.js

Вот функция, которую нужно использовать. Она создаст новое поле fullName, значением которого будут значения полей firstName и lastName, разделенные пробелом (например, Sandy Shark).

function transform(msg) {
msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
return msg
}

Рассмотрим строки файла подробнее:

  • Первая строка, function transform(msg) – это определение функции.
  • msg – это объект JavaScript, который содержит данные об исходном документе. С его помощью можно получить доступ к данным в конвейере.
  • Первая строка функции объединяет два поля и присваивает полученное значение полю fullName.
  • Последняя строка функции выдает новый объект msg для использования в остальной част конвейера.

Читайте также:

Теперь нужно настроить конвейер для поддержки этого трансформатора. Откройте pipeline.js:

nano pipeline.js

В последней строке нужно вставить вызов функции Transform(), чтобы добавить трансформатор в конвейер между вызовами Source() и Save(), вот так:

. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

Аргумент, переданный Transform(), является типом преобразования (в этом случае Goja). В функции goja мы указываем имя файла трансформатора, используя его относительный путь.

Читайте также: Основы навигации и управления файлами в Linux

Сохраните и закройте файл. Прежде чем перезапустить конвейер, очистите существующие данные в Elasticsearch от предыдущего теста.

curl -XDELETE $ELASTICSEARCH_URI

Вы увидите этот вывод, подтверждающий, что команда выполнена успешно.

{"acknowledged":true}

Теперь запустите конвейер.

transporter run pipeline.js

Результат будет очень похож на предыдущий тест. В последних нескольких строках можно увидеть, успешно ли завершилась работа конвейера. Разумеется, вы можете снова проверить Elasticsearch, чтобы узнать, применился ли к данным указанный формат.

curl $ELASTICSEARCH_URI/_search?pretty=true

В новом выводе вы должны увидеть поле fullName:

{
"took" : 9,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"fullName" : "Gilly Glowfish",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sandy",
"fullName" : "Sandy Shark",
"lastName" : "Shark"
}
}
]
}
}

Обратите внимание, что поле fullName с установленными значениями было добавлено в обоих документах. Теперь вы знаете, как добавить пользовательские трансформаторы в конвейер Transporter.

Заключение

Вы создали базовый конвейер Transporter с трансформатором для копирования и изменения данных MongoDB в Elasticsearch. Вы можете применять более сложные трансформаторы таким же образом, соединять несколько трансформаторов в одном конвейере и многое другое. MongoDB и Elasticsearch – это лишь два адаптера Transporter. Он также поддерживает плоские файлы, базы данных SQL, такие как Postgres, и многие другие источники данных.

Вы можете обратиться к странице проекта Transporter на GitHub и посетить вики Transporter для получения более подробной информации о том, как использовать адаптеры, трансформаторы и другие функции Transporter.

Tags: , , ,