Parsing Compressed JSON Lines Across Multiple Files with Julia
I'm new to big data.
It turns out, when they say big, they're not kidding. (What follows might be completely obvious and trivial for 103% of you.)
Saving the Data
The naïve way
I started out writing arrays of (basically) homogeneous data by just JSON-serializing the whole thing and writing it out. This started to get out of hand, so I started compressing the files as well:
import json, subprocess
def jsonbz2(fn, data):
    # Serialize the whole array as a single JSON document...
    with open(fn, 'w') as f:
        json.dump(data, f)
    # ...then compress the finished file in a background bzip2 process.
    subprocess.Popen(['bzip2', '--best', fn])
I used subprocess to spawn many bzip2 processes. You should use bz2 and multiprocessing to keep it more pythonic and less shell-scripty, and also to have finer control over how many bzip2 processes are launched. (It's the difference between using & in your shell loop vs parallel --semaphore.) You can do it in Julia like this:
using JSON
function jsonbz2(fn, data)
    # Serialize the whole array as a single JSON document...
    open(fn, "w") do f
        JSON.print(f, data)
    end
    # ...then compress the finished file without blocking the caller.
    @async run(`bzip2 --best $fn`)
end
Here we used the @async macro (Julia has real Lisp-style macros), which wraps the expression you give it in a Task object and puts it on Julia's scheduler queue. The bzip2 process will keep running after your function returns, but you can wait for it: @async returns the Task (so jsonbz2 returns it too), and you can call wait on it, or put the @async inside a @sync block.
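For example, here is a minimal sketch (the chunks variable and the file names are made up) that writes a few files and then blocks until every bzip2 process has finished:
chunks = [rand(10) for _ in 1:4]  # made-up data
# jsonbz2 returns the Task created by @async, so collect the tasks
# and wait on each one before moving on.
tasks = [jsonbz2("chunk$i.json", chunk) for (i, chunk) in enumerate(chunks)]
foreach(wait, tasks)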
Write out lines of JSON
OK, well, the problem with having a bunch of *.json.bz2 files containing JSON-serialized arrays is that you have to decompress and parse the whole thing even if you only want one element. Because JSON can be serialized without using any newlines, it makes sense to use a newline to delimit the chunks of text that get fed to the JSON parser (this is the JSON Lines format from the title). The bonus is that most decompression libraries support streaming decompression, so you can read the file back line by line. So, let's JSON-serialize the elements of an array and write them out line by line.
import json, subprocess
def jsonbz2(fn, data):
    with open(fn, 'w') as f:
        # One JSON document per line; only add the trailing newline
        # if we actually wrote something.
        if f.write('\n'.join(map(json.dumps, data))) > 0:
            f.write('\n')
    subprocess.Popen(['bzip2', '--best', fn])
And in Julia:
using JSON
function jsonbz2(fn, data::AbstractVector)
    open(fn, "w") do f
        # One JSON document per line, plus a trailing newline.
        join(f, map(JSON.json, data), "\n")
        write(f, "\n")
    end
    @async run(`bzip2 --best $fn`)
end
Reading the Files
OK, so now you have a bunch of *.json.bz2 files, where each line is an element of some array. In my case, the separate files have context worth preserving, but it might be necessary to iterate across elements of some or all of these files. So, here we go:
using CodecBzip2
using IterTools: imap
using JSON
using Base.Iterators: flatten

# Lazily string the per-file iterators together into one long iterator.
loadjsonbz2lines(fns::Vector{<:AbstractString}) = flatten(imap(loadjsonbz2lines, fns))

# Lazily decompress one file line by line, parsing each line as JSON.
loadjsonbz2lines(fn::AbstractString) =
    imap(JSON.parse, eachline(Bzip2DecompressionStream(open(fn))))
Then you can do this:
using Glob
elems = loadjsonbz2lines(glob("*.json.bz2"))
for elem in elems
    # do stuff
end
Now, when you initialize elems, you have set up an iterator that will (lazily) open the first file, decompress just enough to get the first line, and JSON-parse it; as you consume the iterator, it will work its way through the first file, then move on to the next, completely seamlessly.
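For example (sticking with the elems iterator from above), this pulls out the first five parsed elements and reads no further into the first file than it has to:
# Only as much of the first file as needed for five elements gets decompressed.
first_five = collect(Iterators.take(elems, 5))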
OK, maybe not seamlessly. In my case, I would sometimes come across files with bad JSON (not all of them were made by me). In that case you could throw in the following:
# Return `nothing` for lines that fail to parse instead of throwing.
maybe_json(s) = try JSON.parse(s) catch; nothing end
nonothings(x) = x !== nothing
Then you can swap maybe_json in for JSON.parse and throw a Base.Iterators.filter(nonothings, ...) into loadjsonbz2lines, as sketched below.
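A minimal sketch of that change to the single-file method (everything else stays as above):
# Skip lines that fail to parse instead of aborting the whole iteration.
loadjsonbz2lines(fn::AbstractString) =
    Base.Iterators.filter(nonothings,
        imap(maybe_json, eachline(Bzip2DecompressionStream(open(fn)))))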
I will note that in my own case, I had LZMA-compressed (*.xz) files, but since bzip2 is more frequently used, I thought I would write the examples around that. If you did use LZMA, you can use CodecXz instead of CodecBzip2.
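For example, assuming CodecXz names its stream type analogously to the CodecBzip2 one used above, the single-file reader becomes:
using CodecXz
# XzDecompressionStream is assumed here to mirror Bzip2DecompressionStream;
# check the CodecXz docs for the exact name in your version.
loadjsonxzlines(fn::AbstractString) =
    imap(JSON.parse, eachline(XzDecompressionStream(open(fn))))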