Home » Локальный анализ многогигабайтных файлов JSON

Локальный анализ многогигабайтных файлов JSON

Недавно мне пришлось анализировать многогигабайтные дампы JSON в контексте проекта. JSON на самом деле это довольно приятный формат для использования, поскольку он удобен для чтения человеком и имеет много имеющихся для этого инструментов. JQ позволяет выражать сложные этапы обработки в одной командной строке и Юпитер с Python и Панды позволяют простой интерактивный анализ быстро найти то, что вы ищете.

Однако с файлами размером в несколько гигабайт анализ становится намного сложнее. Запуск сингла
jq команда займет много времени. Когда вы ~методом проб и ошибок~итеративно строите jq команды, как я, вы быстро устанете ждать около минуты, пока ваша команда увенчается успехом, только для того, чтобы обнаружить, что она на самом деле не вернула то, что вы искали. Интерактивный анализ аналогичен. Чтение всех 20 гигабайт JSON займет изрядное количество времени. Вы можете обнаружить, что данные не помещаются в оперативную память (что вполне возможно, ведь JSON в конце концов является удобочитаемым форматом), или в конечном итоге вам придется перезапустить ядро ​​Python, а это значит, что вам придется терпеть загрузку. время снова.

Конечно, существуют облачные предложения, основанные на Apache. Луч,
Значительный и многие другие. Однако данные клиентов не передаются в облачные сервисы с моего разрешения, так что это исключено. Настроить такую ​​среду, как Flink, локально вполне возможно, но для одноразового анализа придется приложить немало усилий.

Пытаясь проанализировать файлы такого размера, я нашел два способа эффективной локальной обработки очень больших файлов JSON, которыми хочу поделиться. Один из них основан на распараллеливании jq командная строка с GNU-параллельдругой основан на Jupyter с сумерки библиотека.

В начале была командная строка: JQ и Parallel

Я стараюсь сначала найти простые решения проблем, и большинство задач, которые мне приходилось решать с файлами JSON, представляли собой простые преобразования, которые легко выразить в jqязык. Извлечение вложенных значений или поиск конкретных объектов JSON выполняется очень легко. В качестве примера представьте, что у вас есть 20 гигабайт таких структур (я вставил символы новой строки для удобства чтения, ввод, который мы на самом деле читаем, находится в одной строке):

{
  "created_at": 1678184483,
  "modified_at": 1678184483,
  "artCode": "124546",
  "status": "AVAILABLE",
  "description": "A Windows XP sweater",
  "brandName": "Microsoft",
  "subArts": [
    {
      "created_at": 1678184483,
      "modified_at": 1678184483,
      "subCode": "123748",
      "color": "green",
      "subSubArts": [
        {
          "created_at": 1678184483,
          "modified_at": 1678184483,
          "code": "12876",
          "size": "droopy",
          "currency": "EUR",
          "currentPrice": 35
        },
        {
          "created_at": 1678184483,
          "modified_at": 1678184483,
          "code": "12876",
          "size": "snug",
          "currency": "EUR",
          "currentPrice": 30
        }
      ]
    },
    {
      "created_at": 1678184483,
      "modified_at": 1678184483,
      "subCode": "123749",
      "color": "grey",
      "subSubArts": [
        {
          "created_at": 1678184483,
          "modified_at": 1678184483,
          "code": "12879",
          "size": "droopy",
          "currency": "EUR",
          "currentPrice": 40
        },
        {
          "created_at": 1678184483,
          "modified_at": 1678184483,
          "code": "12876",
          "size": "snug",
          "currency": "EUR",
          "currentPrice": 35
        }
      ]
    }
  ]
}

А jq запрос типа .subArts[]|select(.subSubArts[].size|contains("snug")) выдаст вам все подстатьи, имеющие «удобные» подстатьи. Выполнение аналогичной команды для 10-гигабайтного файла JSON заняло около трех минут, что не очень хорошо, особенно если вы нетерпеливы (как я).

К счастью, мы можем ускорить этот процесс, если у нас есть некоторая информация о структуре входного файла (очевидно, мы знаем, что формат — JSON). Мы используем jq в качестве фильтра для отдельных объектов JSON, а это означает, что мы сможем эффективно распараллеливать выражение поиска. Всякий раз, когда мне приходится запускать команды оболочки параллельно, я использую GNU-параллелькоторый может обрабатывать команды оболочки, доступ по SSH к удаленным серверам для DIY-кластера, вставку SQL и многое другое.

Read more:  Новый анализ минералов зубов подтверждает, что мегалодон был теплокровным

