Хабрахабр

[Перевод] Apache Ignite: распределенные вычисления в оперативной памяти

Привет, Хабр!

Мы продолжаем интересоваться новыми решениями от Apache. Рассчитываем выпустить в мае книгу «High Performance Spark» Холдена Карау (книга в верстке), а в августе — книгу «Kafka: The Definitive Guide» Нии Нархид (еще в переводе). Сегодня же хотим предложить краткую ознакомительную статью об Apache Ignite и оценить масштаб интереса к теме.

Приятного чтения!
Apache Ignite – относительно новое решение, однако, популярность его быстро растет. Сложно отнести его к какому-то конкретному подвиду движков баз данных, поскольку характеристики Ignite роднят его с несколькими инструментами. Основное назначение этого инструмента — хранение распределенных данных в оперативной памяти, а также хранение информации в формате «ключ-значение». Также в Ignite есть некоторые общие функции RDBMS, в частности, поддержка SQL-запросов и ACID-транзакций. Но это не означает, что данное решение – типичная база данных для работы с транзакциями на языке SQL. Здесь не поддерживаются ограничения внешнего ключа, а транзакции доступны лишь на уровне «ключ-значение». Тем не менее, Apache Ignite кажется очень интересным решением.

Apache Ignite легко запустить как узел, встроенный в приложение Spring Boot. Проще всего этого добиться при помощи библиотеки Spring Data Ignite. Apache Ignite реализует интерфейс Spring Data CrudRepository, поддерживающий основные операции CRUD, а также обеспечивающий доступ к гриду Apache Ignite SQL Grid с использованием унифицированных интерфейсов Spring Data. Хотя, в нем обеспечивается сохраняемость данных в дисковом хранилище с поддержкой SQL и парадигмы ACID, мы разработали решение для сохранения объектов кэша оперативной памяти в базу данных MySQL. Архитектура предлагаемого решения показана на рисунке ниже – как видите, она очень проста. Приложение помещает данные в кэш оперативной памяти, устроенный в Apache Ignite. Apache Ignite автоматически синхронизирует эти изменения с базой данных в ходе асинхронной фоновой задачи. Способ считывания данных в этом приложении также не должен вас удивить. Если сущность не кэширована, то она считывается из базы данных и помещается в кэш на будущее.

Здесь я подробно опишу, как разрабатывается приложение такого рода. Результат выложен на GitHub. Я нашел в Интернете еще несколько примеров, но в них затронуты только основы. Я покажу, как сконфигурировать Apache Ignite для записи объектов из кэша в базу данных, а также как создавать более сложные запросы на объединение с использованием нескольких кэшей. Начнем с запуска базы данных.

1. Настраиваем базу данных MySQL

Чтобы запустить базу данных MySQL локально, лучше всего, конечно же, воспользоваться контейнером Docker. База данных MySQL для Docker под Windows в настоящее время доступна по адресу 192.168.99.100:33306.

 docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

Далее создаем таблицы, используемые сущностями приложения для хранения данных: PERSON, CONTACT. Они относятся к таблицам как 1…N, где в таблице CONTACT содержится внешний ключ, указывающий на PERSON id.

 CREATE TABLE `person` ( `id` int(11) NOT NULL, `first_name` varchar(45) DEFAULT NULL, `last_name` varchar(45) DEFAULT NULL, `gender` varchar(10) DEFAULT NULL, `country` varchar(10) DEFAULT NULL, `city` varchar(20) DEFAULT NULL, `address` varchar(45) DEFAULT NULL, `birth_date` date DEFAULT NULL, PRIMARY KEY (`id`)
); CREATE TABLE `contact` ( `id` int(11) NOT NULL, `location` varchar(45) DEFAULT NULL, `contact_type` varchar(10) DEFAULT NULL, `person_id` int(11) NOT NULL, PRIMARY KEY (`id`)
); ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Конфигурируем Maven

Чтобы приступить к работе с репозиторием Spring Data для Apache Ignite, проще всего добавить следующую зависимость Maven в файл pom.xml нашего приложения. Все остальные зависимости Ignite будут включены автоматически. Нам также понадобится драйвер MySQL JDBC driver и зависимости Spring JDBC, чтобы сконфигурировать соединение с базой данных. Они необходимы, так как мы встраиваем Apache Ignite в приложение, и требуется подключиться к базе данных MySQL, чтобы можно было синхронизировать кэш с таблицами базы данных.

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope>
</dependency>
<dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-spring-data</artifactId> <version>${ignite.version}</version>
</dependency>

3. Конфигурируем узел Ignite

