
Dseq for a range of integers / custom input splits from clojure code #9

Open
mjwillson opened this issue Nov 11, 2014 · 9 comments

@mjwillson

I was looking for a way to create a dseq for a range of integers, which can be split amongst the mappers.

So far the closest I found was parkour.io.mem/dseq, but this only works in local mode.

Is there a way to implement custom dseqs with custom logic for input splits from Clojure code? Perhaps I need to implement (and AOT-compile) a custom InputFormat class for this kind of thing, then implement a cstep which sets this class's name as mapreduce.job.inputformat.class and sets any config it needs, then somehow wrap that as a dseq? Or perhaps all the dseq stuff assumes file-based input?

Thought I'd mention this anyway as one of the "gaps" alluded to on that documentation ticket -- it feels like something which should be really simple in idiomatic Clojure code, and dseqs are advertised as one of the core abstractions of the library, but it's not at all clear how I'd go about creating one myself, given some logic I'd like to implement for how to generate keys as input for mappers. Looking at the protocol for dseqs isn't very enlightening in this respect, as they mainly just appear to be wrappers for csteps.

(I did find at least one custom InputFormat for an integer range here: https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/IntegerListInputFormat.java , although I'm not sure why this particular class uses static fields for its config, or whether that would work with parkour.)

@llasram (Member) commented Nov 11, 2014

It should be possible to take any existing InputFormat and use it as a dseq, even one which uses static fields for no reason. As looking at some of the existing examples (parkour.io.{text,seqf}) should suggest, you just need to write a Clojure function which uses the Hadoop API to configure a Job to use the input format, then "bless" the function with parkour.io.dseq/dseq. (With the blessed function usually itself being a closure over the particular input parameters.) Dseqs are a core abstraction, but their goal is to make access to Hadoop data more Clojure-ish; I haven't spent as much time making Clojure data more Hadoop-able.
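For example, an untested sketch of the pattern -- here SomeInputFormat and its setParam static setter are hypothetical stand-ins for whatever input format and configuration mechanism you actually have:

(require '[parkour.io.dseq :as dseq])
(import 'org.apache.hadoop.mapreduce.Job)

(defn some-input-dseq
  "Sketch: dseq over a hypothetical `SomeInputFormat`, closing over
  the input parameter `param`."
  [param]
  (dseq/dseq
   (fn [^Job job]
     ;; plain Hadoop Job API; static-field configuration (when a
     ;; format insists on it) would happen here too
     (SomeInputFormat/setParam param)
     (.setInputFormatClass job SomeInputFormat))))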

Simplifying writing new InputFormats (and OutputFormats) from Clojure is something I've considered adding to Parkour, but it just hasn't come up enough for me to spend time on it yet. There are a few potential stumbling blocks (the need for input splits to have a backing concrete Writable class), but I think it'd be possible to work around those, at least for the common case.

I have been meaning to add an InputFormat which allows mapping over an existing in-memory collection, by serializing sections of it to the InputSplits as EDN. Your request suggests a relatively simple extension of that, where each split's input collection is the result of a function applied to the raw serialized backing split data. But at that point we're halfway to supporting arbitrary new InputFormats.

I'd like to stew for a bit on some ideas here. If you'd like to participate, we can use this ticket for design discussion.

@mjwillson (Author)

Thanks, that helps in terms of understanding what dseqs are and aren't trying to do. Sorry for the possibly-slightly-silly question -- I guess it was bad luck that the first thing I tried hit upon a complicated use case.

I guess most people will be using Hadoop data as input, and so an existing InputFormat, so I'm the odd one out here -- I want to use programmatically-generated values as input to my mappers, which can be generated lazily.

Generating those values eagerly and serializing them as EDN would work, although that's perhaps not ideal if you're serializing a massive list of numbers which could be generated programmatically. I wonder if it'd be possible to have a way to create a dseq from a var which maps to a lazy seq of lazy seqs for the splits:

(def splits (partition 10 (range 100)))
(parkour.lazy-splits/dseq #'splits)

Although I guess you'd have to be a little bit careful about things like chunking in the lazy sequences, and perhaps avoid sequential dependencies if you want each split to be generated independently of the others. E.g. the following might be better than the example above:

(def splits (map #(range % (+ 10 %)) (range 0 100 10)))

@mjwillson (Author)

Perhaps requiring a function which returns a vector of lazy sequences would make the requirement for laziness more explicit:

(defn splits
  []
  (mapv #(range % (+ 10 %)) (range 0 100 10)))

(parkour.lazy-splits/dseq #'splits)

@mjwillson (Author)

What I'm missing here are the details of when / where / how these InputFormat and InputSplit instances are serialized and passed around when a job runs on multiple nodes. It sounds like the InputSplits need to be serializable somehow (despite that not being part of the InputSplit interface). I guess that's the difficulty here, and why you were thinking about the:

"result of a function applied to the raw serialized backing split data"

Perhaps best if I leave this to the better informed :)

@llasram (Member) commented Nov 12, 2014

I think you're on the right track. Here's my breakdown (partially to clarify my own thoughts), with a toy sketch after the first list:

  • An input format consists of two functions:
    • A function which accepts a job configuration, then returns a collection of input splits; and
    • A function which accepts a job configuration and an input split, then returns a record reader.
  • An input split is data describing how to read some particular chunk of an input collection; specifically:
    • Whatever information is necessary to build a record reader for that chunk; e.g., file input path, start offset, etc.
    • The size in bytes of the input data.
    • Which nodes have that chunk locally available and (starting in Hadoop 2.5) how they have it available.
  • A record reader combines two pieces of state management derived from an input split:
    • Acquisition & release of any external resources necessary to produce records for the split; e.g. opening & closing a file in HDFS.
    • Stateful iteration over the records for the split.
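
As a toy sketch of that decomposition (plain data and functions, using the integer-range case from this ticket; real Hadoop of course requires concrete classes):

(defn get-splits
  "Job config -> collection of input splits: here, chunk the range
  [0, n) into `size`-sized pieces, each split just a map."
  [{:keys [n size]}]
  (for [start (range 0 n size)]
    {:start start, :end (min n (+ start size))}))

(defn read-split
  "Config + split -> the split's records. The integer range needs no
  external resources, so the \"record reader\" is just a seq."
  [_conf {:keys [start end]}]
  (range start end))

;; (get-splits {:n 100, :size 30})
;; ;=> ({:start 0, :end 30} {:start 30, :end 60}
;;      {:start 60, :end 90} {:start 90, :end 100})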

In the actual implementation in Hadoop, there are a few complications:

  • Each of these things has an associated abstract class, from which all concrete implementations must derive.
  • The input splits must be serializable via a configured Hadoop serialization mechanism. In theory any configured mechanism will do, but using anything but Writable serialization is probably asking for trouble.
  • The common logic for (HDFS) file-backed input formats is implemented via the “template method” pattern in FileInputFormat. Accessing this common logic requires creating a concrete sub-class of FileInputFormat.

I believe we can simplify this significantly, via the following:

  • Embed the function (or specifically function-var) for producing a record reader from an input split in the input split data itself, as another component value. This reduces an input format to just a function accepting a job configuration then returning a collection of splits, requiring only a single concrete class implementation.
  • Make input splits an EDN-serialization wrapper around arbitrary data, again requiring only a single concrete class implementation (see the sketch after this list).
  • Provide a function wrapper for accessing the common FileInputFormat logic, implemented via instantiating a sub-class designed for this purpose.
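
To make the second point concrete, a rough sketch of the EDN-wrapper split -- proxy is used here just to show the shape; the real version needs a named, AOT-compiled class with a no-arg constructor so Hadoop can instantiate it reflectively during deserialization:

(require '[clojure.edn :as edn])
(import '[org.apache.hadoop.mapreduce InputSplit]
        '[org.apache.hadoop.io Writable]
        '[java.io DataInput DataOutput])

(defn edn-split
  "Sketch: an InputSplit carrying arbitrary EDN-serializable data."
  [init-data]
  (let [data (atom init-data)]
    (proxy [InputSplit Writable] []
      (getLength [] (long (:length @data 0)))
      (getLocations [] (into-array String (:locations @data [])))
      ;; writeUTF caps at 64KB, which is fine for a sketch
      (write [^DataOutput out] (.writeUTF out (pr-str @data)))
      (readFields [^DataInput in]
        (reset! data (edn/read-string (.readUTF in)))))))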

The one tricky thing left is the record readers. They’re just inherently mutable, in a way which doesn’t work well with Clojure idioms, and driven in all their mutability by code deep in Hadoop MapReduce. I’m going to mull on them a bit longer, and see if I can’t think of a way to abstract them a bit more nicely from the Clojure perspective.

@llasram (Member) commented Nov 13, 2014

I've thought about it for a bit, and I don't see a way of completely hiding RecordReaders without loss of generality, but I think a convenience interface allowing the user to provide a function which returns reified instances combining the Seqable, Counted, and Closeable interfaces would cover most uses. I should have a chance to whip something together over the next few days.
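
Roughly like this, for the integer-range case -- names are illustrative, not the final interface:

(defn integer-range-records
  "Sketch: given split data for [start, end), return one seqable,
  counted, closeable value over the split's records."
  [{:keys [start end]}]
  (reify
    clojure.lang.Seqable
    (seq [_] (seq (range start end)))
    clojure.lang.Counted
    (count [_] (int (- end start)))
    java.io.Closeable
    (close [_]
      ;; nothing to release for an in-memory range
      nil)))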

@llasram (Member) commented Nov 15, 2014

The design changed slightly when it met actual implementation, but check out PR #10 for my proposed implementation of the basic interface and some examples.

@mjwillson (Author)

Ooh nice, thanks!
I ended up writing my own IntegerRangeInputFormat in Java for now, but I'll try this out when I can.

@llasram (Member) commented Feb 5, 2015

Parkour 0.6.2 is now released and includes the basic interface mentioned above. It does not include access to the FileInputFormat utility code, so I'm going to leave this issue open for now.
