<-- mermaid -->

Clojureで巨大なZIPファイル/CSVファイルを処理した話

SaaS Product Team(以下Product Team)のあやぴーです。

Product Teamの開発しているプロダクトでは「企業に関する大量データ」というものを扱う機会があります。特に様々な形式でデータパートナーから受領するため、一筋縄でいかないことが多々あります。今回はその中でも巨大なZIPファイルの中に大量のCSV(ライクな)ファイルをClojureでいい感じに処理するために苦戦した話を書いていこうと思います。

前提

まずはZIPファイルについて説明します。

  • 毎月新しいファイルが100程度配信されている
  • ZIPファイルは大きいもので2GB、小さいもので1MB程度
  • ZIPファイルの中には大量のCSV形式(区切り文字は|)のテキストファイルが含まれている
    • 数で言えば多いものでは5万、少ないものだと数千程度
  • 各CSVファイルは1000~15000行程度のデータを含んでいる
    • FOOBARBAZ|20000|2024-02-03のように日付や任意の値などが含まれている

このような巨大なZIPファイルを、CSVの各行の日付がある範囲内(例えば2023-03~2024-02)にあるときのみ抽出して、データベースにinsertしたかったのです。そして、可能な限り速く処理してしまいたいと考えていました。

こういう問題はClojureが得意とする領域なので、今回はClojureでやっつけてしまうことに決めました。全体像としては以下のようなコードで表現できます。

;; java.io.Fileのシーケンスを返す
(defn zip-files [])

;; java.io.File(ZIPファイル)を受け取り、
;; 任意の日付の範囲内に収まる行データを各CSVファイルから抽出する
;; それらをまとめて単一のシーケンスとして返す
(defn process-zip-file [zip])
  
;; 受け取った行データをデータベースに挿入する 
(defn insert [rows])
  
(doseq [zip-file (zip-files)]
  (-> (process-zip-file zip-file)
      insert))

今回の注目したい点はこの中でもprocess-zip-fileという関数として実装した部分になります。

最初のアプローチ

最初の実装は以下のようになりました。Javaのjava.util.zip.ZipInputStreamclojure.java.io/copyなどを活用し、あまり苦労せず書き下すことができました。

(ns unzip.example
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]
            [me.raynes.fs :as fs]
            [tick.core :as t])
  (:import [java.io ByteArrayOutputStream]
           [java.util.zip ZipInputStream]))

(let [months (->> (iterate #(t/>> % (t/of-months 1)) (t/year-month "2023-03"))
                  (take-while #(t/<= % (t/year-month "2024-02")))
                  (map (partial t/format (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM")))
                  set)
      pred (fn pred [l]
             (contains? months (.substring (get l 2) 0 7)))]
  (defn process-zip-file [zip] 
    (with-open [zis (ZipInputStream. (io/input-stream zip))]
      (loop [entry (.getNextEntry zis)
             rows []]
        (if (nil? entry)
          rows
          (let [rows (let [os (ByteArrayOutputStream.)
                           _ (io/copy zis os)]
                       (with-open [rdr (io/reader (.toByteArray os))]
                         (->> (csv/read-csv rdr :separator \|)
                              (filter pred)
                              (into rows))))]
            (recur (.getNextEntry zis) rows)))))))

monthspredが少し不思議な実装に見えるかもしれませんが、前提のところで説明したようにある期間に含まれる行データを抽出したいため、年月(2024-02など)の集合をつくっているのと、行(FOOBARBAZ|20000|2024-02-03)から3つ目の項目である日付(2024-02-03)を取り出して文字列操作で年月の部分だけを切り出して抽出対象かを判定する述語関数を作っています(丁寧にやるのであれば日付オブジェクトに変換するべきだとは考えましたが、何百,何千万回と比較で使われるため速さを重視しました/不正なデータがここにないことは約束されています)。

このように実装するとloopを使っている関係から、遅延シーケンスではなく大きなシーケンスを作って返すことになります。あまりClojureであることを活かせていませんが、これでうまく動作するのであればこれで終わらせるつもりでした。

OutOfMemoryErrorとの闘い

実データを使い確認をしていると幾つか問題が出てきました。

  1. 明確に時間がかかる
  2. OutOfMemoryErrorの発生

時間がかかるのは一旦置いておいても、OutOfMemoryErrorは見過ごせません。どうやらデータベースにinsertする際に、行数が50万を超えてしまうこともあり処理しきれないZIPファイルが存在することが分かりました。

そうなるとinsertを10万行程度のチャンクにわけて処理したほうが良さそうです。つまり、全体像のイメージを以下のようにすることを検討しました。

(doseq [zip-file (zip-files)
        rows (partition-all 100000 (process-zip-file zip-file))]
  (insert rows))

ただし、先程の実装ではprocess-zip-fileが完了しないとpartition-allをはじめることができないため、Clojureらしさを活かすためにもprocess-zip-fileで遅延シーケンスを返すように修正することにしました。遅延シーケンスにすることによりオーバーヘッドが余計に増えることになりますが、今回のように巨大なデータを扱う場合はメモリを逼迫することがなくなるため有利になります。

Clojureで遅延シーケンスを返すように変更するのは難しくなく、以下のようになります。

(ns unzip.example
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]
            [me.raynes.fs :as fs]
            [tick.core :as t])
  (:import [java.util.zip ZipFile]))

(let [months (->> (iterate #(t/>> % (t/of-months 1)) (t/year-month "2023-03"))
                  (take-while #(t/<= % (t/year-month "2024-02")))
                  (map (partial t/format (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM")))
                  set)
      pred (fn pred [l]
             (contains? months (.substring (get l 2) 0 7)))]
  (defn process-zip-file 
    ([zip]
     (let [zip (cond-> zip (instance? java.io.File zip) (ZipFile.))]
       (process-zip-file zip (enumeration-seq (.entries zip)))))
    ([zip [entry :as entries]]
     (when entry
       (lazy-cat (with-open [rdr (io/reader (.getInputStream zip entry))]
                   (->> (csv/read-csv rdr :separator \|)
                        (filter pred)
                        doall))
                 (process-zip-file zip (rest entries)))))))

この実装は最初に実装に比べるとスマートではないでしょうか。java.io.zip.ZipFileを使うように変更しています。ZipFile#entriesEnumerationを返すため、Clojureでは直接扱えませんがenumeration-seq関数を使うとすんなりシーケンスへと変換することができます。

ZIPファイル内のファイル毎の処理もZipFile#getInputStreamを始点に書くことができるため、煩雑さがなくなっています。ZipEntry毎に処理しているというのも直截的表現されているため分かりやすさも手に入れました。

このように遅延シーケンスを返すように変更した成果もあり、OutOfMemoryErrorは発生しなくなりました。

実行時間との闘い

これで一通りちゃんと動くようにはなったんですが、既に分かっていたように非常に遅いのです。実際4,5時間かけないと処理が完了しないという状況でした。このままでは運用する際に困ることが目に見えていたため、少し改善することにしました。

CSVの読み込み

まずREPLを使って開発しているときに明確に遅いと感じていたのは、CSVファイルの読み込みでした。clojure.data.csvを使っていたのですが、もう少し速い方法はないのかと調べているとtech.ml.datasetというものがありました。

このライブラリを使うとCSVファイルを高速に読み込めそうでしたが、よくよく依存しているライブラリを見ているとunivocity-parsersというライブラリを使ってCSVファイルのパースを行っているようだったので、univocity-parsersを使って自前で以下のような関数を書いてみました。

(defn read-csv [^java.io.Reader rdr]
  (let [parser (let [settings (CsvParserSettings.)
                     _ (.setDelimiter ^CsvFormat (.getFormat settings) \|)]
                 (CsvParser. settings))
        read-line (fn read-line []
                    (if-let [line (.parseNext parser)]
                      (cons line (lazy-seq (read-line)))
                      (.stopParsing parser)))]
    (.beginParsing parser rdr)
    (read-line)))

この関数を使って1万5千行程度のCSVファイルを読み込むと、clojure.data.csv/read-csvに比べて10倍以上の速さでCSVファイルを読み込めるようになりました1。この関数を採用するとZIPファイル全体では約2倍の速度で動作するようになりました。

少しだけ注意しなければならないのは(.parseNext parser)が返す値はStringの配列(Java由来)になるため、実際はString配列の遅延シーケンスが作られるという点です。うっかりconjすると例外が投げられるのに注意です。

リフレクションの抑制

もう1つ分かりやすいところで、型ヒントをつけることでパフォーマンスを向上させられそうだと考えました。(set! *warn-on-reflection* true)を評価して、process-zip-fileを評価してみると4箇所ほどリフレクション警告が発生することが確認できました。

この中で1番呼び出し回数が多いのはpred関数の中で使われているString#substringです。ここでリフレクションが発生していると、CSVファイルのすべての行をテストするときに毎回リフレクションが発生していることになります。つまり、1万5千行あれば1万5千回なので、このリフレクションを止めるのが効果が高そうでした。

そして、これは完全にうっかりだったんですが、ClojureにはString#substringをラップしたsubsという関数があります。これを利用するようにするだけでリフレクションは抑制されて、5倍程度速くなってしまいました。

まとめ

Clojureを使うことで巨大なZIPファイル/CSVファイルの問題を見事に解決することができました。最終的にすべてのZIPファイルを1時間程度で処理しきることができたので、私としては満足する結果を得ることができました。

Clojureは普通に書いてしまうと遅くなってしまうときがありますが、最初は書きやすい/読みやすい書き方で書いてしまって、今回のようにパフォーマンス上の懸念が出てきた場合には少しずつ適切に対応していくことでパワフルさをそのままに性能を手に入れることができる良い言語です。しかも、Javaさえ動くならどこでも動くというのもポイントが高いです。

最終的な動くコードは以下に貼っておきます。

(ns unzip.example
  (:require [clojure.java.io :as io]
            [me.raynes.fs :as fs]
            [tick.core :as t])
  (:import [com.univocity.parsers.csv CsvFormat CsvParser CsvParserSettings]
           [java.util.zip ZipFile]))

(defn read-csv [^java.io.Reader rdr]
  (let [parser (let [settings (CsvParserSettings.)
                     _ (.setDelimiter ^CsvFormat (.getFormat settings) \|)]
                 (CsvParser. settings))
        read-line (fn read-line []
                    (if-let [line (.parseNext parser)]
                      (cons line (lazy-seq (read-line)))
                      (.stopParsing parser)))]
    (.beginParsing parser rdr)
    (read-line)))

(let [months (->> (iterate #(t/>> % (t/of-months 1)) (t/year-month "2023-03"))
                  (take-while #(t/<= % (t/year-month "2024-02")))
                  (map (partial t/format (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM")))
                  set)
      pred (fn pred [l]
             (contains? months (subs (get l 2) 0 7)))]
  (defn process-zip-file
    ([zip]
     (let [zip (cond-> zip (instance? java.io.File zip) (ZipFile.))]
       (process-zip-file zip (enumeration-seq (.entries zip)))))
    ([zip [entry :as entries]]
     (when entry
       (lazy-cat (with-open [rdr (io/reader (.getInputStream zip entry))]
                   (->> (read-csv rdr)
                        (filter pred)
                        doall))
                 (process-zip-file zip (rest entries)))))))

  1. 手元で簡単に測ってclojure.data.csv/read-csvが約150ms、univocity-parsersを使った関数が約10msという結果が得られました
Page top