Класс IgniteConfiguration позволяет сконфигурировать все доступные настройки узла Ignite. В данном случае наиболее важна конфигурация кэша (1). Следует добавить главный ключ и классы сущностей как индексированные типы (2). Далее нужно предусмотреть экспорт обновлений кэша в базу данных (3) и считывать из базы данных ту информацию, которой не окажется в кэше (4). Взаимодействие между узлом Ignite и MySQL можно сконфигурировать при помощи класса CacheJdbcPojoStoreFactory (5). Там нужно передать DataSource @Bean (6), диалект (7) и соответствие между полями объекта и столбцами таблицы (8).

@Bean
public Ignite igniteInstance() { IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setIgniteInstanceName("ignite-1"); cfg.setPeerClassLoadingEnabled(true); CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1) ccfg2.setIndexedTypes(Long.class, Contact.class); // (2) ccfg2.setWriteBehindEnabled(true); ccfg2.setWriteThrough(true); // (3) ccfg2.setReadThrough(true); // (4) CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5) f2.setDataSource(datasource); // (6) f2.setDialect(new MySQLDialect()); // (7) JdbcType jdbcContactType = new JdbcType(); // (8) jdbcContactType.setCacheName("ContactCache"); jdbcContactType.setKeyType(Long.class); jdbcContactType.setValueType(Contact.class); jdbcContactType.setDatabaseTable("contact"); jdbcContactType.setDatabaseSchema("ignite"); jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id")); jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId")); f2.setTypes(jdbcContactType); ccfg2.setCacheStoreFactory(f2); CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache"); ccfg.setIndexedTypes(Long.class, Person.class); ccfg.setWriteBehindEnabled(true); ccfg.setReadThrough(true); ccfg.setWriteThrough(true); CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>(); f.setDataSource(datasource); f.setDialect(new MySQLDialect()); JdbcType jdbcType = new JdbcType(); jdbcType.setCacheName("PersonCache"); jdbcType.setKeyType(Long.class); jdbcType.setValueType(Person.class); jdbcType.setDatabaseTable("person"); jdbcType.setDatabaseSchema("ignite"); jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id")); jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate")); f.setTypes(jdbcType); ccfg.setCacheStoreFactory(f); cfg.setCacheConfiguration(ccfg, ccfg2); return Ignition.start(cfg);
}

Вот конфигурация источника данных Spring для MySQL в виде контейнера Docker.

spring:
datasource:
name: mysqlds
url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
username: ignite
password: ignite123

Здесь необходимо отметить, что Apache Ignite не лишен некоторых недостатков. Например, он отображает Enum на целое число и берет его порядковое значение, хотя, конфигурирует VARCHAR как тип JDCB. Когда такой ряд считывается из базы данных, он неверно отображается на Enum в объекте – в этом поле отклика у вас получится null.

 new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")

4. Объекты модели

Как упоминалось выше, в схеме нашей базы данных две таблицы. Еще есть два класса модели и две конфигурации кэша, по одной на каждый класс модели. Ниже приведена реализация класса модели. Одна из самых интересных вещей, которую здесь стоит отметить – генерация ID при помощи класса AtomicLong. Это один из базовых компонентов Ignite, который служит генератором последовательностей. Также видим специфичную аннотацию @QuerySqlField; если она сопровождает поле – это означает, что данное поле может использоваться в SQL как параметр запроса.

 @QueryGroupIndex.List( @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable { private static final long serialVersionUID = -1271194616130404625L; private static final AtomicLong ID_GEN = new AtomicLong(); @QuerySqlField(index = true) private Long id; @QuerySqlField(index = true) @QuerySqlField.Group(name = "idx1", order = 0) private String firstName; @QuerySqlField(index = true) @QuerySqlField.Group(name = "idx1", order = 1) private String lastName; private Gender gender; private Date birthDate; private String country; private String city; private String address; private List<Contact> contacts = new ArrayList<>(); public void init() { this.id = ID_GEN.incrementAndGet(); } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public Gender getGender() { return gender; } public void setGender(Gender gender) { this.gender = gender; } public Date getBirthDate() { return birthDate; } public void setBirthDate(Date birthDate) { this.birthDate = birthDate; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public List<Contact> getContacts() { return contacts; } public void setContacts(List<Contact> contacts) { this.contacts = contacts; } }

5. Репозитории Ignite

Полагаю, вам известно, как в Spring Data JPA создаются репозитории. Обработка репозиториев должна быть обеспечена в классе main или @Configuration.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication { @Autowired DataSource datasource; public static void main(String[] args) { SpringApplication.run(IgniteRestApplication.class, args); } // ...
}

Затем расширяем наш интерфейс @Repository базовым интерфейсом CrudRepository. Он поддерживает только унаследованные методы с параметром id. В приведенном ниже фрагменте PersonRepository я определил несколько поисковых методов, воспользовавшись соглашениями об именовании, принятыми в v Spring Data, и запросами Ignite. Эти примеры демонстрируют, что можно вернуть в результатах запроса либо полный объект, либо избранные поля из него — в зависимости от того, что нам требуется.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> { List<Person> findByFirstNameAndLastName(String firstName, String lastName); @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?") List<Contact> selectContacts(String firstName, String lastName); @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?") List<List<?>> selectContacts2(String firstName, String lastName);
}

6. API и тестирование

Вот теперь можно внедрить компоненты репозитория в классы контроллеров REST. API предоставит методы для добавления новых объектов в кэш, обновления или удаления имеющихся объектов, а также для поиска по первичному ключу или по другим, более сложным индексам.

@RestController
@RequestMapping("/person")
public class PersonController { private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class); @Autowired PersonRepository repository; @PostMapping public Person add(@RequestBody Person person) { person.init(); return repository.save(person.getId(), person); } @PutMapping public Person update(@RequestBody Person person) { return repository.save(person.getId(), person); } @DeleteMapping("/{id}") public void delete(Long id) { repository.delete(id); } @GetMapping("/{id}") public Person findById(@PathVariable("id") Long id) { return repository.findOne(id); } @GetMapping("/{firstName}/{lastName}") public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { return repository.findByFirstNameAndLastName(firstName, lastName); } @GetMapping("/contacts/{firstName}/{lastName}") public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName); List<Contact> contacts = repository.selectContacts(firstName, lastName); persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList()))); LOGGER.info("PersonController.findByIdWithContacts: {}", contacts); return persons; } @GetMapping("/contacts2/{firstName}/{lastName}") public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { List<List<?>> result = repository.selectContacts2(firstName, lastName); List<Person> persons = new ArrayList<>(); for (List<?> l : result) { persons.add(mapPerson(l)); } LOGGER.info("PersonController.findByIdWithContacts: {}", result); return persons; } private Person mapPerson(List<?> l) { Person p = new Person(); Contact c = new Contact(); p.setId((Long) l.get(0)); p.setFirstName((String) l.get(1)); p.setLastName((String) l.get(2)); c.setId((Long) l.get(3)); c.setType((ContactType) l.get(4)); c.setLocation((String) l.get(4)); p.addContact(c); return p; } }

