Migration from Databricks
Create (or get) a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
Replace dbutils.fs with the underlying Hadoop FileSystem client
Replace this:
folders = dbutils.fs.ls(f"dbfs:/mnt/{SourceContainer}/{SourceFolder}/")
for i in folders:
    # Do things here with `i`....
With:
hadoop = spark._jvm.org.apache.hadoop
conf = spark.sparkContext._jsc.hadoopConfiguration()
path = hadoop.fs.Path(f"abfss://{SourceContainer}@{StorageAccount}.dfs.core.windows.net/{SourceFolder}/")
# Resolve the FileSystem from the URI so the abfss scheme is honored
# (FileSystem.get(conf) alone would return the default filesystem)
fs = hadoop.fs.FileSystem.get(path.toUri(), conf)
for f in fs.listStatus(path):
    print(f.getPath(), f.getLen())
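If many call sites depend on dbutils.fs.ls, a thin wrapper keeps the migration diff small. A minimal sketch (the ls name and the returned tuple shape are illustration choices, not a Databricks or Hadoop API):

def ls(spark, uri):
    # List `uri` through the Hadoop FileSystem API and return
    # (path, name, size) tuples, roughly mirroring dbutils.fs.ls.
    hadoop = spark._jvm.org.apache.hadoop
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    path = hadoop.fs.Path(uri)
    fs = hadoop.fs.FileSystem.get(path.toUri(), conf)
    return [
        (f.getPath().toString(), f.getPath().getName(), f.getLen())
        for f in fs.listStatus(path)
    ]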
Update attribute access on the returned FileStatus objects:
- r.name -> r.getPath().getName()
- r.path -> r.getPath().toString()
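In context, with r being one of the FileStatus results from listStatus above (fs and path as defined earlier in this section):

statuses = fs.listStatus(path)
for r in statuses:
    name = r.getPath().getName()        # was: r.name
    full_path = r.getPath().toString()  # was: r.path
    size = r.getLen()                   # was: r.size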
Remove mounts and replace dbfs:/ paths with abfss:// URIs
Remove mount, example:
dbutils.fs.mount(
source = f"abfss://{SourceContainer}@{StorageAccount}.dfs.core.windows.net/",
mount_point = mountPoint,
extra_configs = configs)
To:
# (nothing; delete the mount call entirely and address storage directly through abfss:// URIs)
Replace dbfs:/ paths, example:
f"dbfs:/mnt/{SourceContainer}/...
To:
f"abfss://{SourceContainer}@{StorageAccount}.dfs.core.windows.net/...
And update the Spark conf with per-account settings, example:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": oauthClientId,
    "fs.azure.account.oauth2.client.secret": oauthSecret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<<your tenant>>/oauth2/token",
}
To:
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set(f"fs.azure.account.auth.type.{StorageAccount}.dfs.core.windows.net", "OAuth")
hconf.set(f"fs.azure.account.oauth.provider.type.{StorageAccount}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
hconf.set(f"fs.azure.account.oauth2.client.id.{StorageAccount}.dfs.core.windows.net", oauthClientId)
hconf.set(f"fs.azure.account.oauth2.client.secret.{StorageAccount}.dfs.core.windows.net", oauthSecret)
hconf.set(f"fs.azure.account.oauth2.client.endpoint.{StorageAccount}.dfs.core.windows.net", "https://login.microsoftonline.com/<<your tenant>>/oauth2/token")
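Since the per-account keys are just the original configs keys with the storage-account host appended, the dict can also be applied in a loop (a sketch, assuming configs, StorageAccount, and spark from above):

suffix = f"{StorageAccount}.dfs.core.windows.net"
hconf = spark.sparkContext._jsc.hadoopConfiguration()
for key, value in configs.items():
    # e.g. fs.azure.account.auth.type -> fs.azure.account.auth.type.<account host>
    hconf.set(f"{key}.{suffix}", value)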
Run via the Job API
$baseUrl = "https://api.graal.systems/api/v1"
# Optional sanity check: list existing jobs
Invoke-WebRequest "$baseUrl/jobs" -UseBasicParsing | ConvertFrom-Json
$headers = @{
    "Content-Type" = "application/vnd.graal.systems.v1.job+json"
}
$body = @{
    "name" = "Test"
    "description" = "A description"
    "options" = @{
        "type" = "spark"
        "docker_image" = "spark-py:3.0.1-jre11-hadoop3.2-azure"
        "file_url" = "compute-xml-file.py"
        "conf" = @{
            "spark.executor.instances" = "1"
            "spark.executor.memory" = "512m"
            "spark.driver.cores" = "1"
        }
    }
}
# -Depth is required: ConvertTo-Json's default of 2 would truncate the nested "conf" block
$job = Invoke-WebRequest "$baseUrl/jobs" -Method Post -Body ($body | ConvertTo-Json -Depth 5) -Headers $headers -UseBasicParsing | ConvertFrom-Json
$headers = @{
    "Content-Type" = "application/vnd.graal.systems.v1.run+json"
}
$body = @{
    "name" = "Run"
    "initiator" = "datafactory-XXX"
    "description" = "Demo Spark job"
}
$jobId = $job.id
$run = Invoke-WebRequest "$baseUrl/jobs/$jobId/runs" -Method Post -Body ($body | ConvertTo-Json) -Headers $headers -UseBasicParsing | ConvertFrom-Json
$runId = $run.id
Invoke-WebRequest "$baseUrl/jobs/$jobId/runs/$runId" -UseBasicParsing | ConvertFrom-Json
Use Java classes from Python
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip
python3
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName("Your app name") \
    .config("spark.jars", "/tmp/your-jar.jar") \
    .getOrCreate()

# Import the Java classes into the Py4J gateway, then call them directly
jvm = spark.sparkContext._gateway.jvm
java_import(jvm, "your.package.YourClass")
java_import(jvm, "java.util.Properties")

properties = jvm.Properties()
jvm.YourClass.updateProperties(properties)
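Py4J proxies the returned Java objects, so java.util.Properties methods can be called directly from Python (here with a placeholder key):

properties.setProperty("some.key", "some value")  # plain Java method call through Py4J
print(properties.getProperty("some.key"))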