# COMMAND ----------
import boto3
from botocore.config import Config

# COMMAND ----------
# Retrieve the AWS access key ID and secret access key from a Databricks secret scope
awsAccessKeyId = dbutils.secrets.get("scopeName", key = "AWSAccessKeyID")
awsSecretAccessKey = dbutils.secrets.get("scopeName", key = "AWSSecretAccessKey")

# COMMAND ----------
# Configure boto3 with the target region, signature version, and retry behaviour
botoConfig = Config(
    region_name = 'eu-west-2',
    signature_version = 'v4',
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)

# COMMAND ----------
# Assume the IAM role via STS to obtain temporary credentials (valid for one hour)
client = boto3.client('sts', config = botoConfig, aws_access_key_id = awsAccessKeyId, aws_secret_access_key = awsSecretAccessKey)

response = client.assume_role(
    RoleArn='arn:aws:iam::1234567890:role/role_name',
    RoleSessionName='AzureDatabricks',
    DurationSeconds=3600
)

# COMMAND ----------
credResponse = response['Credentials']

# COMMAND ----------
# Pass the temporary credentials to the S3A filesystem so Spark can authenticate against the bucket
spark.conf.set("fs.s3a.credentialsType", "AssumeRole")
spark.conf.set("fs.s3a.stsAssumeRole.arn", "arn:aws:iam::1234567890:role/role_name")
spark.conf.set("fs.s3a.acl.default", "BucketOwnerFullControl")

sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", credResponse['AccessKeyId'])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", credResponse['SecretAccessKey'])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token", credResponse['SessionToken'])

# COMMAND ----------
# Incrementally ingest JSON files from S3 with Auto Loader (the cloudFiles source)
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .schema(definedSchema)
      .load("s3a://bucket/directory/")
     )

# COMMAND ----------
# Write the stream to a Delta location, processing the available files once and then stopping
(df.writeStream.format("delta")
    .option("checkpointLocation", "/mnt/lake/directory/_checkpoint")
    .trigger(once=True)
    .start("/mnt/lake/directory/")
)
Aren't we accessing S3 by using the secret key and access key? I have ingested the data from S3 to L2 (Databricks); now I want to enable Auto Loader in my code. Could you help me with that?
import org.apache.spark.sql.functions.{input_file_name, current_timestamp, explode}
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType, IntegerType, ArrayType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.databricks.spark.xml.XmlDataFrameReader
import io.delta.tables._
import scala.collection.mutable.WrappedArray

val spark = SparkSession.builder().getOrCreate()

// S3 credentials and endpoint (redacted)
spark.conf.set("spark.hadoop.fs.s3a.access.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "########")
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "s3.######.amazonaws.com")

// Batch read of multiline JSON from S3
val df = spark.read.option("multiline", "true").json("s3a://########")

// Flatten the nested subject structure, exploding the event array into one row per event
val df_flattened = df.selectExpr(
  "subject.cohortName as cohortName",
  "subject.createdDt as createdDt",
  "explode(subject.event) as event",
  "subject.statusChangeDt as statusChangeDt",
  "subject.studyID as studyID",
  "subject.subjectAge as subjectAge",
  "subject.subjectGender as subjectGender",
  "subject.subjectID as subjectID",
  "subject.subjectStatus as subjectStatus",
  "subject.trialSiteID as trialSiteID"
).selectExpr(
  "cohortName",
  "createdDt",
  "event.date as event_date",
  "event.name as event_name",
  "event.values as event_values",
  "statusChangeDt",
  "studyID",
  "subjectAge",
  "subjectGender",
  "subjectID",
  "subjectStatus",
  "trialSiteID"
)

df_flattened.show()
Now I want to enable Auto Loader in the above code.
I can't help you with the AWS side of things too much, as it differs from organisation to organisation. However, the documentation on the ARN should help: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html
The associated credentials will be generated on the AWS side too. Talk to your AWS IAM admin for support. All this script does is generate temporary credentials to authenticate against an IAM-backed storage account.
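On the Auto Loader question: the Python notebook above already shows the pattern, i.e. readStream with the cloudFiles source, written out to Delta with a run-once trigger. Below is a rough Scala sketch of the same idea applied to your read. It is only a sketch: the schema field types, checkpoint location, and output path are assumptions/placeholders, and it assumes your S3 credentials are already configured as in your snippet.

import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger

// Auto Loader needs a schema up front (or schema inference via cloudFiles.schemaLocation
// on newer runtimes). The field types below are assumptions; adjust them to your data.
val eventSchema = new StructType()
  .add("date", StringType)
  .add("name", StringType)
  .add("values", ArrayType(StringType))   // assumed element type

val subjectSchema = new StructType()
  .add("cohortName", StringType)
  .add("createdDt", StringType)
  .add("event", ArrayType(eventSchema))
  .add("statusChangeDt", StringType)
  .add("studyID", StringType)
  .add("subjectAge", StringType)          // may be IntegerType in your data
  .add("subjectGender", StringType)
  .add("subjectID", StringType)
  .add("subjectStatus", StringType)
  .add("trialSiteID", StringType)

val sourceSchema = new StructType().add("subject", subjectSchema)

// Incremental read with Auto Loader instead of a one-off spark.read
val dfStream = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("multiline", "true")
  .schema(sourceSchema)
  .load("s3a://########")                 // same bucket/prefix as your batch read

// Same flattening logic as your batch version
val dfFlattened = dfStream.selectExpr(
  "subject.cohortName as cohortName",
  "subject.createdDt as createdDt",
  "explode(subject.event) as event",
  "subject.statusChangeDt as statusChangeDt",
  "subject.studyID as studyID",
  "subject.subjectAge as subjectAge",
  "subject.subjectGender as subjectGender",
  "subject.subjectID as subjectID",
  "subject.subjectStatus as subjectStatus",
  "subject.trialSiteID as trialSiteID"
).selectExpr(
  "cohortName", "createdDt",
  "event.date as event_date", "event.name as event_name", "event.values as event_values",
  "statusChangeDt", "studyID", "subjectAge", "subjectGender",
  "subjectID", "subjectStatus", "trialSiteID"
)

// show() doesn't work on a streaming DataFrame; write to a Delta location instead,
// processing whatever is available and then stopping, as in the notebook above.
dfFlattened.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/_checkpoint")   // placeholder
  .trigger(Trigger.Once())
  .start("/path/to/output")                               // placeholder

The checkpoint location is what lets Auto Loader pick up only the new files on each run, so keep it stable between runs.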