\documentclass[nojss]{jss}
\usepackage{graphicx,keyval,thumbpdf,url}

\newcommand{\sQuote}[1]{`{#1}'}
\newcommand{\dQuote}[1]{``{#1}''}
\newcommand{\file}[1]{\sQuote{\textsf{#1}}}
\newcommand{\class}[1]{\code{"#1"}}

%% need no \usepackage{Sweave.sty}
%%\SweaveOpts{engine=R,eps=FALSE,results=verbatim,fig=FALSE,echo=TRUE,strip.white=true}
\AtBeginDocument{\setkeys{Gin}{width=0.6\textwidth}}

\date{\today}
\title{Distributed Storage and Lists}
\Plaintitle{Distributed Storage and Lists}
\Shorttitle{DSL}
\author{Stefan Theu\ss{}l}
\Plainauthor{Stefan Theussl}
%% \VignetteIndexEntry{DSL}

\Abstract{
  Distributed lists are list-type objects whose elements (i.e.,
  arbitrary \proglang{R} objects) are stored in serialized form on a
  distributed storage. The latter is often used in high performance
  computing environments to process large quantities of data. Data
  located in such an environment is most efficiently processed using
  the MapReduce programming model, first proposed by Google. The
  \proglang{R} package~\pkg{DSL} provides an environment for creating
  and handling distributed lists. The package supports different
  types of storage backends, in particular the Hadoop Distributed
  File System. Furthermore, it offers functionality to operate on
  such lists efficiently using the MapReduce programming model.
}
\Keywords{\proglang{R}, lists, MapReduce}
\Plainkeywords{R, lists, MapReduce}

\Address{
  Stefan Theu\ss{}l\\
%%  Institute for Statistics and Mathematics\\
%%  WU Wirtschaftsuniversit\"at Wien\\
%%  Augasse 2--6\\
%%  1090 Wien, Austria
  E-mail: \email{Stefan.Theussl@R-project.org}\\
%%  URL: \url{http://statmath.wu.ac.at/~theussl/}\\
}



\begin{document}

<<init, echo=FALSE>>=
options(width = 60)
require("DSL")
@ %

\maketitle
\sloppy{}



\section{Introduction}
\label{sec:introduction}

\emph{Distributed lists} are list-type objects using a
\emph{distributed storage} to store their elements. Typically,
distributed lists are advantageous in environments where large
quantities of data need to be processed at once, since all data is
kept outside of main memory, which is often limited. Usually, a
``distributed file system'' (DFS) serves as a container holding the
data on a distributed storage. Such a container can hold arbitrary
objects by serializing them to files.

A recurring function when computing on lists in
\proglang{R}~\citep[][]{Rcore:2011} is \code{lapply()} and variants
thereof. Conceptually, this corresponds to a ``Map'' function from
functional programming, where a given (\proglang{R}) function is
applied to each element of a vector (or, in this case, a
list). Another type of function often applied to lists is one which
combines the contained elements. In functional programming this is
called ``Reduce'', but variants thereof also exist in other areas
(e.g., in the MPI standard, see
\url{http://www.mpi-forum.org/docs/mpi22-report/node103.htm#Node103}).
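
To illustrate these two concepts in plain \proglang{R} (without any
distributed storage involved), a map step via \code{lapply()} may be
combined with a subsequent reduction via \code{Reduce()}:
<<map_reduce_base>>=
## Map: apply a function to each element of a sequence
squares <- lapply( 1:4, function(x) x^2 )
## Reduce: combine all elements, here by addition
Reduce( `+`, squares )
@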

First proposed by Google, the Map and Reduce functions are often
sufficient to express many tasks for analyzing large data
sets. Google implemented a framework which closely follows the
MapReduce programming model~\citep[see][and
\url{http://en.wikipedia.org/wiki/MapReduce}]{Dean+Ghemawat:2004}. Note,
however, that as pointed out, e.g., in~\cite{Laemmel:2007}, the Map
and Reduce operations in the MapReduce programming model do not
necessarily follow their definitions from functional programming. The
model rather aims to support computation (i.e., map and reduction
operations) on large data sets on clusters of workstations in a
distributed manner. Provided each mapping operation is independent of
the others, all maps can be performed in parallel. Hadoop
(\url{http://hadoop.apache.org/}) is an open source implementation of
this framework.

Package~\pkg{DSL} is an extension package for \proglang{R} for
creating and handling list-type objects whose elements are stored
using a distributed storage backend. For operating on such distributed
lists efficiently, the package offers methods and functions from the
MapReduce programming model. In particular, \pkg{DSL} allows one to
make use of the Hadoop Distributed File System~\citep[HDFS,
see][]{Borthakur:2010} and Hadoop Streaming (MapReduce) for storing
and processing data in a distributed manner. In
Section~\ref{sec:design+implementation} we describe the underlying
data structures and the MapReduce functionality. Examples are
discussed in Section~\ref{sec:examples}.
Section~\ref{sec:conclusion+outlook} concludes the paper.


\section{Design and Implementation}
\label{sec:design+implementation}

\subsection{Data Structures}

\subsubsection{Distributed Storage}

The S3 class \class{DStorage} defines a virtual storage where files
are kept on a file system which possibly spans several
workstations. Data is distributed automatically among these nodes
when using such a file system. Objects of class \class{DStorage}
``know'' how to use the corresponding file system via supplied
accessor and modifier methods. The following file systems can be used
as a distributed storage (DS):

\begin{description}
\item[\code{"LFS"}:] the local file system. This type uses functions
  and methods from the packages \pkg{base} and \pkg{utils} delivered
  with the \proglang{R} distribution to handle files.
\item[\code{"HDFS"}:] the Hadoop distributed file system. Functions
  and methods from package
  \pkg{hive}~\citep[][]{Theussl+Feinerer:2011} are used to interact
  with the HDFS.
\end{description}

Essentially, such a class needs methods for reading from and writing
to the distributed storage. Note, however, that files on such a file
system are typically organized according to a published standard;
thus, one should not write or modify arbitrary files or
directories. To account for this, class \class{DStorage} specifies a
directory \code{base\_dir} whose contents may be modified freely,
while read/write operations cannot escape from this directory. The
following (\pkg{DSL}-internal) methods are available for objects of
class \class{DStorage}:

\begin{itemize}
\item \code{DS\_dir\_create()}
\item \code{DS\_get()}
\item \code{DS\_list\_directory()}
\item \code{DS\_put()}
\item \code{DS\_read\_lines()}
\item \code{DS\_unlink()}
\item \code{DS\_write\_lines()}
\end{itemize}

Depending on the type of storage, suitable functions from different
packages are used to interact with the corresponding file
system. Whereas \code{DS\_dir\_create()},
\code{DS\_list\_directory()}, \code{DS\_read\_lines()},
\code{DS\_unlink()}, and \code{DS\_write\_lines()} mimic the behavior
of the corresponding functions of package \pkg{base} (\code{dir.create()},
\code{dir()}, \code{readLines()}, \code{unlink()}, and
\code{writeLines()}, respectively), the functions \code{DS\_get()} and
\code{DS\_put()} can be used to read/write \proglang{R} objects from/to
disk.

The main reason for having such a virtual storage class in \proglang{R}
is that it allows for an easy extension of the memory space available
to the \proglang{R} working environment. E.g., this storage can be
used to store arbitrary (serialized) \proglang{R} objects. These
objects are loaded into the current working environment (i.e., into
RAM) only when they are needed for computation. However, for
efficiency reasons it is in most cases not a good idea to place many
small files on such a file system. Putting several serialized
\proglang{R} objects into files of a certain maximum size (e.g., line
by line as key/value pairs) circumvents this issue. Indeed, frameworks
like Hadoop benefit from such a setup \citep[see Section \emph{Data
Organization} in][]{Borthakur:2010}. Thus, the constructor function
takes the following arguments:
\begin{description}
\item[\code{type}:] the file system type,
\item[\code{base\_dir}:] the directory under which chunks of
  serialized objects are to be stored,
\item[\code{chunksize}:] the maximal size of a single chunk.
\end{description}

E.g., a DS of \code{type} \code{"LFS"} using the system-wide or a
user-defined \emph{temporary directory} as the base directory
(\code{base\_dir}) and a chunk size of 10 MB can be instantiated using
the function \code{DStorage()}:
<<ds_create>>=
ds <- DStorage( type = "LFS", base_dir = tempdir(),
                chunksize = 10 * 1024^2 )
@

Further methods for class \class{DStorage} are corresponding
\code{print()} and \code{summary()} methods.
<<ds_print_summary>>=
ds
summary(ds)
@
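
The \code{DS\_} methods listed above can now be applied to this
storage object. As they are internal to \pkg{DSL}, the following
sketch accesses them via \code{DSL:::} and is not evaluated; the
argument layouts shown are assumptions modeled on the respective
\pkg{base} counterparts rather than a documented interface.
<<ds_methods, eval = FALSE>>=
## hypothetical usage sketch: write and read raw lines within base_dir
DSL:::DS_write_lines( ds, c("first line", "second line"), "lines.txt" )
DSL:::DS_read_lines( ds, "lines.txt" )
## serialize an arbitrary R object to the storage and restore it
DSL:::DS_put( ds, list(a = 1:3), "chunk-1" )
DSL:::DS_get( ds, "chunk-1" )
@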

\subsubsection{Distributed Lists}

Distributed lists are defined in \proglang{R} by the S3 class
\class{DList}. Objects of this class behave similarly to standard
\proglang{R} lists but use a distributed storage of class
\class{DStorage} to store their elements. Distributed lists can
easily be constructed using the function \code{DList()} or can be
coerced from other objects using the generic function
\code{as.DList()}. Available methods support coercion of
\proglang{R} lists and of character vectors representing paths to
data repositories, as well as coercion of \class{DList} objects to
lists.

<<dl_create>>=
dl <- DList( letters = letters, numbers = 0:9  )
l <- as.list( letters )
names(l) <- LETTERS
dl2 <- as.DList(l)
identical( as.list(dl2), l )
dl3 <- as.DList( system.file("examples", package = "DSL") )
@

Note that the above example uses the default storage type, namely
\code{"LFS"} with a \emph{temporary directory} generated by
\code{tempdir()} as the base directory. In order to use a
user-defined storage, the \code{DStorage} argument of the
\code{DList()} constructor is supplied.

<<dl_create2>>=
dl <- DList( letters = letters, numbers = 0:9, DStorage = ds )
@

Conceptually, we want a distributed list to support a set of
intuitive operations, like accessing each element (stored somewhere
on a DFS) directly, displaying the distributed list and each
individual element, obtaining information about basic properties
(e.g., the length of the list), or applying an operation to a range
of elements. These requirements are formalized via a set of
interfaces which must be implemented by the \class{DList} class:
\begin{description}
\item[Display] Since the elements of the list are not directly
  available, the \code{print} and \code{summary} methods should
  provide other useful information about the distributed list (like
  the number of list elements).
\item[Length] The \code{length()} function must return the number of
  list elements.
\item[Names] Named lists must be supported.
\item[Subset] The \code{[[}%]]
  \ operator must be implemented so that individual elements of the
  distributed list can be retrieved.
\item[MapReduce] Map and Reduce operations as well as variants of
  \code{lapply} (which are conceptually similar to Map) can be used to
  express most of the computation on \class{DList} objects.
\end{description}

<<dl_methods>>=
summary(dl)
names( dl2 )
length(dl3)
dl3[[1]]
@

MapReduce is discussed in more detail in the next section.


\subsection{Methods on Distributed Lists}
\label{sec:methods}

The MapReduce programming model as defined
by~\cite{Dean+Ghemawat:2004} is as follows. The computation takes a
set of input key/value pairs, and produces a set of output key/value
pairs. The user expresses the computation as two functions: Map and
Reduce. The Map function takes an input pair and produces a set of
intermediate key/value pairs. The Reduce function accepts an
intermediate key and a set of values for that key (possibly grouped by
the MapReduce library). It merges these values together to form a
possibly smaller set of values. Typically, just zero or one output
value is produced per reduce invocation. Furthermore, data is usually
stored on a (distributed) file system which is recognized by the
MapReduce library. This allows such a framework to handle lists of
values (here objects of class \class{DList}) that are too large to fit
in main memory (i.e., RAM).
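
To make this data flow concrete, the following plain \proglang{R}
sketch emulates the contract entirely in memory: a map step emits
intermediate key/value pairs, the pairs are grouped by key (a task
performed by the MapReduce library itself), and a reduce step merges
the values sharing a key.
<<mr_contract>>=
## map: emit one (word, 1) pair per word contained in the input value
map <- function( key, value )
    lapply( unlist(strsplit(value, " ")),
            function(w) list(key = w, value = 1L) )
## apply map to two input pairs and pool the intermediate pairs
inter <- c( map("d1", "a b a"), map("d2", "b c") )
## group the intermediate values by key
grouped <- split( sapply(inter, `[[`, "value"),
                  sapply(inter, `[[`, "key") )
## reduce: merge the values for each key, here by summation
lapply( grouped, sum )
@

Package~\pkg{DSL} provides the following operations implementing this
model on \class{DList} objects: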

\begin{description}
\item[\code{DGather}:] this collective operation is similar to an
  MPI\_GATHER
  (\url{http://www.mpi-forum.org/docs/mpi22-report/node95.htm#Node95}). However,
  instead of collecting results from processes running in parallel,
  \code{DGather()} collects the contents of the chunks holding the
  elements of a \class{DList}. By default, a named list whose length
  equals the number of chunks is returned. Its elements are character
  vectors containing the values of the key/value pairs read line by
  line from the corresponding chunk. Alternatively, \code{DGather()}
  can be used to retrieve only the keys.
\item[\code{DLapply}:] is an (l)apply-type function which is used to
  iteratively \emph{apply} a function to a set of input values. In
  case of \code{DLapply()}, the input values are the elements of
  \class{DList} objects (i.e., the values of the key/value pairs). A
  distributed list of the same length is returned.
\item[\code{DMap}:] is similar to \code{DLapply()} above but always
  takes both the key and the value of a key/value pair as
  input. Thus, keys can also be modified. Indeed, in contrast to
  \code{DLapply()}, the returned object may differ in length from the
  original.
\item[\code{DReduce}:] this collective operation takes a set of
  (intermediate) key/value pairs and combines
  values with the same associated key using a given directive (the
  reduce function). By default, values are concatenated using the
  \code{c()} operator.
\end{description}

<<dl_mapreduce>>=
dl <- DList( line1 = "This is the first line.",
             line2 = "Now, the second line." )
res <- DLapply( dl, function(x) unlist(strsplit(x, " ")) )
as.list( res )

foo <- function( keypair )
    list( key = paste("next_", keypair$key, sep = ""), value =
         gsub("first", "mapped", keypair$value) )

dlm <- DMap( x = dl, MAP = foo)
## retrieve keys
unlist(DGather(dlm, keys = TRUE, names = FALSE))
## retrieve values
as.list( dlm )
@
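
By default (i.e., \code{keys = FALSE}), \code{DGather()} returns the
stored values grouped by chunk. Since the resulting layout depends on
the storage configuration, the call is shown here without evaluation:
<<dl_gather_values, eval = FALSE>>=
## gather the values of all key/value pairs, one list element per chunk
DGather( dlm )
@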

Further methods on \class{DList} objects are prefixed with
\code{DL\_}. Currently, only methods for interacting with
the underlying \class{DStorage}  are available.

\begin{description}
\item[\code{DL\_storage}:] accesses the storage of \class{DList}
  objects. Returns objects of class \class{DStorage}.
\item[\code{DL\_storage<-}:] replaces the storage  in \class{DList}
  objects. Data is automatically transferred to the new storage.
\end{description}

<<dl_stor_replace, eval = FALSE>>=
l <- list( line1 = "This is the first line.",
           line2 = "Now, the second line." )
dl <- as.DList( l )
DL_storage(dl)
ds <- DStorage("HDFS", tempdir())
DL_storage(dl) <- ds
as.list(dl)
@

\section{Examples}
\label{sec:examples}

\subsection{Word Count}

This example demonstrates how \code{DMap()} and \code{DReduce()} can
be used to count words based on text files located somewhere on a
given file system. The following two files contained in the
\file{examples} directory of the package will be used.
<<ex1_files>>=
## simple wordcount based on two files:
dir(system.file("examples", package = "DSL"))
@
We use a temporary directory as the base directory of a new \class{DStorage}
object. By setting the maximum chunk size to 1 byte we force the name
of each file to be placed in a separate chunk. Then we store the
absolute paths to the text files as elements of a \class{DList} object.
<<ex1_stor>>=
## first force 1 chunk per file (set max chunk size to 1 byte):
ds <- DStorage("LFS", tempdir(), chunksize = 1L)
## make "DList", i.e., read file contents and store in chunks
dl <- as.DList( system.file("examples", package = "DSL"),
                DStorage = ds )
@
Data is read into chunks (one per original file) via a simple
call of \code{DMap()} on the distributed list.
<<ex1_read>>=
## read files
dl <- DMap(dl, function( keypair ){
    list( key = keypair$key,
          value = tryCatch(readLines(keypair$value),
                           error = function(x) NA) )
})
@
The contents of the files are split into words using the following call.
<<ex1_map>>=
## split into terms
splitwords <- function( keypair ){
    keys <- unlist(strsplit(keypair$value, " "))
    mapply( function(key, value) list( key = key, value = value),
            keys, rep(1L, length(keys)),
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}
res <- DMap( dl, splitwords )
as.list(res)
@
Finally, the collected intermediate results are aggregated by summing
the values associated with each term.
<<ex1_reduce>>=
## now aggregate by term
res <- DReduce( res, sum )
as.list( res )
@


% \subsection{Temperature}

% TODO: Example from Hadoop book. around 30 GB of raw data.

% <<>>=
% data <- "~/tmp/NCDC"
% require("hive")
% require("DSL")

% ## first force 1 chunk per file (set max chunk size to 1 byte):
% ds <- DStorage("LFS", tempdir(), chunksize = 1L)
% ## make "DList", i.e., read file contents and store in chunks
% dl <- as.DList( data,
%                 DStorage = ds )

% dl <- DMap(dl, function( keypair ){
%     con <- gzfile(keypair$value)
%     lines <- tryCatch(readLines(con),
%                       error = function(x) NA)
%     close( con )
%     mapply( function(key, value) list( key = key, value = value),
%            keypair$key, lines,
%            SIMPLIFY = FALSE, USE.NAMES = FALSE )
% })

% ds <- DStorage( "HDFS", tempdir() )

% DL_storage(dl) <- ds

% int <- DMap(dl, function( keypair ){
%    airtemp <- as.integer( substr(keypair$value, 88, 92) )
%    if( airtemp == 9999 || (! as.integer(substr(keypair$value, 93, 93))
%    %in% c(0L,1L,4L,5L,9L)) )
%      airtemp <- NA
%    list(key = substr(keypair$value, 16, 19),
%         value = airtemp)
% })

% maxtemp <- DReduce( int, function(x) max(x, na.rm = TRUE) )

% as.list(maxtemp)
% @


\section{Conclusion and Outlook}
\label{sec:conclusion+outlook}

Package~\pkg{DSL} was designed to allow for the handling of large
data sets which do not fit into main memory. The main data structure
is the class \class{DList}, a list-type object storing its elements
on a virtual storage of class \class{DStorage}. The package currently
provides basic data structures for creating and handling \class{DList}
and \class{DStorage} objects, and facilities for computing on these,
including map and reduction methods based on the MapReduce paradigm.

Possible future extensions include:
\begin{itemize}
 \item a \class{DStorage} interface to NoSQL database systems,
 \item better integration with the \pkg{parallel} package. Currently,
   only the \emph{multicore} version of \code{lapply()} is used for
   \class{DStorage} objects of type \code{"LFS"}.
 \end{itemize}

\subsubsection*{Acknowledgments}

We are grateful to Christian Buchta for providing efficient
\proglang{C} code for collecting partial results in \code{DReduce()}.

{\small
  \bibliography{DSL}
}

\end{document}