Skip to content

Commit bb112e2

Browse files
Xiang Licloud-fan
authored andcommitted
[SPARK-52326][SQL] Add partitions related ExternalCatalogEvent and post them in corresponding operations
### What changes were proposed in this pull request? 1. Add `partitions` related events, for the following external catalog operations: create / drop / alter / rename. A base "PartitionsEvent" is added by extending "TableEvent". One "PartitionsEvent" (and its subs) could contain the operation for one or more partitions. So it is named as "Partition`s`Event" (in the plural form of partition), so are its subs. 2. Post those events in the corresponding external catalog operations ### Why are the changes needed? The operation list extracted from Spark events are of great help for user to summarize what have been done against a db/table/function/partition, for the purpose of logging and/or auditing. In [ExternalCatalogWithListener](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala), there are events posted against db, table and function for all registered listeners. But those operations against partition(s) do not have their events posted. ### Does this PR introduce _any_ user-facing change? With this change, partition(s) related operations are posted into register listener as events ### How was this patch tested? Enrich UT and also tested on local cluster ### Was this patch authored or co-authored using generative AI tooling? No Closes #53439 from waterlx/partitions_event. Authored-by: Xiang Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 04624f6 commit bb112e2

File tree

3 files changed

+203
-0
lines changed

3 files changed

+203
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
204204
table: String,
205205
parts: Seq[CatalogTablePartition],
206206
ignoreIfExists: Boolean): Unit = {
207+
val partSpecs = parts.map(_.spec)
208+
postToAll(CreatePartitionsPreEvent(db, table, partSpecs))
207209
delegate.createPartitions(db, table, parts, ignoreIfExists)
210+
postToAll(CreatePartitionsEvent(db, table, partSpecs))
208211
}
209212

210213
override def dropPartitions(
@@ -214,22 +217,29 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
214217
ignoreIfNotExists: Boolean,
215218
purge: Boolean,
216219
retainData: Boolean): Unit = {
220+
postToAll(DropPartitionsPreEvent(db, table, partSpecs))
217221
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData)
222+
postToAll(DropPartitionsEvent(db, table, partSpecs))
218223
}
219224

220225
override def renamePartitions(
221226
db: String,
222227
table: String,
223228
specs: Seq[TablePartitionSpec],
224229
newSpecs: Seq[TablePartitionSpec]): Unit = {
230+
postToAll(RenamePartitionsPreEvent(db, table, specs, newSpecs))
225231
delegate.renamePartitions(db, table, specs, newSpecs)
232+
postToAll(RenamePartitionsEvent(db, table, specs, newSpecs))
226233
}
227234

228235
override def alterPartitions(
229236
db: String,
230237
table: String,
231238
parts: Seq[CatalogTablePartition]): Unit = {
239+
val partSpecs = parts.map(_.spec)
240+
postToAll(AlterPartitionsPreEvent(db, table, partSpecs))
232241
delegate.alterPartitions(db, table, parts)
242+
postToAll(AlterPartitionsEvent(db, table, partSpecs))
233243
}
234244

