Created September 30, 2013 13:57
Gist: jeremykarn/6764204
A simple Pig example showing Python UDFs being called in both the map and the reduce phases.
REGISTER 'udf.py' USING streaming_python AS my_udfs;

tweets = LOAD 's3n://twitter-gardenhose-mortar/tweets'
         USING org.apache.pig.piggybank.storage.JsonLoader(
             'text: chararray, place:tuple(name:chararray)');

-- The my_length UDF is called in the mapper, once for each tweet.
long_tweets = FILTER tweets BY my_udfs.my_length(text) > 50;

-- The my_avg_length UDF is called in the reducer, once per group, on the grouped output.
avg_long_tweet_length = FOREACH (GROUP long_tweets BY place.name) GENERATE
    group,
    my_udfs.my_avg_length(long_tweets.text);

STORE avg_long_tweet_length INTO '<output>';
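To make the map/reduce split concrete, the following is a rough plain-Python sketch of the same data flow, not something Pig itself runs. The sample records, place names, and variable names are invented for illustration; only the 50-character threshold and the field layout (`text`, `place.name`) come from the script above.

```python
from collections import defaultdict

# Hypothetical sample records shaped like the LOAD schema:
# (text, place), where place is a one-field tuple (name,).
tweets = [
    ("x" * 60, ("Boston",)),
    ("y" * 80, ("Boston",)),
    ("short tweet", ("Boston",)),
    ("z" * 70, ("Austin",)),
]

# Map side: the FILTER runs record by record, so a per-record
# function (standing in for my_length) is applied to each tweet.
long_tweets = [(text, place) for text, place in tweets
               if (len(text) if text else 0) > 50]

# Shuffle: GROUP BY place.name gathers the surviving records per key.
# Projecting long_tweets.text yields a bag of one-element tuples.
groups = defaultdict(list)
for text, place in long_tweets:
    groups[place[0]].append((text,))

# Reduce side: one aggregate call per group (standing in for my_avg_length).
result = {name: sum(len(t[0]) for t in bag) / float(len(bag))
          for name, bag in groups.items()}

print(result)
```

The point of the sketch is the shape of the calls: the filter function sees one value at a time, while the averaging step sees a whole group at once.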
from pig_util import outputSchema

@outputSchema('length:int')
def my_length(tweet):
    # Null-safe: a missing or empty tweet counts as length 0.
    return len(tweet) if tweet else 0

@outputSchema('avg_length:float')
def my_avg_length(tweets):
    """
    tweets is a list of one-element tuples.
    """
    return sum([len(t[0]) for t in tweets]) / float(len(tweets))
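The two UDFs can be tried outside Pig with a stand-in decorator. This is a minimal sketch: the `outputSchema` defined here is a hypothetical local stub (in streaming_python scripts the real decorator is normally imported from `pig_util`), the `output_schema` attribute is an invention of the stub, and the sample bag is made up.

```python
def outputSchema(schema):
    """Local stand-in for Pig's decorator (an assumption for testing only).

    It records the schema string on the function and returns it unchanged.
    """
    def wrap(func):
        func.output_schema = schema
        return func
    return wrap

@outputSchema('length:int')
def my_length(tweet):
    return len(tweet) if tweet else 0

@outputSchema('avg_length:float')
def my_avg_length(tweets):
    # tweets arrives as a bag: a list of one-element tuples.
    return sum([len(t[0]) for t in tweets]) / float(len(tweets))

# A bag as the reducer would see it after projecting long_tweets.text.
bag = [("a" * 60,), ("b" * 90,)]

print(my_length(None))     # null input is treated as length 0
print(my_length("hello"))
print(my_avg_length(bag))
```

Note that `my_avg_length` would raise a `ZeroDivisionError` on an empty bag. In the script above that case should not arise, since each group produced by `GROUP` contains at least one tweet.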