В этом случае мы знаем, что наши объекты JSON в файле разделены закрывающей фигурной скобкой, за которой следует символ новой строки, по одному объекту JSON в каждой строке. Это означает, что мы можем сказать parallel бежать jq
параллельно с этими объектами JSON с помощью --recend выключатель. Обратите внимание, что вы также можете указать параллель для интерпретации --recend как регулярное выражение, которое позволит вам правильно разделить красиво напечатанный пример выше с помощью --recend из ^}n. Вероятно, это существенно медленнее, я бы не стал использовать инструмент, который выдает 10 гигабайт красиво напечатанный JSON, и при необходимости я бы просто использовал jq -c чтобы свернуть его снова.

Создание одиночного jq процесс для каждого объекта JSON будет нет приводят к ускорению (поскольку выполнение новых процессов обходится дорого), поэтому мы говорим parallel собирать законченные объекты в блоки и передавать их в jq процесс. Оптимальный размер блока будет зависеть от размера входного файла, пропускной способности вашего диска, количества процессоров и других факторов. У меня было достаточное ускорение с размером блока в 100 мегабайт, но выбор большего размера блока, вероятно, не помешает.
Parallel может эффективно разделить файлы, используя --pipe-part вариант (по причинам, почему это более эффективно, см.
здесь), поэтому мы можем использовать это для предоставления входных данных для нашей параллельной jq процессы.

Наконец, худшая часть любой параллельной работы: упорядочивание результатов. Parallel для этого есть много возможностей. Мы хотим сохранить наш вывод в исходном порядке, поэтому добавляем --keep-order аргумент. Конфигурация по умолчанию, --group, будет буферизовать входные данные для каждого задания до его завершения. В зависимости от вашего конкретного запроса потребуется буферизация на диск, если выходные данные запроса не помещаются в основную память. Вероятно, это не так, поэтому использование --group было бы хорошо. Однако мы можем добиться немного большего с --line-bufferчто в сочетании с --keep-order, немедленно запускает вывод для первого задания и буферизует вывод для других заданий. Для этого потребуется немного меньше места на диске или памяти за счет некоторого времени процессора. Оба варианта подойдут для «обычных» запросов, но проведите некоторое сравнительное тестирование, если ваш запрос генерирует большие объемы выходных данных.

Наконец, предоставьте входной файл с --arg-file. Собрав все это вместе, мы получаем готовую командную строку:

parallel -a '' --pipepart --keep-order --line-buffer --block 100M --recend '}n' "jq ''"

Это будет работать jq параллельно в вашем файле на блоках по 100 мегабайт, всегда содержащих полные объекты JSON. Вы получите результаты запроса в исходном порядке, но гораздо быстрее, чем в непараллельном случае. При выполнении на 8-ядерном/16-поточном процессоре Ryzen распараллеливание приведенного выше запроса приводит к времени выполнения 30 секунд, что означает ускорение примерно в 6 раз.. Неплохо для магии ракушек, а? И вот htop скриншот, показывающий великолепное распараллеливание.

Также обратите внимание, что этот подход распространяется на другие текстовые форматы. Если у вас есть 10 ГБ CSV, вы можете использовать Миллер для обработки. Для двоичных форматов вы можете использовать fq если вы можете найти работоспособный разделитель записей.

Ноутбук: Jupyter и Dask

Использовать GNU Parallel — это здорово, но для интерактивного анализа я предпочитаю блокноты Python и Jupyter. Одним из способов использования блокнота с таким большим файлом является его предварительная обработка с помощью parallel магия из предыдущего раздела. Однако я предпочитаю не переключать среду во время анализа данных, а использование истории оболочки в качестве документации не является устойчивой практикой (спросите меня, откуда я знаю).

Наивное чтение 9 гигабайт данных JSON с помощью Pandas read_json быстро исчерпывает мои 30 гигабайт оперативной памяти, поэтому явно требуется некоторая предварительная обработка. Опять же, итеративное выполнение этой предварительной обработки было бы болезненным, если бы нам пришлось снова обрабатывать весь файл JSON, чтобы увидеть результаты. Мы могли бы написать код, который будет обрабатывать только первые n строки файла JSON, но я искал более общее решение. Выше я упоминал Beam и Flink, но мне не удалось заставить работать локальную установку.

сумерки делает то, что мы хотим: он может разбивать большие наборы данных, обрабатывать их параллельно и объединять их вместе, чтобы получить окончательный результат. Давайте создадим новую среду Python с pipenvустановите необходимые зависимости и запустите блокнот Jupyter:

pipenv lock
pipenv install jupyterlab dask[distributed] bokeh pandas numpy seaborn
pipenv run jupyter lab

Если pipenv недоступен, следуйте инструкциям Инструкция по установке чтобы настроить его на вашем компьютере. Теперь мы можем начать. Импортируем необходимые пакеты и запускаем локальный кластер.

import dask.bag as db
import json
from dask.distributed import Client
client = Client()
client.dashboard_link

Ссылка на панель мониторинга предоставляет панель мониторинга, на которой подробно отображается активность, происходящая в вашем локальном кластере. Каждая распределенная операция, которую мы будем запускать, будет использовать этот клиент. Это почти как волшебство!

Скриншот

Теперь мы можем использовать этот локальный кластер для чтения нашего большого файла JSON в сумку. Пакет представляет собой неупорядоченную структуру, в отличие от фрейма данных, который упорядочен и разделен по индексу. Он хорошо работает с неструктурированными и вложенными данными, поэтому мы используем его здесь для предварительной обработки JSON. Мы можем прочитать текстовый файл в разделенную сумку с помощью dask.bag.read_text и blocksize аргумент. Обратите внимание, что мы сразу загружаем JSON, поскольку знаем, что полезная нагрузка действительна в формате JSON.

bag = db.read_text("", blocksize=100 * 1000 * 1000).map(json.loads)
bag

Вы можете получить первые несколько предметов в сумке с помощью bag.take(5). Это позволит вам просмотреть данные и выполнить предварительную обработку. Вы можете интерактивно протестировать предварительную обработку, добавив дополнительные шаги карты:

bag.map(lambda x: x["artCode"]).take(5)

Это даст вам первые пять кодов товаров в сумке. Обратите внимание, что функция вызывалась не для каждого элемента в сумке, а только для первых пяти элементов, которых достаточно, чтобы дать нам ответ. В этом преимущество использования Dask: вы запускаете код только по мере необходимости, что очень полезно для поиска подходящих шагов предварительной обработки.

Если у вас есть подходящий конвейер, вы можете вычислить полные данные с помощью bag.compute() или превратите его в фрейм данных Dask с помощью bag.to_dataframe(). Допустим, мы хотели извлечь размеры и коды наших подзаголовков из приведенного выше примера (это очень маленький файл, но это только иллюстративный пример). Затем мы сделали бы что-то вроде следующего:

result = db.read_text("").map(json.loads).map(lambda x: [{"code": z["code"], "size": z["size"]} for y in x["subArts"] for z in y["subSubArts"]])
result.flatten().to_dataframe().compute()

Это запустит предоставленную лямбда-функцию для каждого элемента сумки параллельно для каждого раздела.
flatten разделит список на отдельные элементы сумки, чтобы мы могли создать невложенный фрейм данных. Окончательно, to_dataframe() преобразует наши данные в фрейм данных Dask. Вызов compute() выполнит наш конвейер для всего набора данных, что может занять некоторое время. Однако из-за «ленивости» Dask вы можете проверять промежуточные этапы конвейера в интерактивном режиме (с помощью take()
и head()). Кроме того, Dask позаботится о перезапуске рабочих процессов и сбросе данных на диск, если памяти недостаточно. Получив фрейм данных Dask, мы можем сохранить его в более эффективный формат файла, например Паркеткоторый мы затем можем использовать в остальной части нашего кода Python либо параллельно, либо в «обычных» Pandas.

Для 9 гигабайт JSON мой ноутбук смог выполнить конвейер обработки данных, аналогичный приведенному выше, за 50 секунд.. Кроме того, я смог построить конвейер на «стандартном» Python в интерактивном режиме, аналогично тому, как я строю свой jq запросы.

У Dask есть целый ряд дополнительных функций для параллельной обработки данных, но я надеюсь, что вы получили базовое представление о том, как это работает. По сравнению с jqу вас в руках вся мощь Python, что упрощает объединение данных из разных источников (например, файлов и базы данных), и именно здесь решение на основе оболочки начинает испытывать трудности.

Плавник

Надеюсь, вы заметили, что обработка больших файлов не обязательно должна осуществляться в облаке. Новейший ноутбук или настольный компьютер часто достаточно хорош для выполнения задач предварительной обработки и статистики с помощью небольшого количества инструментов.. Для меня этот инструмент состоит из jq чтобы отвечать на быстрые вопросы во время отладки и принимать решения о способах реализации функций, а Dask — для более сложного исследовательского анализа данных.

2023-12-31 11:18:37


1704022736
#Локальный #анализ #многогигабайтных #файлов #JSON

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.