Конечно же, важно проверить производительность созданного решения, особенно когда оно связано с хранением распределенных данных в оперативной памяти и с базами данных. Для этого я написал несколько junit-тестов, помещающих в кэш большое количество объектов, а затем вызывающих методы поиска (для ввода используются случайные данные) – так проверяется производительность запросов. Вот метод, генерирующий много объектов Person и Contact и помещающий их в кэш при помощи API конечных точек.

@Test
public void testAddPerson() throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); for (int j = 0; j < 10; j++) { es.execute(() -> { TestRestTemplate restTemplateLocal = new TestRestTemplate(); Random r = new Random(); for (int i = 0; i < 1000000; i++) { Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class); int x = r.nextInt(6); for (int k = 0; k < x; k++) { restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class); } } }); } es.shutdown(); es.awaitTermination(60, TimeUnit.MINUTES);
}

В Spring Boot предоставляются методы для взятия основных характеристик, позволяющих судить о скорости отклика API. Чтобы активировать такую возможность, нужно включить в зависимости Spring Actuator. Конечная точка Metrics доступна по адресу localhost:8090/metrics. Она не только показывает, сколько времени затрачивает на работу каждый метод API, но и выводит статистику по таким показателям, как количество действующих потоков или свободная память.

7. Запуск приложения

Теперь запустим получившееся у нас приложение, в которое встроен узел Apache Ignite. Я учел советы по повышению производительности, которые содержатся в документации Ignite и определил конфигурацию JVM, показанную ниже.

 java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Теперь можно запустить тестовый класс JUnit IgniteRestControllerTest. Он помещает в кэш некоторое количество данных, а затем вызывает методы поиска. Даны параметры для тестов, где в кэше использованы 1M объектов Person и 2.5M объектов Contact. Каждый из методов поиска выполняется в среднем за 1 мс.

 { "mem": 624886, "mem.free": 389701, "processors": 4, "instance.uptime": 2446038, "uptime": 2466661, "systemload.average": -1, "heap.committed": 524288, "heap.init": 524288, "heap.used": 133756, "heap": 1048576, "threads.peak": 107, "threads.daemon": 25, "threads.totalStarted": 565, "threads": 80,
... "gauge.response.person.contacts.firstName.lastName": 1, "gauge.response.contact": 1, "gauge.response.person.firstName.lastName": 1, "gauge.response.contact.location.location": 1, "gauge.response.person.id": 1, "gauge.response.person": 0, "counter.status.200.person.id": 1000, "counter.status.200.person.contacts.firstName.lastName": 1000, "counter.status.200.person.firstName.lastName": 1000, "counter.status.200.contact": 2500806, "counter.status.200.person": 1000000, "counter.status.200.contact.location.location": 1000
}
Теги
Показать больше

Похожие статьи

Кнопка «Наверх»
Закрыть