This article is contributed. See the original author and article here.
Azure HDInsight Spark 5.0 to HDI 5.1 Migration
A new version of HDInsight 5.1 is released with Spark 3.3.1. This release improves join query performance via Bloom filters, increases the Pandas API coverage with the support of popular Pandas features such as datetime.timedelta and merge_asof, simplifies the migration from traditional data warehouses by improving ANSI compliance and supporting dozens of new built-in functions.
In this article we will discuss about the migration of user applications from HDInsight 5.0(Spark 3.1) to HDInsight 5.1 (Spark 3.3). The sections include,
1. Changes which are compatible with minor changes
2. Changes in Spark that require application changes
Application Changes with backport.
The below changes are part of HDI 5.1 release. If these functions are used in applications, the given steps can be taken to avoid the changes in application code.
- Since Spark 3.3, the histogram_numeric function in Spark SQL returns an output type of an array of structs (x, y), where the type of the ‘x’ field in the return value is propagated from the input values consumed in the aggregate function. In Spark 3.2 or earlier, x’ always had double type. Optionally, use the configuration spark.sql.legacy.histogramNumericPropagateInputType since Spark 3.3 to revert to the previous behavior.
Spark 3.1 (pyspark)
Spark 3.3:
- In Spark 3.3, the timestamps subtraction expression such as timestamp ‘2021-03-31 23:48:00’ – timestamp ‘2021-01-01 00:00:00’ returns values of DayTimeIntervalType. In Spark 3.1 and earlier, the type of the same expression is CalendarIntervalType. To restore the behavior before Spark 3.3, you can set spark.sql.legacy.interval.enabled to true.
- Since Spark 3.3, the functions lpad and rpad have been overloaded to support byte sequences. When the first argument is a byte sequence, the optional padding pattern must also be a byte sequence and the result is a BINARY value. The default padding pattern in this case is the zero byte. To restore the legacy behavior of always returning string types, set spark.sql.legacy.lpadRpadAlwaysReturnString to true.
> SELECT hex(lpad(x’1020′, 5, x’05’))
0505051020
SELECT hex(rpad(x’1020′, 5, x’05’)) 1020050505
- Since Spark 3.3, Spark turns a non-nullable schema into nullable for API DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String]) and DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String]) when the schema is specified by the user and contains non-nullable fields. To restore the legacy behavior of respecting the nullability, set spark.sql.legacy.respectNullabilityInTextDatasetConversion to true.
- Since Spark 3.3, nulls are written as empty strings in CSV data source by default. In Spark 3.2 or earlier, nulls were written as empty strings as quoted empty strings, “”. To restore the previous behavior, set nullValue to “”, or set the configuration spark.sql.legacy.nullValueWrittenAsQuotedEmptyStringCsv to true.
Sample Data:
Spark 3.1:
Spark 3.3:
- Since Spark 3.3, Spark will try to use built-in data source writer instead of Hive serde in INSERT OVERWRITE DIRECTORY. This behavior is effective only if spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is enabled respectively for Parquet and ORC formats. To restore the behavior before Spark 3.3, you can set spark.sql.hive.convertMetastoreInsertDir to false.
Spark logs:
INFO ParquetOutputFormat [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)]: ParquetRecordWriter [block size: 134217728b, row group padding size: 8388608b, validating: false]INFO ParquetWriteSupport [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)]: Initialized Parquet WriteSupport with Catalyst schema:{ “type” : “struct”, “fields” : [ { “name” : “fname”, “type” : “string”, “nullable” : true, “metadata” : { } }, {
- Since Spark 3.3.1 and 3.2.3, for SELECT … GROUP BY a GROUPING SETS (b)-style SQL statements, grouping__id returns different values from Apache Spark 3.2.0, 3.2.1, 3.2.2, and 3.3.0. It computes based on user-given group-by expressions plus grouping set columns. To restore the behavior before 3.3.1 and 3.2.3, you can set spark.sql.legacy.groupingIdWithAppendedUserGroupBy
- In Spark 3.3, spark.sql.adaptive.enabled is enabled by default. To restore the behavior before Spark 3.3, you can set spark.sql.adaptive.enabled to false.
In Spark3.1, AQE is set to false by default.
In Spark3.3, AQE is enabled by default.
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan, which is enabled by default since Apache Spark 3.3.0. Spark SQL can turn on and off AQE by spark.sql.adaptive.enabled as an umbrella configuration. As of Spark 3.0, there are three major features in AQE: including coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
https://spark.apache.org/docs/latest/sql-performance-tuning.htm
- In Spark 3.3, the output schema of SHOW TABLES becomes namespace: string, tableName: string, isTemporary: boolean. In Spark 3.1 or earlier, the namespace field was named database for the builtin catalog, and there is no isTemporary field for v2 catalogs. To restore the old schema with the builtin catalog, you can set spark.sql.legacy.keepCommandOutputSchema to true.
In Spark3.1, Field is termed as database:-
In Spark3.3, Field is termed as Namespace: –
We can restore the behavior by setting the below property.
- In Spark 3.3, the output schema of SHOW TABLE EXTENDED becomes namespace: string, tableName: string, isTemporary: boolean, information: string. In Spark 3.1 or earlier, the namespace field was named database for the builtin catalog, and no change for the v2 catalogs. To restore the old schema with the builtin catalog, you can set spark.sql.legacy.keepCommandOutputSchema to true.
Show similar screenshot details in both spark-sql shell for spark3.1 and spark3.3 versions.
In Spark3.1, Field is termed as database:
In Spark3.3, Field is termed as Namespace: –
We can restore the behavior by setting the below property.
- In Spark 3.3, CREATE TABLE AS SELECT with non-empty LOCATION will throw AnalysisException. To restore the behavior before Spark 3.2, you can set spark.sql.legacy.allowNonEmptyLocationInCTAS to true.
In spark 3.3, we are able to CTAS with non-empty location, as shown below
In spark 3.3 also we are able to create tables without the above property change
- In Spark 3.3, special datetime values such as epoch, today, yesterday, tomorrow, and now are supported in typed literals or in cast of foldable strings only, for instance, select timestamp’now’ or select cast(‘today’ as date). In Spark 3.1 and 3.0, such special values are supported in any casts of strings to dates/timestamps. To keep these special values as dates/timestamps in Spark 3.1 and 3.0, you should replace them manually, e.g. if (c in (‘now’, ‘today’), current_date(), cast(c as date)).
In spark 3.3 and 3.1 below code works exactly same.
Application Changes Expected
There are some changes in the spark functions between HDI 5.0 and 5.1. The changes depend on whether the applications use below functionalities and APIs.
- Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print “Function: func_name not found”.
Spark 3.1:
Spark 3.3:
- Since Spark 3.3, DROP FUNCTION fails if the function name matches one of the built-in functions’ name and is not qualified. In Spark 3.2 or earlier, DROP FUNCTION can still drop a persistent function even if the name is not qualified and is the same as a built-in function’s name.
- Since Spark 3.3, when reading values from a JSON attribute defined as FloatType or DoubleType, the strings “+Infinity”, “+INF”, and “-INF” are now parsed to the appropriate values, in addition to the already supported “Infinity” and “-Infinity” variations. This change was made to improve consistency with Jackson’s parsing of the unquoted versions of these values. Also, the allowNonNumericNumbers option is now respected so these strings will now be considered invalid if this option is disabled.
- Since Spark 3.3, when reading values from a JSON attribute defined as FloatType or DoubleType, the strings “+Infinity”, “+INF”, and “-INF” are now parsed to the appropriate values, in addition to the already supported “Infinity” and “-Infinity” variations. This change was made to improve consistency with Jackson’s parsing of the unquoted versions of these values. Also, the allowNonNumericNumbers option is now respected so these strings will now be considered invalid if this option is disabled.
Spark 3.3:
Spark 3.1:
- Spark 3.3 introduced error handling functions like below:
TRY_SUBTRACT – behaves as an “-” operator but returns null in case of an error.
TRY_MULTIPLY – is a safe representation of the “*” operator.
TRY_SUM – is an error-handling implementation of the sum operation.
TRY_AVG – is an error handling-implementation of the average operation.
TRY_TO_BINARY – eventually converts an input value to a binary value.
Example of ‘try_to_binary’ function:
When correct value given for base64 decoding:
When wrong value given for base64 decoding it doesn’t throw any error.
- Since Spark 3.3, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by ” or ‘ if the path contains whitespaces.
In spark3.3:
In spark3.1: Multiple jars adding not working, only at a time can be added.
- 16.In Spark 3.3, the following meta-characters are escaped in the show() action. In Spark 3.1 or earlier, the following metacharacters are output as it is.
n (new line)
r (carrige ret)
t (horizontal tab)
f (form feed)
b (backspace)
u000B (vertical tab)
u0007 (bell)
In Spark3.3, meta-characters are escaped in the show() action.
In Spark3.1, the meta-characters are actually interpreted as their define functions.
- In Spark 3.3, the output schema of DESCRIBE NAMESPACE becomes info_name: string, info_value: string. In Spark 3.1 or earlier, the info_name field was named database_description_item and the info_value field was named database_description_value for the builtin catalog. To restore the old schema with the builtin catalog, you can set spark.sql.legacy.keepCommandOutputSchema to true.
In Spark 3.1, we see the below headers before we set the property to false and check.
In Spark 3.3, we see the Info name and Info value before we set the property to true.
- In Spark 3.3, DataFrameNaFunctions.replace() no longer uses exact string match for the input column names, to match the SQL syntax and support qualified column names. Input column name having a dot in the name (not nested) needs to be escaped with backtick `. Now, it throws AnalysisException if the column is not found in the data frame schema. It also throws IllegalArgumentException if the input column name is a nested column. In Spark 3.1 and earlier, it used to ignore invalid input column name and nested column name.
- In Spark 3.3, CREATE TABLE .. LIKE .. command can not use reserved properties. You need their specific clauses to specify them, for example, CREATE TABLE test1 LIKE test LOCATION ‘some path’. You can set spark.sql.legacy.notReserveProperties to true to ignore the ParseException, in this case, these properties will be silently removed, for example: TBLPROPERTIES(‘owner’=’yao’) will have no effect. In Spark version 3.1 and below, the reserved properties can be used in CREATE TABLE .. LIKE .. command but have no side effects, for example, TBLPROPERTIES(‘location’=’/tmp’) does not change the location of the table but only creates a headless property just like ‘a’=’b’.
In spark 3.3 we got the same parse exceptions, post setting the property we were able to create the table
In spark 3.1 , we didn’t get any exceptions or errors:
- In Spark 3.3, the unit-to-unit interval literals like INTERVAL ‘1-1’ YEAR TO MONTH and the unit list interval literals like INTERVAL ‘3’ DAYS ‘1’ HOUR are converted to ANSI interval types: YearMonthIntervalType or DayTimeIntervalType. In Spark 3.1 and earlier, such interval literals are converted to CalendarIntervalType. To restore the behavior before Spark 3.3, you can set spark.sql.legacy.interval.enabled to true.
In spark 3.3, post setting up this spark.sql.legacy.interval.enabled to true
these literals are converted to ANSI interval types: YearMonthIntervalType or DayTimeIntervalType.
In Spark 3.1, there are no changes due to the change in property.
- In Spark 3.3, the TRANSFORM operator can’t support alias in inputs. In Spark 3.1 and earlier, we can write script transform like SELECT TRANSFORM(a AS c1, b AS c2) USING ‘cat’ FROM TBL.
In Spark 3.1 we are able use direct transforms but ,
In spark 3.3, direct transform is prohibited , but can be use with below workaround.
Brought to you by Dr. Ware, Microsoft Office 365 Silver Partner, Charleston SC.
Recent Comments