@UstDoesTech
Created October 21, 2021 10:22
Using Auto Loader with S3 on Azure Databricks: temporary credential authentication method
# COMMAND ----------
# boto3 is used to call AWS STS and obtain temporary credentials for the S3 bucket
import boto3
from botocore.config import Config
# COMMAND ----------
# Long-lived AWS access key pair, retrieved from a Databricks secret scope
awsAccessKeyId = dbutils.secrets.get("scopeName", key = "AWSAccessKeyID")
awsSecretAccessKey = dbutils.secrets.get("scopeName", key = "AWSSecretAccessKey")
# COMMAND ----------
botoConfig = Config(
    region_name = 'eu-west-2',
    signature_version = 'v4',
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)
# COMMAND ----------
# Exchange the long-lived keys for temporary credentials by assuming an IAM role via STS
client = boto3.client('sts', config = botoConfig, aws_access_key_id = awsAccessKeyId, aws_secret_access_key = awsSecretAccessKey)
response = client.assume_role(
    RoleArn='arn:aws:iam::1234567890:role/role_name',
    RoleSessionName='AzureDatabricks',
    DurationSeconds=3600
)
# COMMAND ----------
# Temporary credentials: AccessKeyId, SecretAccessKey and SessionToken
credResponse = response['Credentials']
# COMMAND ----------
spark.conf.set("fs.s3a.credentialsType", "AssumeRole")
spark.conf.set("fs.s3a.stsAssumeRole.arn", "arn:aws:iam::1234567890:role/role_name")
spark.conf.set("fs.s3a.acl.default", "BucketOwnerFullControl")
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", credResponse['AccessKeyId'] )
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", credResponse['SecretAccessKey'])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token", credResponse['SessionToken'])
# COMMAND ----------
# Incrementally read new JSON files from S3 with Auto Loader (cloudFiles).
# definedSchema is assumed to be a StructType defined elsewhere in the notebook.
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .schema(definedSchema)
      .load("s3a://bucket/directory/")
     )
# COMMAND ----------
# Write the stream to a Delta table, processing all available files once and then stopping
(df.writeStream.format("delta")
   .option("checkpointLocation", "/mnt/lake/directory/_checkpoint")
   .trigger(once=True)
   .start("/mnt/lake/directory/")
)
psrawal commented May 24, 2023

Aren't we accessing S3 by using the secret key and access key? I have ingested the data from S3 into L2 (Databricks), and now I want to enable Auto Loader in my code. Could you help me with that?
import org.apache.spark.sql.functions.{input_file_name, current_timestamp, explode}
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType, IntegerType, ArrayType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.databricks.spark.xml.XmlDataFrameReader
import io.delta.tables._
import scala.collection.mutable.WrappedArray

val spark = SparkSession.builder().getOrCreate()

spark.conf.set("spark.hadoop.fs.s3a.access.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "s3.######.amazonaws.com")

val df = spark.read.option("multiline", "true").json("s3a://########")

val df_flattened = df.selectExpr(
  "subject.cohortName as cohortName",
  "subject.createdDt as createdDt",
  "explode(subject.event) as event",
  "subject.statusChangeDt as statusChangeDt",
  "subject.studyID as studyID",
  "subject.subjectAge as subjectAge",
  "subject.subjectGender as subjectGender",
  "subject.subjectID as subjectID",
  "subject.subjectStatus as subjectStatus",
  "subject.trialSiteID as trialSiteID"
).selectExpr(
  "cohortName",
  "createdDt",
  "event.date as event_date",
  "event.name as event_name",
  "event.values as event_values",
  "statusChangeDt",
  "studyID",
  "subjectAge",
  "subjectGender",
  "subjectID",
  "subjectStatus",
  "trialSiteID"
)
df_flattened.show()

Now I want to enable Auto Loader in the above code; a possible approach is sketched below.
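As a rough illustration (not from the gist author), the batch spark.read above could be swapped for an Auto Loader stream along these lines. The bucket, schema-location, checkpoint and output paths are placeholders, multiLine is simply passed through to the underlying JSON reader, and the two selectExpr flattening steps from the batch version are reused unchanged:

// Sketch only: replaces the batch spark.read.json with an Auto Loader (cloudFiles) stream.
// All s3a:// paths below are placeholders, not values from the original code.
import org.apache.spark.sql.streaming.Trigger

val rawStream = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("multiLine", "true") // passed through to the underlying JSON reader
  .option("cloudFiles.schemaLocation", "s3a://your-bucket/_schema/subjects") // needed when no explicit schema is given
  .load("s3a://your-bucket/path/to/json/")

// Apply the same two selectExpr flattening steps as in the batch code above to rawStream,
// then write the result incrementally to Delta instead of calling show()
rawStream.writeStream
  .format("delta")
  .option("checkpointLocation", "s3a://your-bucket/_checkpoint/subjects")
  .trigger(Trigger.Once())
  .start("s3a://your-bucket/delta/subjects")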
