Хабрахабр

[Перевод] Картографирование шума с помощью KSQL, Raspberry Pi и радиоприемника

Трудно представить, что всё это имеет самое непосредственное отношение к Kafka, KSQL и эксперименту «как в домашних условиях с помощью информационных технологий найти самый шумный самолёт». На первый взгляд, в этой истории есть всё, чтобы заслужить статус романтичного поста накануне 8 марта: самолёты, любовь, чуточку шпионажа и, наконец, котик (точнее, кошка). Ее будят звуки самолетов, пролетающих над нашим домом. Трудно, но придётся: именно такой эксперимент провёл Саймон Обьюри, а мы перевели статью его авторства с описанием всех подробностей процесса.
Наша новая кошка по имени Снежок просыпается рано. Хорошо бы еще создать занятную панель слежения, на которую кошка могла бы переключить свое внимание — и дать мне ещё немножко поспать. А что если бы я, используя Apache Kafka, KSQL и Raspberry Pi, смог определить, какой именно самолет не дает моей кошке спать?

В общих чертах


Переносим самолеты с неба в графики с помощью Kafka и KSQL

Бортовой передатчик периодически сообщает локацию, идентификационный номер, высоту и скорость корабля, используя короткие радиопередачи. Самолеты определяют свое местоположение с помощью GPS приемников. Распознать эти хаотичные потоки данных — это все равно что подслушать беседу на шумной вечеринке. Эти передачи вещательного автоматического зависимого наблюдения (АЗН-В) являются по сути пакетами данных, открытыми для доступа с наземных станций.
Один микрокомпьютер, такой как Raspberry Pi, и несколько вспомогательных компонентов — это все, что требуется для получения сообщений бортовых передатчиков самолетов, снующих над моим домом.
Бортовые передачи самолетов выглядят как запутанный клубок сообщений и требуют систематизации. Поэтому, чтобы найти самолет, который тревожит мою кошку, я решил использовать сочетание Kafka и KSQL.


Разбуженная кошка и Raspberry Pi

Сбор показаний АЗН-В с помощью Raspberry Pi

Для сбора бортовых передач я использовал Raspberry Pi и RTL2832U — USB-модем, изначально продававшийся для просмотра цифрового ТВ на компьютере. На Raspberry Pi я установил dump1090 — программу, которая получает данные с АЗН-В через RTL2832U с помощью небольшой антенны.


Мой программный радиоприемник из Raspberry Pi и RTL2832U

Преобразуем сигналы АЗН-В в темы Kafka

Raspberry Pi не имеет достаточной мощности для серьезных вычислений, поэтому мне придется передать обработку данных моему локальному кластеру на Kafka. Теперь, когда я получил поток необработанных сигналов АЗН-В, нам следует обратить внимание на трафик.

Локация будет иметь выглядеть как борт 7c6db8 летит на высоте 6,250 футов в координате -33. Получаемые сообщения делятся либо на сообщения о локации, либо на сообщения об идентификации борта. 0. 8,151. Сообщение об идентификации борта будет выглядеть как борт 7c451c совершает полет по маршруту QJE1726.

Я использовал прокси-сервер Confluent Rest Proxy для распределения данных с Raspberry Pi в темы location-topic и ident-topic на Kafka. Небольшой скрипт на Python, работающий на Raspberry Pi, разделяет входящие сообщения АЗН-В. Прокси-сервер предоставляет RESTful интерфейс для кластера Kafka, позволяя легко создавать сообщения путем простого REST-вызова на Pi.

База данных OpenFlights позволяет сопоставить код авиаборта, например 7C6DB8, присвоенный Международной организацией гражданской авиации (ИКАО), с типом самолета — в нашем случае «Боинг-737». Я хотел понять, какие самолеты летали над моей крышей и по каким маршрутам. Я загрузил данные моего картографирования в тему icao-to-aircraft.

Например, чтобы найти бортовой код 7C6DB8, мы можем написать запрос следующим образом:
KSQL предоставляет “SQL-движок”, который дает возможность обработки данных в реальном времени по темам Apache Kafka.

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI

е. Аналогично, в тему callsign-details я загрузил позывные (т. QFA563, это рейс авиакомпании Qantas из Брисбена в Сидней).

CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney

Мы можем наблюдать постоянный поток входящих сообщений о местоположении пролетающего самолета.
Теперь давайте взглянем поток данных location-topic.

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic

Запрос на KSQL будет выглядеть так:

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495

KSQL: гармонизация потоков...

Настоящая ценность KSQL лежит в возможности объединения входящих потоков данных о местоположении с исходными данными тем (см. 03_ksql.sql) — то есть, добавлении полезных сведений к необработанному потоку данных. Это очень похоже на left join в традиционной базе данных. Результатом является еще одна тема Kafka, произведенная без единой строчки кода Java!

CREATE STREAM location_and_details_stream AS \ SELECT l.ico, l.height, l.location, t.aircraft \ FROM location_stream l \ LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;

Поток данных будет выглядеть так:
К тому же вы получаете запрос KSQL.

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048

Помимо этого, мы можем объединить входящий поток callsign с фиксированной темой callsign_details:

CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland

location_and_details_stream, которая обеспечивает поток обновленной информации о местоположении и скорости самолета;
2. Теперь у нас есть две информативные темы:
1. ident_callsign_stream, которая описывает подробности рейса, в том числе авиакомпанию и пункт назначения.

Я использовал Kafka Connect, чтобы выгрузить темы Kafka, заполняемые KSQL, в Elasticsearch (полные скрипты здесь). С этими постоянно обновляемыми темами мы можем создать несколько отличных обзорных панелей.

Обзорная панель Kibana

Вот пример обзорной панели, демонстрирующей местоположение самолета на карте. Кроме того, вы можете увидеть диаграмму по авиакомпаниям, график высоты полета и облака слов по основным пунктам назначения. Тепловая карта показывает районы сосредоточения самолетов, т.е. с наивысшим уровнем шума.

Назад, к кошке

Сегодня моя кошка разбудила меня в районе 6 часов утра. Может ли KSQL помочь мне найти тот самолет, который пролетал в это время над моим домом на высоте меньше 3,500 футов?

select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0

Я могу определить самолет, оказавшийся над моей крышей в 6:15 утра. Потрясающе! Которая, к тому же, позволяет быстро найти интересные события данных. Оказывается, Снежка разбудил Airbus А380 (огромный самолет, кстати), летевший в Дубай.
Всего пара выходных дней, и у вас есть система потоковой обработки с KSQL. Хотя Снежок может отнестись к ним скептически.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть