Parsing Compressed JSON Lines Across Multiple Files with Julia

I'm new to big data.

It turns out, when they say big, they're not kidding. (What follows might be completely obvious and trivial for 103% of you.)

Saving the Data

The naïve way

I started out writing arrays of (basically) homogeneous data by just JSON-serializing the whole thing and writing it out. The files quickly got out of hand, so I began compressing them as well:

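import json, subprocess

def jsonbz2(fn, data):
    # Write the whole array as a single JSON document.
    with open(fn, 'w') as f:
        json.dump(data, f)
    # Popen returns immediately; bzip2 replaces fn with fn.bz2 in the background.
    subprocess.Popen(['bzip2', '--best', fn])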

I used subprocess to spawn many bzip2 processes. You should use bz2 and multiprocessing to keep it more Pythonic and less shell-scripty, and also to have finer control over how many compression jobs run at once. (It's the difference between using & in your shell loop and using parallel --semaphore.) You can do it in Julia like this:

using JSON

function jsonbz2(fn, data)
    open(fn, "w") do f
        JSON.print(f, data)
    end
    @async run(`bzip2 --best $fn`)
end

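Here we used the @async macro (Julia has real Lisp-style macros), which wraps the expression you give it in a Task object and puts it on Julia's own scheduler queue (not the operating system's). bzip2 keeps running after jsonbz2 returns. Since the function returns that Task, you can call wait on it when you need the compression to be finished. @sync only waits for @async calls that appear lexically inside its block, so wrapping a call to jsonbz2 in @sync is not enough.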
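
Going back to the Python side, here is a minimal sketch of the bz2-plus-multiprocessing suggestion. The function name jsonbz2_many, the (filename, data) job format, and the default of 4 worker processes are my own assumptions for illustration. Unlike the subprocess version, it writes fn + '.bz2' directly rather than compressing an existing file in place.

```python
import bz2
import json
from multiprocessing import Pool

def jsonbz2(fn, data):
    """Serialize data as JSON straight into a bzip2-compressed file."""
    out = fn + '.bz2'
    # compresslevel=9 is the library equivalent of `bzip2 --best`.
    with bz2.open(out, 'wt', compresslevel=9, encoding='utf-8') as f:
        json.dump(data, f)
    return out

def jsonbz2_many(jobs, processes=4):
    """Compress many (filename, data) pairs, at most `processes` at a time.

    Hypothetical helper: the pool size caps how many compression jobs
    run concurrently, which the Popen approach cannot do.
    """
    with Pool(processes) as pool:
        return pool.starmap(jsonbz2, jobs)
```

Calling jsonbz2_many([('a.json', data_a), ('b.json', data_b)]) blocks until every file is written, so there is nothing left running in the background afterwards. On platforms that start workers by spawning a fresh interpreter, the call needs to sit under an if __name__ == '__main__': guard.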

Write out lines of JSON

OK, well the problem with having a bunch of *.json.bz2 files with JSON-serialized arrays is that you have to decompress the whole thing and parse it, even if you only want one element. Now, because you can serialize something with JSON without using newlines, it makes sense to use a newline to delimit the chunks of text you feed to a JSON parser. The bonus is that most decompression libraries support line-by-line decompression. So, let's JSON-serialize the elements of an array and write them out line by line.

import json, subprocess

def jsonbz2(fn, data):
    with open(fn, 'w') as f:
        # One JSON document per line; end with a newline unless the file is empty.
        if f.write('\n'.join(map(json.dumps, data))) > 0:
            f.write('\n')
    subprocess.Popen(['bzip2', '--best', fn])

And in Julia:

using JSON

function jsonbz2(fn, data::AbstractVector)
    open(fn, "w") do f
        # One JSON document per line; JSON.json emits compact, single-line output.
        for elem in data
            println(f, JSON.json(elem))
        end
    end
    @async run(`bzip2 --best $fn`)
end
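
To see why a newline is a safe delimiter, here is a small sketch (the helper names to_json_lines and from_json_lines are made up for this example). json.dumps escapes any newline inside a string as the two characters \n, so the serialized form of each element always fits on one physical line, and any single line can be parsed without the others.

```python
import json

def to_json_lines(records):
    """Serialize each record on its own line, each ending with a newline."""
    return "".join(json.dumps(r) + "\n" for r in records)

def from_json_lines(text):
    """Parse every non-empty line independently of the others."""
    return [json.loads(line) for line in text.split("\n") if line]

# The first record contains a real newline inside a string value.
records = [{"text": "line one\nline two"}, [1, 2, 3], "plain"]
payload = to_json_lines(records)

# Three records give exactly three newlines: the embedded one was escaped.
assert payload.count("\n") == len(records)
assert from_json_lines(payload) == records
```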

Reading the Files

OK, so now you have a bunch of *.json.bz2 files, where each line is an element of some array. In my case, the separate files have context worth preserving, but it might be necessary to iterate across elements of some or all of these files. So, here we go:

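using CodecBzip2
using JSON
using Base.Iterators: flatten

# One file: lazily decompress, split into lines, and parse each line as JSON.
loadjsonbz2lines(fn::AbstractString) =
    (JSON.parse(line) for line in eachline(Bzip2DecompressorStream(open(fn))))

# Many files: chain the per-file iterators; a file is opened only when reached.
loadjsonbz2lines(fns::AbstractVector{<:AbstractString}) =
    flatten(loadjsonbz2lines(fn) for fn in fns)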

Then you can do this:

using Glob

elems = loadjsonbz2lines(glob("*.json.bz2"))
for elem in elems
    # do stuff
end

Now, when you initialize elems, you have set up an iterator that will (lazily) open up the first file, decompress chunks to get the first line, and JSON-parse it. As you consume the iterator, it will work its way through the first file, then move on to the next, completely seamlessly.
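
For comparison with the Python writers earlier, the same lazy chain can be sketched with generators and the standard bz2 module. The function names here are my own, and I'm assuming UTF-8 text with one JSON value per line. Nothing is opened or decompressed until the first element is requested.

```python
import bz2
import json
from itertools import chain

def load_jsonbz2_lines(fn):
    """Lazily yield one parsed JSON value per line of a .bz2 file."""
    # The file is opened on the first next() call, not when the generator is created.
    with bz2.open(fn, 'rt', encoding='utf-8') as f:
        for line in f:
            if line.strip():  # tolerate blank lines
                yield json.loads(line)

def load_all_jsonbz2_lines(fns):
    """Chain the per-file generators; each file is opened only when reached."""
    return chain.from_iterable(load_jsonbz2_lines(fn) for fn in fns)
```

Because the with block lives inside the generator, each file is closed as soon as it has been read to the end.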

OK, maybe not seamlessly. In my case, I would sometimes come across files with bad JSON (not all of them were made by me). In that case you could throw in the following:

# Return the parsed value, or nothing if the line is not valid JSON.
maybe_json(s) = try
    JSON.parse(s)
catch
    nothing
end
nonothings(x) = x !== nothing

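Then you can use maybe_json in place of JSON.parse and throw a Base.Iterators.filter(nonothings, ...) around the result in loadjsonbz2lines. One caveat: a line holding a bare JSON null also parses to nothing, so it gets filtered out along with the bad lines.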
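
In Python, the same idea might look like the sketch below. The name parse_good_lines and the optional on_error callback are assumptions of mine, not part of any library. Because a failure is detected by the exception rather than by a sentinel return value, a legitimate null line survives as None.

```python
import json

def parse_good_lines(lines, on_error=None):
    """Yield parsed values, skipping lines that are not valid JSON."""
    for lineno, line in enumerate(lines, start=1):
        try:
            value = json.loads(line)
        except json.JSONDecodeError as err:
            # Report the bad line if the caller asked for it, then move on.
            if on_error is not None:
                on_error(lineno, err)
            continue
        yield value
```

Passing something like on_error=lambda n, e: print(n, e) makes it easy to see which files contain the bad lines without stopping the iteration.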

I will note that in my own case, I had LZMA-compressed (*.xz) files, but since bzip2 is more frequently used, I thought I would write the examples around that. If you do use LZMA, you can use CodecXz instead of CodecBzip2, with its XzDecompressorStream in place of the bzip2 decompression stream.
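
On the Python side the swap is just as small, because the standard lzma module mirrors the bz2.open interface. Here is a sketch that picks the opener from the file extension. The OPENERS table and the function name open_compressed_text are hypothetical, and UTF-8 text is assumed.

```python
import bz2
import lzma
import os

# Map file extensions to the standard-library opener with the same interface.
OPENERS = {".bz2": bz2.open, ".xz": lzma.open}

def open_compressed_text(fn):
    """Open a .bz2 or .xz file for line-by-line text reading."""
    ext = os.path.splitext(fn)[1]
    if ext not in OPENERS:
        raise ValueError("unsupported extension: %r" % ext)
    return OPENERS[ext](fn, "rt", encoding="utf-8")
```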