235245
override def getPartition(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.spark.sql.catalyst.catalog
1818

1919
import org.apache.spark.scheduler.SparkListenerEvent
20+
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2021

2122
/**
2223
* Event emitted by the external catalog when it is modified. Events are either fired before or
@@ -203,3 +204,87 @@ case class RenameFunctionEvent(
203204
name: String,
204205
newName: String)
205206
extends FunctionEvent
207+
208+
/**
209+
* Event fired when some partitions (of a table) are created, dropped, renamed, altered.
210+
*/
211+
trait PartitionsEvent extends TableEvent {
212+
/**
213+
* Specs of the partitions which are touched.
214+
*/
215+
val partSpecs: Seq[TablePartitionSpec]
216+
}
217+
218+
/**
219+
* Event fired before some partitions (of a table) are created.
220+
*/
221+
case class CreatePartitionsPreEvent(
222+
database: String,
223+
name /* of table */: String,
224+
partSpecs: Seq[TablePartitionSpec])
225+
extends PartitionsEvent
226+
227+
/**
228+
* Event fired after some partitions (of a table) have been created.
229+
*/
230+
case class CreatePartitionsEvent(
231+
database: String,
232+
name /* of table */: String,
233+
partSpecs: Seq[TablePartitionSpec])
234+
extends PartitionsEvent
235+
236+
/**
237+
* Event fired before some partitions (of a table) are dropped.
238+
*/
239+
case class DropPartitionsPreEvent(
240+
database: String,
241+
name /* of table */ : String,
242+
partSpecs: Seq[TablePartitionSpec])
243+
extends PartitionsEvent
244+
245+
/**
246+
* Event fired after some partitions (of a table) have been dropped.
247+
*/
248+
case class DropPartitionsEvent(
249+
database: String,
250+
name /* of table */ : String,
251+
partSpecs: Seq[TablePartitionSpec])
252+
extends PartitionsEvent
253+
254+
/**
255+
* Event fired before some partitions (of a table) are renamed.
256+
*/
257+
case class RenamePartitionsPreEvent(
258+
database: String,
259+
name /* of table */ : String,
260+
partSpecs: Seq[TablePartitionSpec],
261+
newPartSpecs: Seq[TablePartitionSpec])
262+
extends PartitionsEvent
263+
264+
/**
265+
* Event fired after some partitions (of a table) have been renamed.
266+
*/
267+
case class RenamePartitionsEvent(
268+
database: String,
269+
name /* of table */ : String,
270+
partSpecs: Seq[TablePartitionSpec],
271+
newPartSpecs: Seq[TablePartitionSpec])
272+
extends PartitionsEvent
273+
274+
/**
275+
* Event fired before some partitions (of a table) are altered.
276+
*/
277+
case class AlterPartitionsPreEvent(
278+
database: String,
279+
name /* of table */ : String,
280+
partSpecs: Seq[TablePartitionSpec])
281+
extends PartitionsEvent
282+
283+
/**
284+
* Event fired after some partitions (of a table) have been altered.
285+
*/
286+
case class AlterPartitionsEvent(
287+
database: String,
288+
name /* of table */ : String,
289+
partSpecs: Seq[TablePartitionSpec])
290+
extends PartitionsEvent

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,4 +209,112 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
209209
catalog.dropFunction("db5", "fn4")
210210
checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil)
211211
}
212+
213+
testWithCatalog("partitions") { (catalog, checkEvents) =>
214+
// Prepare db
215+
val db = "db1"
216+
val dbUri = preparePath(Files.createTempDirectory(db + "_"))
217+
val dbDefinition = CatalogDatabase(
218+
name = db,
219+
description = "",
220+
locationUri = dbUri,
221+
properties = Map.empty)
222+
223+
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
224+
checkEvents(
225+
CreateDatabasePreEvent(db) ::
226+
CreateDatabaseEvent(db) :: Nil)
227+
228+
// Prepare table
229+
val table = "table1"
230+
val tableUri = preparePath(Files.createTempDirectory(table + "_"))
231+
val tableDefinition = CatalogTable(
232+
identifier = TableIdentifier(table, Some(db)),
233+
tableType = CatalogTableType.MANAGED,
234+
storage = CatalogStorageFormat.empty.copy(locationUri = Option(tableUri)),
235+
schema = new StructType()
236+
.add("year", "int")
237+
.add("month", "int")
238+
.add("sales", "long"))
239+
240+
catalog.createTable(tableDefinition, ignoreIfExists = false)
241+
checkEvents(
242+
CreateTablePreEvent(db, table) ::
243+
CreateTableEvent(db, table) :: Nil)
244+
245+
// Prepare partitions
246+
val storageFormat = CatalogStorageFormat(
247+
locationUri = Some(tableUri),
248+
inputFormat = Some("tableInputFormat"),
249+
outputFormat = Some("tableOutputFormat"),
250+
serde = None,
251+
compressed = false,
252+
properties = Map.empty)
253+
val parts = Seq(CatalogTablePartition(Map("year" -> "2025", "month" -> "Jan"), storageFormat))
254+
val partSpecs = parts.map(_.spec)
255+
256+
val newPartSpecs = Seq(Map("year" -> "2026", "month" -> "Feb"))
257+
258+
// CREATE
259+
catalog.createPartitions(db, table, parts, ignoreIfExists = false)
260+
checkEvents(
261+
CreatePartitionsPreEvent(db, table, partSpecs) ::
262+
CreatePartitionsEvent(db, table, partSpecs) :: Nil)
263+
264+
// Re-create with ignoreIfExists as true
265+
catalog.createPartitions(db, table, parts, ignoreIfExists = true)
266+
checkEvents(
267+
CreatePartitionsPreEvent(db, table, partSpecs) ::
268+
CreatePartitionsEvent(db, table, partSpecs) :: Nil)
269+
270+
// createPartitions() failed because re-creating with ignoreIfExists as false, so PreEvent only
271+
intercept[AnalysisException] {
272+
catalog.createPartitions(db, table, parts, ignoreIfExists = false)
273+
}
274+
checkEvents(CreatePartitionsPreEvent(db, table, partSpecs) :: Nil)
275+
276+
// ALTER
277+
catalog.alterPartitions(db, table, parts)
278+
checkEvents(
279+
AlterPartitionsPreEvent(db, table, partSpecs) ::
280+
AlterPartitionsEvent(db, table, partSpecs) ::
281+
Nil)
282+
283+
// RENAME
284+
catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
285+
checkEvents(
286+
RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) ::
287+
RenamePartitionsEvent(db, table, partSpecs, newPartSpecs) :: Nil)
288+
289+
// renamePartitions() failed because partitions have been renamed according to newPartSpecs,
290+
// so PreEvent only
291+
intercept[AnalysisException] {
292+
catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
293+
}
294+
checkEvents(RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) :: Nil)
295+
296+
// DROP
297+
// dropPartitions() failed
298+
// because partition of (old) partSpecs do not exist and ignoreIfNotExists is false,
299+
// So PreEvent only
300+
intercept[AnalysisException] {
301+
catalog.dropPartitions(db, table, partSpecs,
302+
ignoreIfNotExists = false, purge = true, retainData = true)
303+
}
304+
checkEvents(DropPartitionsPreEvent(db, table, partSpecs) :: Nil)
305+
306+
// Drop the renamed partitions
307+
catalog.dropPartitions(db, table, newPartSpecs,
308+
ignoreIfNotExists = false, purge = true, retainData = true)
309+
checkEvents(
310+
DropPartitionsPreEvent(db, table, newPartSpecs) ::
311+
DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
312+
313+
// Re-drop with ignoreIfNotExists being true
314+
catalog.dropPartitions(db, table, newPartSpecs,
315+
ignoreIfNotExists = true, purge = true, retainData = true)
316+
checkEvents(
317+
DropPartitionsPreEvent(db, table, newPartSpecs) ::
318+
DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
319+
}
212320
}

0 commit comments

Comments
 (0)