Created July 17, 2016 13:16
Save GEOFBOT/041d76b47f08919305493f57ebdde0f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.flink;

import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.io.Files;
import org.apache.flink.util.Collector;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class DistCacheTest { | |
// tests creating and reading from DistributedCache on local machine | |
// based on DistributedCacheTest | |
public static final String data | |
= "machen\n" | |
+ "zeit\n" | |
+ "heerscharen\n" | |
+ "keiner\n" | |
+ "meine\n"; | |
protected static String textPath; | |
private static final List<File> tempFiles = new ArrayList<File>(); | |
public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> { | |
private static final long serialVersionUID = 1L; | |
private final Set<String> wordList = new HashSet<>(); | |
@Override | |
public void open(Configuration conf) throws FileNotFoundException, IOException { | |
File file = getRuntimeContext().getDistributedCache().getFile("cache_test"); | |
BufferedReader reader = new BufferedReader(new FileReader(file)); | |
String tempString; | |
while ((tempString = reader.readLine()) != null) { | |
wordList.add(tempString); | |
} | |
reader.close(); | |
} | |
@Override | |
public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception { | |
if (wordList.contains(word)) { | |
out.collect(new Tuple1<>(word)); | |
} | |
} | |
} | |
// | |
// Program | |
// | |
public static void main(String[] args) throws Exception { | |
textPath = createTempFile("count.txt", data); | |
// set up the execution environment | |
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
env.registerCachedFile(textPath, "cache_test"); | |
env | |
.readTextFile(textPath) | |
.flatMap(new WordChecker()) | |
.print(); | |
} | |
// -------------------------------------------------------------------------------------------- | |
// Temporary File Utilities | |
// -------------------------------------------------------------------------------------------- | |
public static String createTempFile(String fileName, String contents) throws IOException { | |
File f = createAndRegisterTempFile(fileName); | |
Files.write(contents, f, Charsets.UTF_8); | |
return f.toURI().toString(); | |
} | |
public static File createAndRegisterTempFile(String fileName) throws IOException { | |
File baseDir = new File(System.getProperty("java.io.tmpdir")); | |
File f = new File(baseDir, "-" + fileName); | |
if (f.exists()) { | |
deleteRecursively(f); | |
} | |
File parentToDelete = f; | |
while (true) { | |
File parent = parentToDelete.getParentFile(); | |
if (parent == null) { | |
throw new IOException("Missed temp dir while traversing parents of a temp file."); | |
} | |
if (parent.equals(baseDir)) { | |
break; | |
} | |
parentToDelete = parent; | |
} | |
Files.createParentDirs(f); | |
tempFiles.add(parentToDelete); | |
return f; | |
} | |
private static void deleteRecursively(File f) throws IOException { | |
if (f.isDirectory()) { | |
FileUtils.deleteDirectory(f); | |
} else if (!f.delete()) { | |
System.err.println("Failed to delete file " + f.getAbsolutePath()); | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.