
@UstDoesTech
Created October 21, 2021 10:22
Using Autoloader with S3 on Azure Databricks. Temporary Credential Authentication Method
# COMMAND ----------
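# boto3 is the AWS SDK for Python; it is used here to request temporary credentials from AWS STS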
import boto3
from botocore.config import Config
# COMMAND ----------
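# Fetch the long-lived AWS access key ID and secret access key from a Databricks secret scope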
awsAccessKeyId = dbutils.secrets.get("scopeName", key = "AWSAccessKeyID")
awsSecretAccessKey = dbutils.secrets.get("scopeName", key = "AWSSecretAccessKey")
# COMMAND ----------
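# boto3 client configuration: region, SigV4 request signing, and standard retry mode with up to 10 attempts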
botoConfig = Config(
    region_name = 'eu-west-2',
    signature_version = 'v4',
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)
# COMMAND ----------
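# Create an STS client with the long-lived keys and assume the IAM role to obtain temporary credentials (valid for one hour)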
client = boto3.client('sts', config = botoConfig, aws_access_key_id = awsAccessKeyId, aws_secret_access_key = awsSecretAccessKey)
response = client.assume_role(
    RoleArn='arn:aws:iam::1234567890:role/role_name',
    RoleSessionName='AzureDatabricks',
    DurationSeconds=3600
)
# COMMAND ----------
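# The temporary AccessKeyId, SecretAccessKey and SessionToken are returned under the 'Credentials' key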
credResponse = response['Credentials']
# COMMAND ----------
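# Point the S3A filesystem at the temporary credentials so Spark can read the bucket directly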
spark.conf.set("fs.s3a.credentialsType", "AssumeRole")
spark.conf.set("fs.s3a.stsAssumeRole.arn", "arn:aws:iam::1234567890:role/role_name")
spark.conf.set("fs.s3a.acl.default", "BucketOwnerFullControl")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", credResponse['AccessKeyId'] )
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", credResponse['SecretAccessKey'])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token", credResponse['SessionToken'])
# COMMAND ----------
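# Autoloader (cloudFiles) streaming read of JSON files from the bucket; definedSchema must be declared beforehand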
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .schema(definedSchema)
      .load("s3a://bucket/directory/")
     )
# COMMAND ----------
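# Write the stream to a Delta location; trigger(once=True) processes the available files and then stops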
(df.writeStream.format("delta")
   .option("checkpointLocation", "/mnt/lake/directory/_checkpoint")
   .trigger(once=True)
   .start("/mnt/lake/directory/")
)
psrawal commented May 23, 2023

Could you please help me with RoleArn and the credentials? I did not understand what values need to be passed.

@UstDoesTech (Author)

I can't help you too much with the AWS side of things, as it differs from organisation to organisation. However, the documentation on ARNs should help: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html

The associated credentials will be generated on the AWS side too. Talk to your AWS IAM admin for support. All this script does is generate temporary credentials to authenticate against an IAM-backed S3 bucket.
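For illustration, here is a minimal sketch of the values the script expects. Every name below is a placeholder, not a real value: the secret scope and key names come from your Databricks workspace, and the role ARN follows the standard arn:aws:iam::<account-id>:role/<role-name> format.

# Illustrative placeholders only; substitute your own scope, keys, account ID and role name
awsAccessKeyId = dbutils.secrets.get("my-scope", key = "AWSAccessKeyID")          # long-lived IAM user access key
awsSecretAccessKey = dbutils.secrets.get("my-scope", key = "AWSSecretAccessKey")  # matching secret access key

response = client.assume_role(
    RoleArn = 'arn:aws:iam::123456789012:role/databricks-s3-reader',  # hypothetical role with read access to the bucket
    RoleSessionName = 'AzureDatabricks',                              # free-form label for the temporary session
    DurationSeconds = 3600                                            # must be within the role's maximum session duration
)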

psrawal commented May 24, 2023

Aren't we accessing S3 by using the secret key and access key? I have ingested the data from S3 into L2 (Databricks); now I want to enable Autoloader in my code. Could you help me with that?
import org.apache.spark.sql.functions.{input_file_name, current_timestamp, explode}
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType, IntegerType, ArrayType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.databricks.spark.xml.XmlDataFrameReader
import io.delta.tables._
import scala.collection.mutable.WrappedArray

val spark = SparkSession.builder().getOrCreate()

spark.conf.set("spark.hadoop.fs.s3a.access.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "s3.######.amazonaws.com")

val df = spark.read.option("multiline", "true").json("s3a://########")

val df_flattened = df.selectExpr(
  "subject.cohortName as cohortName",
  "subject.createdDt as createdDt",
  "explode(subject.event) as event",
  "subject.statusChangeDt as statusChangeDt",
  "subject.studyID as studyID",
  "subject.subjectAge as subjectAge",
  "subject.subjectGender as subjectGender",
  "subject.subjectID as subjectID",
  "subject.subjectStatus as subjectStatus",
  "subject.trialSiteID as trialSiteID"
).selectExpr(
  "cohortName",
  "createdDt",
  "event.date as event_date",
  "event.name as event_name",
  "event.values as event_values",
  "statusChangeDt",
  "studyID",
  "subjectAge",
  "subjectGender",
  "subjectID",
  "subjectStatus",
  "trialSiteID"
)
df_flattened.show()

Now I want to enable Autoloader in the above code.
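For illustration, a minimal sketch of the change, written in Python to mirror the gist above (the schema name, bucket path, checkpoint and output locations are all placeholders): the one-off spark.read.json is replaced by an Autoloader (cloudFiles) stream, and the flattening selectExpr can then be applied to the streaming DataFrame before it is written out.

# Hypothetical sketch: swap the batch JSON read for an Autoloader stream
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiline", "true")       # carried over from the original batch read
      .schema(subjectSchema)             # placeholder: declare the schema of the incoming JSON up front
      .load("s3a://your-bucket/path/")
     )

(df.writeStream.format("delta")
   .option("checkpointLocation", "/mnt/lake/subject/_checkpoint")  # Autoloader tracks already-processed files via the checkpoint
   .trigger(once=True)                                             # process what is currently in the bucket, then stop
   .start("/mnt/lake/subject/")
)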
