пятница, 22 июня 2012 г.

Как использовать hive в веб аналитике. Сокращение больших данных


При работе с логами высоконагруженного сервиса приходится решать следующие проблемы
  • новые логи появляются каждый день и нужно автоматизировать их добавление и обработку
  • многие статистики считаются за день, неделю, месяц. Хочется простым способом считать их, используя при этом только логи за выбранные даты, а не за все время
  • если логов очень много, то хочется уметь считать статистики приближенно, используя только часть данных
Первые две проблемы решаются созданием разделов в таблице, последняя — семплированием


Это 4я статья про использование hive в веб аналитике. В предыдущих статьях я рассказал:

 Использование разделов

Одна из фишек hiveQL, позволяющая использовать его "промышленно" - возможность создавать и работать с таблицами, разделенными по разделам.
В таблицах с таким разделением при создании указывается, что есть специальное поле и записи с разными значениями этого поля хранятся в разных разделах.
Физически это реализуется так: в папке, в которой хранятся данные из таблицы, создаются подпапки, каждая из которых соответствует своему разделу (партиции). Cами данные разложены по этим подпапкам.
Использование разделов позволяет:
  • считать статистики только по указанным разделам
  • считать статистики и по полным данным из таблицы
  • независимо друг от друга добавлять, удалять и перезаписывать разделы
