Синхронизация преобразованных данных между MongoDB и Elasticsearch с помощью Transporter в Ubuntu 16.04
Java, Ubuntu | Комментировать запись
Transporter – это инструмент с открытым исходным кодом для перемещения данных между разными хранилищами данных. Разработчики часто пишут одноразовые скрипты для выполнения таких задач, как перемещение данных между несколькими базами данных, перемещение данных из файлов в БД или наоборот. Но использование такого инструмента, как Transporter, имеет ряд преимуществ.
Transporter позволяет создавать конвейеры, которые определяют поток из источника (source, откуда читаются данные) в приемник данных (sink, куда записываются данные). Источниками и приемниками могут быть базы данных SQL или NoSQL, плоские файлы и другие ресурсы. Для связи с этими ресурсами Transporter использует адаптеры – подключаемые расширения; проект по умолчанию включает несколько адаптеров для популярных баз данных.
Кроме перемещения данных Transporter также позволяет изменять данные при прохождении через конвейер с помощью трансформаторов. Transporter по умолчанию предоставляет несколько трансформаторов. Также вы можете написать свои собственные трансформаторы, чтобы определить пользовательские настройки данных.
В этом мануале мы рассмотрим пример перемещения и обработки данных из БД MongoDB в Elasticsearch с помощью встроенных адаптеров Transporter и пользовательского трансформатора, написанного на JavaScript.
Требования
- Сервер Ubuntu 16.04, настроенный по этому мануалу.
- База данных MongoDB, установленная по мануалу Установка MongoDB в Ubuntu 16.04.
- Elasticsearch (инструкции вы найдете в мануале Установка и настройка Elasticsearch в Ubuntu 16.04).
Конвейеры Transporter пишутся в JavaScript. Вам не обязательно нужно иметь навыки работы с JavaScript, но лишними они не будут. Больше о JavaScript можно прочитать в этом разделе Информатория.
1: Установка Transporter
Transporter предоставляет двоичные файлы для большинства популярных операционных систем. Процесс установки для Ubuntu состоит из двух этапов: загрузки двоичного файла Linux и создания его исполняемого файла.
Сначала получите ссылку на последнюю версию Transporter на GitHub. Скопируйте ссылку, которая заканчивается на -linux-amd64. В этом руководстве используется v0.5.2 (последняя на момент написания версия).
Загрузите бинарный файл в домашний каталог:
cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
Читайте также: Технические и практические рекомендации к руководствам 8host
Переместите файл в /usr/local/bin/:
mv transporter-*-linux-amd64 /usr/local/bin/transporter
- Сделайте его исполняемым:
chmod +x /usr/local/bin/transporter
- Чтобы убедиться, что установка Transporter прошла успешно, запустите бинарный файл:
- transporter
В выводе вы увидите справочную информацию и номер версии:
USAGE
transporter <command> [flags]
COMMANDS
run run pipeline loaded from a file
. . .
VERSION
0.5.2
Чтобы использовать Transporter для перемещения данных из MongoDB в Elasticsearch, нужны две вещи: данные MongoDB, которые нужно переместить, и конвейер, который сообщит Transporter, как их перемещать. Создайте тестовые данные. Если у вас уже есть база данных MongoDB, которую вы хотите переместить, вы можете пропустить следующий раздел и перейти прямо к разделу 3.
2: Создание тестовых данных MongoDB
Теперь нужно создать тестовую базу данных MongoDB с одной коллекцией и добавить в нее несколько документов. Затем вы попробуете перенести и преобразовать эти тестовые данные через конвейер Transporter.
Сначала подключитесь к базе данных MongoDB.
mongo
Командная строка изменится:
mongo>
Выберите БД, с которой нужно работать. Здесь это БД my_application:
use my_application
В MongoDB не нужно явно создавать базу данных или коллекцию. После того, как вы начнете добавлять данные в БД, имя которой вы указали в предыдущей команде, эта база данных будет автоматически создана.
Итак, чтобы создать базу данных my_application, сохраните два документа в своей коллекции users: один из них представляет Sandy Shark, а другой – Gilly Glowfish. Это будут ваши тестовые данные.
db.users.save({"firstName": "Sandy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
Добавив документы, вы можете отправить запрос в коллекцию users:
db.users.find().pretty();
Вы увидите примерно такой вывод (столбцы _id будут отличаться). MongoDB автоматически добавляет уникальные ID объектов для идентификации документов в коллекции.
{
"_id" : ObjectId("59299ac7f80b31254a916456"),
"firstName" : "Sandy",
"lastName" : "Shark"
}
{
"_id" : ObjectId("59299ac7f80b31254a916457"),
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
Нажмите CTRL+C, чтобы закрыть оболочку MongoDB.
Теперь нужно создать конвейер Transporter, чтобы переместить данные из MongoDB в Elasticsearch.
3: Создание базового конвейера
Конвейер в Transporter определяется по умолчанию файлом JavaScript по имени pipeline.js. Встроенная команда init создает базовый файл конфигурации в правильном каталоге, учитывая источник и приемник.
Инициализируйте базовый файл pipeline.js, где MongoDB выступает как источник, а Elasticsearch как приемник.
transporter init mongodb elasticsearch
Вы увидите:
Writing pipeline.js...
Редактировать pipeline.js на данном этапе не нужно, но давайте посмотрим, как он работает.
Файл выглядит так. Вы можете просмотреть содержимое файла, используя команду cat pipeline.js и less pipeline.js (остановить less можно с помощью q) или с помощью текстового редактора.
var source = mongodb({
"uri": "${MONGODB_URI}"
// "timeout": "30s",
// "tail": false,
// "ssl": false,
// "cacerts": ["/path/to/cert.pem"],
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
"uri": "${ELASTICSEARCH_URI}"
// "timeout": "10s", // defaults to 30s
// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
Строки, которые начинаются с var source и var sink, определяют переменные JavaScript для адаптеров MongoDB и Elasticsearch соответственно. Определите переменные среды ODB_URI и ELASTICSEARCH_URI, которые адаптеры смогут использовать позже.
Строки, которые начинаются с //, закомментированы. Они выделяют некоторые общие параметры конфигурации, которые вы можете установить для своего конвейера, но в данном случае они не используются для основного конвейера.
Читайте также: Переменные, области и поднятие переменных в JavaScript
Последняя строка соединяет источник и приемник. Переменная transporter или t позволяет получить доступ к конвейеру. Функции .Source() и .Save() позволяют добавить источник и приемник, используя переменные source и sink, определенные ранее в файле.
Читайте также: Определение функций в JavaScript
Третьим аргументом функций Source() и Save() является namespace. Передавая /.*/ в качестве последнего аргумента, вы указываете, что нужно перенести все данные из MongoDB и сохранить их в том же пространстве имен в Elasticsearch.
Прежде чем запустить этот конвейер, нужно установить переменные среды для MongoDB URI и Elasticsearch URI. В этом примере обе переменные хранятся локально с настройками по умолчанию (обязательно отредактируйте эти параметры, если вы используете существующие экземпляры MongoDB или Elasticsearch).
Читайте также: Чтение и установка переменных среды и оболочки на сервере Linux
export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'
Теперь можно запустить конвейер:
transporter run pipeline.js
Вы увидите вывод, который заканчивается так:
. . .
INFO[0001] metrics source records: 2 path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
Во второй и третьей строках этот вывод показывает, что в источнике было 2 записи, и что 2 записи были перенесены в приемник.
Чтобы подтвердить, что обе записи были обработаны, вы можете запросить содержимое базы данных my_application в Elasticsearch.
curl $ELASTICSEARCH_URI/_search?pretty=true
Параметр ?pretty=true делает вывод удобочитаемым:
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sandy",
"lastName" : "Shark"
}
}
]
}
}
Базы данных и коллекции в MongoDB – это аналоги индексов и типов в Elasticsearch. Имея это в виду, вы должны увидеть:
- Поле _index имеет значение my_application (имя исходной базы данных MongoDB).
- Поле _type содержит users, имя коллекции MongoDB.
- Поля firstName и lastName заполняются значениями «Sandy» «Shark» и «Gilly» «Glowfish» соответственно.
Это подтверждает, что обе записи MongoDB были успешно обработаны в Transporter и загружены в Elasticsearch. Теперь добавьте промежуточный этап обработки, который может преобразовывать входные данные.
4: Создание трансформатора
Как следует из названия, трансформаторы изменяют исходные данные перед загрузкой в приемник. Например, они позволяют добавлять новые поля, удалять или изменять существующие поля. Transporter поставляется с предопределенными трансформаторами по умолчанию. Также Transporter поддерживает пользовательские трансформаторы.
Как правило, пользовательские трансформаторы записываются как функции JavaScript и сохраняются в отдельном файле. Чтобы использовать их, нужно добавить ссылку на файл трансформатора в файл pipeline.js. Transporter включает в себя движки Otto и Goja JavaScript. Поскольку Goja является более новым и, как правило, работает быстрее, мы будем использовать его. Единственным функциональным отличием является синтаксис.
Создайте файл transform.js, в который нужно записать функцию трансформатора:
nano transform.js
Вот функция, которую нужно использовать. Она создаст новое поле fullName, значением которого будут значения полей firstName и lastName, разделенные пробелом (например, Sandy Shark).
function transform(msg) {
msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
return msg
}
Рассмотрим строки файла подробнее:
- Первая строка, function transform(msg) – это определение функции.
- msg – это объект JavaScript, который содержит данные об исходном документе. С его помощью можно получить доступ к данным в конвейере.
- Первая строка функции объединяет два поля и присваивает полученное значение полю fullName.
- Последняя строка функции выдает новый объект msg для использования в остальной част конвейера.
Читайте также:
- Определение функций в JavaScript
- Типы данных в JavaScript
- Объекты в JavaScript
- Работа со строками в JavaScript
Теперь нужно настроить конвейер для поддержки этого трансформатора. Откройте pipeline.js:
nano pipeline.js
В последней строке нужно вставить вызов функции Transform(), чтобы добавить трансформатор в конвейер между вызовами Source() и Save(), вот так:
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")
Аргумент, переданный Transform(), является типом преобразования (в этом случае Goja). В функции goja мы указываем имя файла трансформатора, используя его относительный путь.
Читайте также: Основы навигации и управления файлами в Linux
Сохраните и закройте файл. Прежде чем перезапустить конвейер, очистите существующие данные в Elasticsearch от предыдущего теста.
curl -XDELETE $ELASTICSEARCH_URI
Вы увидите этот вывод, подтверждающий, что команда выполнена успешно.
{"acknowledged":true}
Теперь запустите конвейер.
transporter run pipeline.js
Результат будет очень похож на предыдущий тест. В последних нескольких строках можно увидеть, успешно ли завершилась работа конвейера. Разумеется, вы можете снова проверить Elasticsearch, чтобы узнать, применился ли к данным указанный формат.
curl $ELASTICSEARCH_URI/_search?pretty=true
В новом выводе вы должны увидеть поле fullName:
{
"took" : 9,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"fullName" : "Gilly Glowfish",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sandy",
"fullName" : "Sandy Shark",
"lastName" : "Shark"
}
}
]
}
}
Обратите внимание, что поле fullName с установленными значениями было добавлено в обоих документах. Теперь вы знаете, как добавить пользовательские трансформаторы в конвейер Transporter.
Заключение
Вы создали базовый конвейер Transporter с трансформатором для копирования и изменения данных MongoDB в Elasticsearch. Вы можете применять более сложные трансформаторы таким же образом, соединять несколько трансформаторов в одном конвейере и многое другое. MongoDB и Elasticsearch – это лишь два адаптера Transporter. Он также поддерживает плоские файлы, базы данных SQL, такие как Postgres, и многие другие источники данных.
Вы можете обратиться к странице проекта Transporter на GitHub и посетить вики Transporter для получения более подробной информации о том, как использовать адаптеры, трансформаторы и другие функции Transporter.
Tags: ElasticSearch, MongoDB, Transporter, Ubuntu 16.04