Хабрахабр

Применение R для утилитарных задач

library(readr)
library(tidyverse)
library(magrittr)
library(stringi)
library(fs)
library(glue)
library(RClickhouse)
library(DBI)
library(anytime)
library(tictoc)
library(iterators)
library(foreach)
library(doParallel)
library(futile.logger)
library(re2r)
library(data.table)
library(future)
library(doFuture) common_logname <- "DEV_log_parser.log"
table_name <- "DEV_LOGS" flog.appender(appender.file(common_logname))
flog.threshold(INFO)
flog.info("Start batch processing") oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) -\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", parallel = F)] %>% .[, log_line_number := cumsum(log_line_start)] %>% .[, body := stri_c(value, collapse = "\n"), by = log_line_number] %>% .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>% tibble::as_tibble() %>% # даже body = character(0) будет разложен на колонки с 0 строк # миллисекунды мы сразу засунем в POSIXct tidyr::extract(col = "body", into = c("timestamp", "tz", "level", "module", "class", "message"), # tz может быть (системные логи DEV), а может и не быть (приложения DEV) regex = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}:\\d+([+-]\\d+)?) (.*?) <(.*?)> \\[(.*?)\\] (?s:(.*))$", case_insensitive = TRUE, ignore.case = TRUE) %>% # дату надо подогнать в ISO стандарт и переведем временную зону сразу на Москву (ведут ведь по ней?) # для ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) mutate_at("timestamp", re2r::re2_replace, # tz может быть (системные логи DEV), а может и не быть (приложения DEV) pattern = "(.*) (\\d{2}:\\d{2}:\\d{2}):(\\d+([+-]\\d+)?)", replacement = "\\1T\\2.\\3") %>% mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>% # добавим предметное описание mutate(location = f_iter$location, wk = f_iter$wk) # TRUNCATE в CH организован не так давно, поэтому в общем случае приходится удалять и создавать таблицу вручную # осуществляем запись в CH, ms можно получить как (timestamp %% 1) conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS") # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}")) write_res <- log_df %>% mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>% select(location, wk, timestamp, ms, level, module, class, message) %>% # база к которой идет подключение должна быть определена в самом коннекте DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE) DBI::dbDisconnect(conn) # сформируем на возврат статистику по файлу res <- tibble::tibble(id = f_iter$id, lines = nrow(log_df), min_t = min(log_df$timestamp), max_t = max(log_df$timestamp), write_res) rm(data, log_df) return(res)
} # Сам цикл загрузки
tic("Batch processing") # инициализируем параллельную обработку
gc(full = TRUE)
nworkers <- parallel::detectCores() - 1 registerDoFuture()
# future::plan(multiprocess)
# future::plan(multisession)
future::plan(multisession, workers = nworkers)
# future::plan(sequential) # так ~ секунд # осуществляем парсинг и загрузку в CH
# логи приложений ------------------
fnames_tbl <- here::here("raw_data") %>% fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% enframe(name = "fname") %>% # сразу выдернем короткое имя для представления в логах mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% select(-value) %>% mutate(size = fs::file_size(fname)) %>% tidyr::extract(col = "short_fname", into = c("location", "wk"), regex = "^([^/]+)/wk(\\d+)", remove = FALSE) %>% arrange(size) %>% mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>% # поделим на ~ N обработчиков mutate(chunk = (row_number() %% nworkers + 1)) %>% # теперь сортируем по чанкам, чтобы dopar правильно все разделил arrange(chunk) start_time <- Sys.time()
stat_list <- foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% { # инициализируем логгер flog.appender(appender.file(common_logname)) # flog.info(capture.output(gc(verbose = TRUE))) res <- oneTimeProcessing(it, log_type = "app") flog.info(glue("Step {it$id} finished.", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ")) return(res) }
flog.info("Load finished") # терминируем параллельную обработку --------------
# закрываем все дочерние сессии, они едят память
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc())) # смотрим статистику по файлам -------------
logstat_tbl <- stat_list %>% dplyr::bind_rows() %>% # подклеиваем исходные атрибуты left_join(fnames_tbl, by = "id") %>% # дельта по времени между записями в логе в минутах mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>% arrange(min_t) write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";") # проверим, а все ли результаты успешны?
if(nrow(logstat_tbl) < nrow(fnames_tbl)){ flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!")
}

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть