Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.datatypes; + + +/** + * Represents a queue name. 
+ */ +public class QueueName extends DefaultAnonymizableDataType { + private final String queueName; + + public QueueName(String queueName) { + super(); + this.queueName = queueName; + } + + @Override + public String getValue() { + return queueName; + } + + @Override + protected String getPrefix() { + return "queue"; + }; +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.datatypes; + +/** + * Represents a user's name. 
+ */ +public class UserName extends DefaultAnonymizableDataType { + private final String userName; + + public UserName(String userName) { + super(); + this.userName = userName; + } + + @Override + public String getValue() { + return userName; + } + + @Override + protected String getPrefix() { + return "user"; + } +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen.datatypes.util; + +import org.apache.hadoop.tools.rumen.datatypes.DataType; +import org.apache.hadoop.tools.rumen.datatypes.DefaultDataType; + +/** + * A simple job property parser that acts like a pass-through filter. + */ +public class DefaultJobPropertiesParser implements JobPropertyParser { + @Override + public DataType parseJobProperty(String key, String value) { + return new DefaultDataType(value); + } +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.datatypes.util; + +import org.apache.hadoop.tools.rumen.datatypes.DataType; +import org.apache.hadoop.tools.rumen.datatypes.JobProperties; + +/** + * A {@link JobProperties} parsing utility. + */ +public interface JobPropertyParser { + /** + * Parse the specified job configuration key-value pair. + * + * @return Returns a {@link DataType} if this parser can parse this value. + * Returns 'null' otherwise. + */ + public DataType parseJobProperty(String key, String value); +} Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.datatypes.util; + +import java.lang.reflect.Field; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.tools.rumen.datatypes.*; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * A default parser for MapReduce job configuration properties. + * MapReduce job configuration properties are represented as key-value pairs. + * Each key represents a configuration knob which controls or affects the + * behavior of a MapReduce job or a job's task. The value associated with the + * configuration key represents its value. Some of the keys are deprecated. As a + * result of deprecation some keys change or are preferred over other keys, + * across versions. {@link MapReduceJobPropertiesParser} is a utility class that + * parses MapReduce job configuration properties and converts the value into a + * well defined {@link DataType}. Users can use the + * {@link MapReduceJobPropertiesParser#parseJobProperty()} API to process job + * configuration parameters. This API will parse a job property represented as a + * key-value pair and return the value wrapped inside a {@link DataType}. + * Callers can then use the returned {@link DataType} for further processing. 
+ * + * {@link MapReduceJobPropertiesParser} thrives on the key name to decide which + * {@link DataType} to wrap the value with. Values for keys representing + * job-name, queue-name, user-name etc are wrapped inside {@link JobName}, + * {@link QueueName}, {@link UserName} etc respectively. Keys ending with *dir* + * are considered as a directory and hence gets be wrapped inside + * {@link FileName}. Similarly key ending with *codec*, *log*, *class* etc are + * also handled accordingly. Values representing basic java data-types like + * integer, float, double, boolean etc are wrapped inside + * {@link DefaultDataType}. If the key represents some jvm-level settings then + * only standard settings are extracted and gets wrapped inside + * {@link DefaultDataType}. Currently only '-Xmx' and '-Xms' settings are + * considered while the rest are ignored. + * + * Note that the {@link MapReduceJobPropertiesParser#parseJobProperty()} API + * maps the keys to a configuration parameter listed in + * {@link MRJobConfig}. This not only filters non-framework specific keys thus + * ignoring user-specific and hard-to-parse keys but also provides a consistent + * view for all possible inputs. So if users invoke the + * {@link MapReduceJobPropertiesParser#parseJobProperty()} API with either + * <"mapreduce.job.user.name", "bob"> or <"user.name", "bob">, then the result + * would be a {@link UserName} {@link DataType} wrapping the user-name "bob". 
+ */ +@SuppressWarnings("deprecation") +public class MapReduceJobPropertiesParser implements JobPropertyParser { + private Field[] mrFields = MRJobConfig.class.getFields(); + private DecimalFormat format = new DecimalFormat(); + private JobConf configuration = new JobConf(false); + private static final Pattern MAX_HEAP_PATTERN = + Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+"); + private static final Pattern MIN_HEAP_PATTERN = + Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+"); + + // turn off the warning w.r.t deprecated mapreduce keys + static { + Logger.getLogger(Configuration.class).setLevel(Level.OFF); + } + + // Accepts a key if there is a corresponding key in the current mapreduce + // configuration + private boolean accept(String key) { + return getLatestKeyName(key) != null; + } + + // Finds a corresponding key for the specified key in the current mapreduce + // setup. + // Note that this API uses a cached copy of the Configuration object. This is + // purely for performance reasons. + private String getLatestKeyName(String key) { + // set the specified key + configuration.set(key, key); + try { + // check if keys in MRConfig maps to the specified key. + for (Field f : mrFields) { + String mrKey = f.get(f.getName()).toString(); + if (configuration.get(mrKey) != null) { + return mrKey; + } + } + + // unset the key + return null; + } catch (IllegalAccessException iae) { + throw new RuntimeException(iae); + } finally { + // clean up! + configuration.clear(); + } + } + + @Override + public DataType parseJobProperty(String key, String value) { + if (accept(key)) { + return fromString(key, value); + } + + return null; + } + + /** + * Extracts the -Xmx heap option from the specified string. 
+ */ + public static void extractMaxHeapOpts(String javaOptions, + List heapOpts, + List others) { + for (String opt : javaOptions.split(" ")) { + Matcher matcher = MAX_HEAP_PATTERN.matcher(opt); + if (matcher.find()) { + heapOpts.add(opt); + } else { + others.add(opt); + } + } + } + + /** + * Extracts the -Xms heap option from the specified string. + */ + public static void extractMinHeapOpts(String javaOptions, + List heapOpts, List others) { + for (String opt : javaOptions.split(" ")) { + Matcher matcher = MIN_HEAP_PATTERN.matcher(opt); + if (matcher.find()) { + heapOpts.add(opt); + } else { + others.add(opt); + } + } + } + + // Maps the value of the specified key. + private DataType fromString(String key, String value) { + if (value != null) { + // check known configs + // job-name + String latestKey = getLatestKeyName(key); + + if (MRJobConfig.JOB_NAME.equals(latestKey)) { + return new JobName(value); + } + // user-name + if (MRJobConfig.USER_NAME.equals(latestKey)) { + return new UserName(value); + } + // queue-name + if (MRJobConfig.QUEUE_NAME.equals(latestKey)) { + return new QueueName(value); + } + if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey) + || MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) { + List heapOptions = new ArrayList(); + extractMaxHeapOpts(value, heapOptions, new ArrayList()); + extractMinHeapOpts(value, heapOptions, new ArrayList()); + return new DefaultDataType(StringUtils.join(heapOptions, ' ')); + } + + //TODO compression? 
+ //TODO Other job configs like FileOutputFormat/FileInputFormat etc + + // check if the config parameter represents a number + try { + format.parse(value); + return new DefaultDataType(value); + } catch (ParseException pe) {} + + // check if the config parameters represents a boolean + // avoiding exceptions + if ("true".equals(value) || "false".equals(value)) { + Boolean.parseBoolean(value); + return new DefaultDataType(value); + } + + // check if the config parameter represents a class + if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) { + return new ClassName(value); + } + + // handle distributed cache sizes and timestamps + if (latestKey.endsWith("sizes") + || latestKey.endsWith(".timestamps")) { + new DefaultDataType(value); + } + + // check if the config parameter represents a file-system path + //TODO: Make this concrete .location .path .dir .jar? + if (latestKey.endsWith(".dir") || latestKey.endsWith(".location") + || latestKey.endsWith(".jar") || latestKey.endsWith(".path") + || latestKey.endsWith(".logfile") || latestKey.endsWith(".file") + || latestKey.endsWith(".files") || latestKey.endsWith(".archives")) { + try { + return new FileName(value); + } catch (Exception ioe) {} + } + } + + return null; + } +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java Fri Jan 20 
18:55:20 2012 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.serializers; + +import java.io.IOException; + +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.SerializerProvider; + +/** + * A JSON serializer for Strings. 
+ */ +public class BlockingSerializer extends JsonSerializer { + + public void serialize(String object, JsonGenerator jGen, SerializerProvider sProvider) + throws IOException, JsonProcessingException { + jGen.writeNull(); + }; +} Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen.serializers; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.datatypes.AnonymizableDataType; +import org.apache.hadoop.tools.rumen.state.StatePool; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.SerializerProvider; + +/** + * Default Rumen JSON serializer. + */ +@SuppressWarnings("unchecked") +public class DefaultAnonymizingRumenSerializer + extends JsonSerializer { + private StatePool statePool; + private Configuration conf; + + public DefaultAnonymizingRumenSerializer(StatePool statePool, + Configuration conf) { + this.statePool = statePool; + this.conf = conf; + } + + public void serialize(AnonymizableDataType object, JsonGenerator jGen, + SerializerProvider sProvider) + throws IOException, JsonProcessingException { + Object val = object.getAnonymizedValue(statePool, conf); + // output the data if its a string + if (val instanceof String) { + jGen.writeString(val.toString()); + } else { + // let the mapper (JSON generator) handle this anonymized object. 
+ jGen.writeObject(val); + } + }; +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen.serializers; + +import java.io.IOException; + +import org.apache.hadoop.tools.rumen.datatypes.DataType; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.SerializerProvider; + +/** + * Default Rumen JSON serializer. + */ +@SuppressWarnings("unchecked") +public class DefaultRumenSerializer extends JsonSerializer { + public void serialize(DataType object, JsonGenerator jGen, SerializerProvider sProvider) + throws IOException, JsonProcessingException { + Object data = object.getValue(); + if (data instanceof String) { + jGen.writeString(data.toString()); + } else { + jGen.writeObject(data); + } + }; +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.serializers; + +import java.io.IOException; + +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.SerializerProvider; + +/** + * Rumen JSON serializer for serializing object using toSring() API. + */ +public class ObjectStringSerializer extends JsonSerializer { + public void serialize(T object, JsonGenerator jGen, SerializerProvider sProvider) + throws IOException, JsonProcessingException { + jGen.writeString(object.toString()); + }; +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.state; + +import org.codehaus.jackson.annotate.JsonIgnore; + +/** + * Represents a state. This state is managed by {@link StatePool}. + * + * Note that a {@link State} objects should be persistable. Currently, the + * {@link State} objects are persisted using the Jackson JSON library. Hence the + * implementors of the {@link State} interface should be careful while defining + * their public setter and getter APIs. + */ +public interface State { + /** + * Returns true if the state is updated since creation (or reload). + */ + @JsonIgnore + boolean isUpdated(); + + /** + * Get the name of the state. + */ + public String getName(); + + /** + * Set the name of the state. 
+ */ + public void setName(String name); +} Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen.state; + +import java.io.IOException; + +import org.apache.hadoop.tools.rumen.state.StatePool.StatePair; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.DeserializationContext; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.deser.StdDeserializer; +import org.codehaus.jackson.node.ObjectNode; + +/** + * Rumen JSON deserializer for deserializing the {@link State} object. + */ +public class StateDeserializer extends StdDeserializer { + public StateDeserializer() { + super(StatePair.class); + } + + @Override + public StatePair deserialize(JsonParser parser, + DeserializationContext context) + throws IOException, JsonProcessingException { + ObjectMapper mapper = (ObjectMapper) parser.getCodec(); + // set the state-pair object tree + ObjectNode statePairObject = (ObjectNode) mapper.readTree(parser); + Class stateClass = null; + + try { + stateClass = + Class.forName(statePairObject.get("className").getTextValue().trim()); + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException("Invalid classname!", cnfe); + } + + String stateJsonString = statePairObject.get("state").toString(); + State state = (State) mapper.readValue(stateJsonString, stateClass); + + return new StatePair(state); + } +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java?rev=1234070&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java (added) +++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java Fri Jan 20 18:55:20 2012 @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen.state; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.Anonymizer; +import org.apache.hadoop.tools.rumen.datatypes.DataType; +import org.codehaus.jackson.JsonEncoding; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.map.DeserializationConfig; +import 
org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.module.SimpleModule;

/**
 * A pool of states. States used by {@link DataType}'s can be managed by the
 * {@link StatePool}. {@link StatePool} also supports persistence. Persistence
 * is key to share states across multiple {@link Anonymizer} runs.
 */
public class StatePool {
  private static final long VERSION = 1L;
  // true once any state has been added to (or modified in) the pool
  private boolean isUpdated = false;
  private boolean isInitialized = false;
  private Configuration conf;

  // persistence configuration
  public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
  public static final String RELOAD_CONFIG =
    "rumen.anonymization.states.reload";
  public static final String PERSIST_CONFIG =
    "rumen.anonymization.states.persist";

  // internal state management configs
  private static final String COMMIT_STATE_FILENAME = "latest";
  private static final String CURRENT_STATE_FILENAME = "temp";

  // timestamp (set at initialize()) used to name the pre-commit backup file
  private String timeStamp;
  private Path persistDirPath;
  private boolean reload;
  private boolean persist;

  /**
   * A wrapper class that binds the state implementation to its implementing
   * class name.
   */
  public static class StatePair {
    private String className;
    private State state;

    public StatePair(State state) {
      this.className = state.getClass().getName();
      this.state = state;
    }

    public String getClassName() {
      return className;
    }

    public void setClassName(String className) {
      this.className = className;
    }

    public State getState() {
      return state;
    }

    public void setState(State state) {
      this.state = state;
    }
  }

  /**
   * Used to identify and cache {@link State}s, keyed by the name of the
   * class each state was registered for.
   */
  private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();

  /**
   * Registers a state for the given class. At most one state may be
   * registered per class.
   *
   * @throws RuntimeException if a state is already registered for the class
   */
  public void addState(Class<?> id, State state) {
    if (pool.containsKey(id.getName())) {
      throw new RuntimeException("State '" + state.getName() + "' added for the"
          + " class " + id.getName() + " already exists!");
    }
    isUpdated = true;
    pool.put(id.getName(), new StatePair(state));
  }

  /**
   * Returns the state registered for the given class, or null if none.
   */
  public State getState(Class<?> clazz) {
    // single lookup instead of containsKey() + get()
    StatePair statePair = pool.get(clazz.getName());
    return statePair == null ? null : statePair.getState();
  }

  // For testing
  @JsonIgnore
  public boolean isUpdated() {
    if (!isUpdated) {
      for (StatePair statePair : pool.values()) {
        // if one of the states have changed, then the pool is dirty
        if (statePair.getState().isUpdated()) {
          isUpdated = true;
          return true;
        }
      }
    }
    return isUpdated;
  }

  /**
   * Initialized the {@link StatePool}. This API also reloads the previously
   * persisted state. Note that the {@link StatePool} should be initialized only
   * once.
   */
  public void initialize(Configuration conf) throws Exception {
    if (isInitialized) {
      throw new RuntimeException("StatePool is already initialized!");
    }

    this.conf = conf;
    String persistDir = conf.get(DIR_CONFIG);
    reload = conf.getBoolean(RELOAD_CONFIG, false);
    persist = conf.getBoolean(PERSIST_CONFIG, false);

    // a state directory is mandatory as soon as reload or persist is on
    if (reload || persist) {
      System.out.println("State Manager initializing. State directory : "
                         + persistDir);
      System.out.println("Reload:" + reload + " Persist:" + persist);
      if (persistDir == null) {
        throw new RuntimeException("No state persist directory configured!"
                                   + " Disable persistence.");
      } else {
        this.persistDirPath = new Path(persistDir);
      }
    } else {
      System.out.println("State Manager disabled.");
    }

    // reload (no-op unless the reload flag is set)
    reload();

    // now set the timestamp
    DateFormat formatter =
      new SimpleDateFormat("dd-MMM-yyyy-hh'H'-mm'M'-ss'S'");
    Calendar calendar = Calendar.getInstance();
    calendar.setTimeInMillis(System.currentTimeMillis());
    timeStamp = formatter.format(calendar.getTime());

    isInitialized = true;
  }

  // Reloads the previously committed state, failing fast if reload was
  // requested but no committed state exists.
  private void reload() throws Exception {
    if (reload) {
      // Reload persisted entries
      Path stateFilename = new Path(persistDirPath, COMMIT_STATE_FILENAME);
      FileSystem fs = stateFilename.getFileSystem(conf);
      if (fs.exists(stateFilename)) {
        reloadState(stateFilename, conf);
      } else {
        throw new RuntimeException("No latest state persist directory found!"
                                   + " Disable persistence and run.");
      }
    }
  }

  // Reads the pool contents back from the given state file, if it exists.
  private void reloadState(Path stateFile, Configuration conf)
      throws Exception {
    FileSystem fs = stateFile.getFileSystem(conf);
    if (fs.exists(stateFile)) {
      System.out.println("Reading state from " + stateFile.toString());
      FSDataInputStream in = fs.open(stateFile);
      // close the stream even if read() throws
      try {
        read(in);
      } finally {
        in.close();
      }
    } else {
      System.out.println("No state information found for " + stateFile);
    }
  }

  // Deserializes a StatePool from JSON and adopts its states.
  private void read(DataInput in) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(
        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);

    // define a module
    SimpleModule module = new SimpleModule("State Serializer",
        new Version(0, 1, 1, "FINAL"));
    // add the state deserializer
    module.addDeserializer(StatePair.class, new StateDeserializer());

    // register the module with the object-mapper
    mapper.registerModule(module);

    JsonParser parser =
      mapper.getJsonFactory().createJsonParser((DataInputStream)in);
    StatePool statePool = mapper.readValue(parser, StatePool.class);
    this.setStates(statePool.getStates());
    parser.close();
  }

  /**
   * Persists the current state to the state directory. The state will be
   * persisted to the 'latest' file in the state directory.
   */
  public void persist() throws IOException {
    if (!persist) {
      return;
    }
    if (isUpdated()) {
      System.out.println("State is updated! Committing.");
      Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
      Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
      FileSystem fs = currStateFile.getFileSystem(conf);

      System.out.println("Starting the persist phase. Persisting to "
                         + currStateFile.toString());
      // persist current state
      // write the contents of the current state to the current(temp) directory
      FSDataOutputStream out = fs.create(currStateFile, true);
      // close the stream even if write() throws
      try {
        write(out);
      } finally {
        out.close();
      }

      System.out.println("Persist phase over. The best known un-committed state"
                         + " is located at " + currStateFile.toString());

      // commit (phase-1)
      // copy the previous commit file to the relocation file
      if (fs.exists(commitStateFile)) {
        Path commitRelocationFile = new Path(persistDirPath, timeStamp);
        System.out.println("Starting the pre-commit phase. Moving the previous "
                           + "best known state to "
                           + commitRelocationFile.toString());
        // copy the commit file to the relocation file
        FileUtil.copy(fs, commitStateFile, fs, commitRelocationFile, false,
                      conf);
      }

      // commit (phase-2)
      System.out.println("Starting the commit phase. Committing the states in "
                         + currStateFile.toString());
      FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);

      System.out.println("Commit phase successful! The best known committed "
                         + "state is located at " + commitStateFile.toString());
    } else {
      System.out.println("State not updated! No commit required.");
    }
  }

  // Serializes this StatePool to pretty-printed JSON on the given stream.
  private void write(DataOutput out) throws IOException {
    // This is just a JSON experiment
    System.out.println("Dumping the StatePool's in JSON format.");
    ObjectMapper outMapper = new ObjectMapper();
    outMapper.configure(
        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
    // define a module
    SimpleModule module = new SimpleModule("State Serializer",
        new Version(0, 1, 1, "FINAL"));
    // add the state serializer
    //module.addSerializer(State.class, new StateSerializer());

    // register the module with the object-mapper
    outMapper.registerModule(module);

    JsonFactory outFactory = outMapper.getJsonFactory();
    JsonGenerator jGen =
      outFactory.createJsonGenerator((DataOutputStream)out, JsonEncoding.UTF8);
    jGen.useDefaultPrettyPrinter();

    jGen.writeObject(this);
    jGen.close();
  }

  /**
   * Getters and setters for JSON serialization
   */

  /**
   * To be invoked only by the Jackson JSON serializer.
   */
  public long getVersion() {
    return VERSION;
  }

  /**
   * To be invoked only by the Jackson JSON deserializer.
   */
  public void setVersion(long version) {
    if (version != VERSION) {
      throw new RuntimeException("Version mismatch! Expected " + VERSION
                                 + " got " + version);
    }
  }

  /**
   * To be invoked only by the Jackson JSON serializer.
   */
  public HashMap<String, StatePair> getStates() {
    return pool;
  }

  /**
   * To be invoked only by the Jackson JSON deserializer. The parameterized
   * map type lets Jackson bind each value through the registered
   * {@link StateDeserializer} instead of a generic map.
   */
  public void setStates(HashMap<String, StatePair> states) {
    if (pool.size() > 0) {
      throw new RuntimeException("Pool not empty!");
    }

    //TODO Should we do a clone?
    this.pool = states;
  }
}