peter siemen, dipl.-inf.

Parsing large XML streams with Enumerators, Iteratees and Enumeratees

Scala, Play Framework, Stream Processing

I am not a fan of huge XML files. They are fragile, encoding issues trip up parsers easily, and when working with them you just cannot fail fast. I also find writing custom SAX parsers tedious, and while writing them I always feel that SAX gives me the wrong level of abstraction. Last week I needed one of those vicious huge-XML-file processors for a Scala project, and it also needed to be fault tolerant: it should extract as many objects of a certain type as possible and not fail when faced with some messed-up syntax here and there. The step preceding the XML processing was downloading the file, and since the Play Framework's preferred way of downloading huge files involves Iteratees, I needed a quick refresher. While reading through Play's documentation on Enumerators, Iteratees and Enumeratees, I decided that I should also parse the XML file reactively using a combination of them. Play gave me just the right starting point with this Enumerator:

import java.io.File
import play.api.libs.iteratee.Enumerator

val fileEnumerator: Enumerator[Array[Byte]] =
  Enumerator.fromFile(new File("path/to/some/file"))

I figured I could simply write a custom Enumeratee that transforms a stream of Array[Byte] objects into a stream of the objects I want to extract from the XML file. The idea of an Enumeratee is to reactively transform an input stream into an output stream. So I had my plan laid out. To keep things simple and do one thing at a time, I wanted to chain three Enumeratees together and plug them into the Enumerator returned by Enumerator.fromFile. The first Enumeratee must transform chunks of Array[Byte] into Array[Char]. The next one must transform Array[Char] chunks into single Chars. The final one would be my SAX parser replacement and transform the stream of Chars into a stream of the objects I want to extract from the XML file. I did not need to implement the first one, since James Roper already did a great job on that. His Enumeratee transforms a stream of Array[Byte] into a stream of Array[Char] and decodes multi-byte UTF-8 sequences correctly. Then I found

Enumeratee.mapConcat[Array[Char]](chars => chars)

which gave me the second Enumeratee. That left the SAX parser replacement Enumeratee, which I managed to put together like this:

import play.api.libs.iteratee._
import scala.util.control.NonFatal
// Needed on Play versions where pureFlatFold takes an implicit ExecutionContext.
import scala.concurrent.ExecutionContext.Implicits.global

object ExtraEnumeratee extends Logging {

  def toTypedArray[T](tag: String, deserialize: String => Option[T]): Enumeratee[Char, T] =
    new Enumeratee[Char, T] {
      private val startTag = "<" + tag + ">"
      private val endTag = "</" + tag + ">"

      // Cut the buffer down to the last start tag and deserialize that fragment.
      private def toType(str: String): Option[T] = {
        val start = str.lastIndexOf(startTag)
        if (start < 0) {
          logger.error(s"Could not find startTag $startTag. Cannot deserialize: $str")
          None
        } else {
          val fragment = str.substring(start)
          try deserialize(fragment)
          catch {
            case NonFatal(e) =>
              logger.error(s"Cannot deserialize: $fragment (${e.getMessage})")
              None
          }
        }
      }

      def step[A](
          inner: Iteratee[T, A],
          buffer: StringBuilder)(in: Input[Char]): Iteratee[Char, Iteratee[T, A]] =
        in match {
          case Input.EOF => Done[Char, Iteratee[T, A]](inner, Input.EOF)
          case Input.Empty => Cont(step(inner, buffer))
          case Input.El(data) =>
            buffer.append(data)
            if (buffer.endsWith(endTag)) {
              inner.pureFlatFold {
                // The inner iteratee accepts more input: feed it the parsed item, if any.
                case Step.Cont(k) =>
                  toType(buffer.toString) match {
                    case Some(item) => Cont(step(k(Input.El(item)), new StringBuilder))
                    case None => Cont(step(inner, new StringBuilder))
                  }
                // The inner iteratee is done or has failed: stop consuming.
                case _ => Done(inner, Input.Empty)
              }
            } else Cont(step(inner, buffer))
        }

      def applyOn[A](inner: Iteratee[T, A]) = Cont(step(inner, new StringBuilder))
    }
}
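Stripped of the Iteratee plumbing, the buffering rule in step and toType can be tried out on a plain Iterator[Char]. The following is only a sketch under that simplification: FragmentExtractor and extract are hypothetical names that do not appear in the original code, and failures are swallowed with Try instead of being logged.

```scala
import scala.util.Try

// Hypothetical helper, not part of ExtraEnumeratee: the same buffering rule
// applied to a plain Iterator[Char] instead of an Iteratee.
object FragmentExtractor {

  def extract[T](chars: Iterator[Char], tag: String)(deserialize: String => Option[T]): List[T] = {
    val startTag = "<" + tag + ">"
    val endTag = "</" + tag + ">"
    val buffer = new StringBuilder
    val items = List.newBuilder[T]

    chars.foreach { c =>
      buffer.append(c)
      if (buffer.endsWith(endTag)) {
        val text = buffer.toString
        val start = text.lastIndexOf(startTag)
        // Deserialize only if a start tag was buffered; a throwing or
        // failing deserializer just means the fragment is skipped.
        if (start >= 0) {
          Try(deserialize(text.substring(start))).toOption.flatten.foreach(items += _)
        }
        // Start over after every end tag, whether or not an item was produced.
        buffer.clear()
      }
    }
    items.result()
  }
}
```

Because the buffer is cut at the last start tag, a truncated element such as `<item>broken <item>b</item>` still yields the fragment `<item>b</item>`, and an end tag with no start tag in front of it is simply dropped.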

The Enumeratee buffers the stream Char by Char. For each incoming Char it checks whether the buffer ends with the end tag. If so, it cuts the buffer down to the last occurrence of the start tag and applies the function deserialize to that string to construct an object of type T. If no start tag is found or deserialization fails, the fragment is logged and skipped. Finally, all plugged together, it looks nice and simple like this:

import java.io.InputStream
import play.api.libs.iteratee.{Enumeratee, Enumerator}

object ItemManager extends Logging {

  def enumerate(inputStream: InputStream): Enumerator[Item] =
    Enumerator.fromStream(inputStream)
      // Array[Byte] => Array[Char], the UTF-8 decoding Enumeratee mentioned above
      .through(ExtraEnumeratee.toCharArray())
      // Array[Char] => Char
      .through(Enumeratee.mapConcat[Array[Char]](chars => chars))
      // Char => Item
      .through(ExtraEnumeratee.toTypedArray[Item]("item", Item.deserialize))
}
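One detail this pipeline hides is why the first stage cannot just call new String(bytes) on each chunk: a multi-byte UTF-8 character may be split across two chunks. The sketch below illustrates the idea with java.nio's CharsetDecoder on a plain Iterator[Array[Byte]]. It is not the Enumeratee used above; ChunkDecoder and decodeChunks are hypothetical names, and replacing malformed bytes with U+FFFD is an assumption about what a fault-tolerant decoder would want to do.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

// Hypothetical helper illustrating chunk-safe UTF-8 decoding.
object ChunkDecoder {

  def decodeChunks(chunks: Iterator[Array[Byte]]): Iterator[Array[Char]] = {
    val decoder = StandardCharsets.UTF_8.newDecoder()
      .onMalformedInput(CodingErrorAction.REPLACE)       // assumption: replace, do not fail
      .onUnmappableCharacter(CodingErrorAction.REPLACE)
    // Bytes of an incomplete character at the end of the previous chunk.
    var leftover = Array.emptyByteArray

    chunks.map { chunk =>
      val in = ByteBuffer.wrap(leftover ++ chunk)
      // UTF-8 never decodes to more chars than it has bytes.
      val out = CharBuffer.allocate(in.remaining())
      // endOfInput = false: an incomplete trailing sequence stays in `in`.
      decoder.decode(in, out, false)
      leftover = new Array[Byte](in.remaining())
      in.get(leftover)
      out.flip()
      val chars = new Array[Char](out.remaining())
      out.get(chars)
      chars
    }
  }
}
```

The sketch never signals the end of input to the decoder, so an incomplete character at the very end of the stream is silently dropped rather than reported.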

To be honest, a SAX parser would be much easier to understand and much less complex than the above Enumeratee and the whole concept of Iteratees and Enumerators. But this XML-parsing Enumeratee is generic and composes well, so I can now use it wherever I need to extract objects from XML files. All in all this sounds much better to me than having to write custom SAX parsers over and over again. Great Success ;)!

© 2020 by peter siemen, dipl.-inf.. All rights reserved.