При анализе апачовых логов разумно раскладывать логи по дням, чтобы можно было работать с разными днями независимо. Таблицу с разделами по дням можно создать командой
CREATE EXTERNAL TABLE partitioned_apache_logs ( 
    -- таблицу делаем внешней, чтобы ненароком не удалить ценные данные
    ip STRING,
    human_time STRING,
    url STRING,
    response STRING,
    referrer STRING,
    user_agent STRING
)
PARTITIONED BY (day STRING)
    -- вот! этой командой мы добавляем разделение по дням
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" =
"([\\d\\.]+) - - \\[(.*?)\\] \"GET (.+?) HTTP.*?\" (\\d+?) \\d+? (\\S+?) \"(.+?)\"")
LOCATION '/home/mezentsev/src/hive_web_analytics/logs/';
Если мы попробуем прочитать содержимое таблицы
SELECT * FROM partitioned_apache_logs LIMIT 10;
то на выходе мы не должны получить ровным счетом ничего. Пичалька :(. Ведь в таблицу не было добавлено ни одного раздела.
Добавить раздел в таблицу можно командой
ALTER TABLE partitioned_apache_logs 
ADD PARTITION (day='2012-05-01') 
LOCATION '/home/.../logs/2012-05-01';
После этого тот же самый запрос выдаст 10 строк лога за 1е мая.
Автоматизировать добавление партиций можно простым bash скриптом
hive -e "ALTER TABLE partitioned_apache_logs 
         ADD PARTITION (day='$DAY') 
         LOCATION '/home/.../$DAY';"
подставляя вместо $DAY новые даты.

Как считать статистику по разделам

Созданная таблица с разделами позволяет посчитать статистику за любой из добавленных в таблицу дней.
Вот, например, как можно посчитать число пользователей, посетивших наш сервис 1го мая:
SELECT COUNT (DISTINCT CONCAT (ip, user_agent)), 
COUNT (1) FROM partitioned_apache_logs 
WHERE day ='2012-05-01' 
  -- через такие вот ограничения и выбираются партиции
; 
При этом hadoop'ом обсчитываются только логи за первое мая!
Хотим посчитать число уников (уникальных пользователей) за неделю? Не вопрос. Вот такая команда выдаст число пользователей за первую неделю мая:
SELECT COUNT (DISTINCT CONCAT (ip, user_agent)) 
FROM partitioned_apache_logs 
WHERE day >= '2012-05-07' AND day <= '2012-05-13';
Вот так можно посчитать число уников за месяц:
SELECT COUNT (DISTINCT CONCAT (ip, user_agent)), 
COUNT (1) FROM partitioned_apache_logs 
WHERE day LIKE '2012-05-__';
А подобный запрос построит распределение числа посетителей по дням:
SELECT day, COUNT (DISTINCT CONCAT (ip, user_agent))
FROM partitioned_apache_logs GROUP BY day;
Подробнее по работу с партициями хорошо написано в 12 главе Hadoop the definitive giude

Семплирование по логам

Логи высоконагруженных сервисов порой бывают настолько большими, что расчет некоторых статистик на ним может занимать часы. Это могут быть, например, статистики за год или по куче сайтов одновременно.
При этом часто мы готовы пожертвовать точностью нашего расчета, чтобы сократить время ожидания результата. Расчет статистик по репрезентативным выборкам в веб аналитике называется "семплированием".
Hive поддерживает возможность производить считать статистику по семплированным данным. Для этого в hive используются бакеты (корзинки) - файлы, по которым раскладываются данные, хранящиеся в таблице.
При расчете статистик по пользователям и по сессиям разумно делать семплирование по пользователям отбирая случайным образом тех,по кому мы будем считать статистику.
Создадим такую таблицу:
CREATE TABLE sampled_apache_logs (
    ip STRING,
    human_time STRING,
    url STRING,
    response STRING,
    referrer STRING,
    user_agent STRING,
    user_id STRING
)
CLUSTERED BY(user_id) INTO 32 BUCKETS
LOCATION '/home/mezentsev/src/sampled_apache_logs';  
Чтобы разделить логи на семплы, мы просто должны включить автоматическое разделение на семплы, а затем скопировать логи из "обычной" таблицы в таблицу с семплированием:
ADD JAR /usr/local/hive-0.7.1/lib/hive-contrib-0.7.1.jar;
SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE sampled_apache_logs 
SELECT ip, human_time, url, response, referrer, user_agent, 
CONCAT (ip, user_agent) as user_id 
FROM partitioned_apache_logs; 
Внимание! Чтобы разложить на 32 корзинки, хадупу потребуется  запустить 32 редьюсера. В локальном режиме работы редьюсер всегда один и "фокус не удасться". Для правильного результата задачу надо запускать в распределенном режиме.
Первая команда добавляет необходимы JAR файл в распределенный кеш, чтобы разбор таблицы исходных логов выполнялся правильно на каждом из мапперов.Вторая заставляет раскладывать результаты инсерта по бакетам, а третий запрос, собственно говоря, выполняет преобразование.
Запросить результаты по самплу можно командой
SELECT count (DISTINCT user_id) 
FROM sampled_apache_logs 
TABLESAMPLE (BUCKET 1 OUT OF 32);
В выражении "BUCKET 1 OUT OF 32"  32 означает число корзин, из которых мы будем выбирать, а 1 - номер выбранной корзины из этого множества. "BUCKET 1 OUT OF 8" будет означать, что мы читаем данные из одной из 8 корзин. В реальности будут обработаны логи, хранящиеся в 4 корзинах из 32.
Для статистики я прогнал на кластере одну и ту же задачу: найти число пользователей с айпадами на сайте, используя разное семплирование.
Запрос выглядит так:
SELECT count (DISTINCT user_id) FROM 
sampled_apache_logs TABLESAMPLE (bucket 2 out of 32
WHERE user_agent LIKE "%iPad%";  
Вот что у меня получилось:
  • в реальности их 331 из 1000 (33.1%). Запрос посчитался за 28 секунд
  • на 1/4 данных их 91 из 272 (33.6%). Время расчета: 21 секунда
  • на 1/32 данных их осталось 11 из 33 (33.3%). Расчет занял: 19 секунд
В понастоящему больших задачах семплирование уменьшает время исполнения в разы.

Как это работает в Рамблере

В нашей компании при обработке внутренней статистики комбинируются оба эти подхода:
  • Логи разделяются на разделы по дням и по проектам
  • Внутри каждого из разделов делается разделение логов на 32 корзины по id пользователя

6 комментариев:

  1. Насчет добавления партиций: есть и другие способы:
    1) RECOVER PARTITIONS, если используется Amazon EMR
    2) Команда msck
    https://issues.apache.org/jira/browse/HIVE-874

    Сказать по правде, ни то, ни другое я пока не попробовал, хватало функции Dynamic Partitions при перебрасывании данных из промежуточной таблицы в конечную.

    ОтветитьУдалить
    Ответы
    1. насчет msck - полезное замечание. Можно будет его попробовать в работе. С EMR обращаться мне пока не довелось.

      Удалить
  2. А разве в Hive нет индексов, позволяющих эффективно выполнять range-запросы?

    ОтветитьУдалить
    Ответы
    1. В официальной документации и в самых свежих книжках утверждается, что hive не поддерживает индексы.

      Однако про использования индексов есть упоминания в Wiki разработчиков, уже определен их синтаксис. Так что в новой версии hive мы можем их увидеть.

      Вот что написано в wiki про range подобные запросы:
      https://cwiki.apache.org/confluence/display/Hive/FilterPushdownDev

      Удалить
  3. Одна из последних презентаций на тему индексов в Hadoop:
    http://www.slideshare.net/squarecog/flexible-insitu-indexing-for-hadoop-via-elephant-twin

    ОтветитьУдалить
    Ответы
    1. Здоровская вещь!
      Жаль что интегрирована пока только с Pig'ом - наши аналитики сейчас плотно уже подсели на hive

      Удалить