Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[report]
exclude_lines =
# Skip any pass lines such as may be used for @abstractmethod
pass

# Have to re-enable the standard pragma
pragma: no cover

# Don't complain about missing debug-only code:
def __repr__
if self\.debug

# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError

# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.2.1 - 2026-05-31
### Runner
- added option to use day="last" for monthly scheduling

## 2.2.0 - 2026-03
### Runner
Expand Down
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ Runner uses a schedule-based approach:
#### Data Flow

For each pipeline execution:
1. **Dependency verification**: Check that all required input tables have data within specified time intervals
2. **Transformation execution**: Run your transformation to produce a Spark DataFrame
3. **Automatic enrichment**: Runner adds `INFORMATION_DATE` (the run date) and `VERSION` (package version) columns
4. **Partitioned write**: Data is written to Databricks with partitioning configuration
5. **Reporting**: Optional email notifications on failure and run information stored to tracking table
1. **Previous completion check**: Runner checks if the target table partition for the run date already exists (unless `rerun` is enabled)
2. **Dependency verification**: Check that all required input tables have data within specified time intervals
3. **Transformation execution**: Run your transformation to produce a Spark DataFrame
4. **Automatic enrichment**: Runner adds `INFORMATION_DATE` (the run date) and `VERSION` (package version) columns
5. **Partitioned write**: Data is written to Databricks with partitioning configuration
6. **Reporting**: Optional email notifications on failure and run information stored to tracking table

#### Dependency Tracking

Expand All @@ -66,13 +67,13 @@ Runner's dependency tracking ensures that all required input data is available b
* **Missing data handling**: If required data is missing, Runner raises an error for that specific pipeline/date but continues executing other pipelines and dates in the queue

**Example:** If you're running a pipeline on 2024-01-15 with a dependency that has a 7-day interval:
* Runner checks if the dependency table has data for 2024-01-08 (15 days - 7 days)
* Runner checks if the dependency table has data between 2024-01-08 and 2024-01-15 (15 days - 7 days)
* If the dependency has `filters: {VERSION: "v2"}`, it specifically checks for data where VERSION='v2'
* If data exists, the pipeline proceeds; otherwise, an error is raised for this specific execution, but other scheduled runs continue

### Transformation
### Job/Transformation
For the details on the interface see the [implementation](rialto/runner/transformation.py)
Inside the transformation you have access to a [TableReader](#common), date of running, and if provided to Runner, a live spark session and [metadata manager](#metadata).
Inside the job you have access to a [TableReader](#common), date of running, and if provided to Runner, a live spark session and [metadata manager](#metadata).
You can either implement your jobs directly via extending the Transformation class, or by using the [jobs](#jobs) abstraction.

### Runner
Expand Down Expand Up @@ -163,7 +164,7 @@ pipelines: # a list of pipelines to run
python_class: Pipeline2Class
schedule:
frequency: monthly
day: 6
day: 6 # or 'latest' for the last day of the month, otherwise avoid using days higher than 28 to ensure all months are covered
info_date_shift: # can be combined as a list
- units: "days"
value: 5
Expand Down Expand Up @@ -434,7 +435,7 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* mo
from pyspark.sql import DataFrame
from rialto.common import TableReader
from rialto.jobs import config_parser, job, datasource
from rialto.runner.config_loader import PipelineConfig
from rialto.runner.services.config_loader import PipelineConfig
from pydantic import BaseModel


Expand Down
19 changes: 19 additions & 0 deletions rialto/common/table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ def get_table(
"""
raise NotImplementedError

@abc.abstractmethod
def table_exists(self, table: str) -> bool:
"""
Check table exists in storage

:param table: full table path
:return: bool
"""
raise NotImplementedError


class TableReader(DataReader):
"""An implementation of data reader for databricks tables"""
Expand Down Expand Up @@ -165,3 +175,12 @@ def get_table(
if uppercase_columns:
df = self._uppercase_column_names(df)
return df

def table_exists(self, table: str) -> bool:
"""
Check table exists in spark catalog

:param table: full table path
:return: bool
"""
return self.spark.catalog.tableExists(table)
2 changes: 1 addition & 1 deletion rialto/jobs/job_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from rialto.loader import PysparkFeatureLoader
from rialto.metadata import MetadataManager
from rialto.runner import Transformation
from rialto.runner.config_loader import PipelineConfig
from rialto.runner.services.config_loader import PipelineConfig


class JobMetadata(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion rialto/metadata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

def class_to_catalog_name(class_name) -> str:
"""
Map python class name of feature group (CammelCase) to databricks compatible format (lowercase with underscores)
Map python class name of feature group (CamelCase) to databricks compatible format (lowercase with underscores)

:param class_name: Python class name
:return: feature storage name
Expand Down
107 changes: 0 additions & 107 deletions rialto/runner/date_manager.py

This file was deleted.

159 changes: 159 additions & 0 deletions rialto/runner/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["RunnerEngine"]

import traceback
from datetime import datetime
from typing import List

from loguru import logger
from pyspark.sql import DataFrame

from rialto.runner.runner_services import RunnerServices
from rialto.runner.services.config_loader import PipelineConfig
from rialto.runner.services.result_mapper import TaskResultMapper
from rialto.runner.services.task_registry import PipelineTask


class RunnerEngine:
"""Orchestrates pipeline execution lifecycle and task tracking"""

def __init__(self, services: RunnerServices, rerun: bool, skip_dependencies: bool):
self.services = services
self.rerun = rerun
self.skip_dependencies = skip_dependencies

def select_pipelines(self, op: str = None) -> List[PipelineConfig]:
"""Select pipelines to run based on operation name"""
if op:
selected = [p for p in self.services.config.pipelines if p.name == op]
if not selected:
raise ValueError(f"Unknown operation selected: {op}")
return selected
return self.services.config.pipelines

def register_tasks(self, pipelines: List[PipelineConfig]) -> None:
"""Register tasks for all pipelines and date combinations"""
for pipeline in pipelines:
for exec_date, partition_date in self.services.date_manager.get_execution_and_partition_dates(
pipeline.schedule
):
self.services.registry.add_task(
name=pipeline.name,
execution_date=exec_date,
partition_date=partition_date,
config=pipeline,
)

def check_tasks(self) -> None:
"""Check task completion and dependency status"""
for task in self.services.registry.tasks:
if not self.rerun:
try:
self.services.task_checker.check_completion(task)
except Exception as e:
logger.error(f"{task.name} completion check failed for {task.partition_date}:\n\t{e}")
task.precheck_failed = True
task.error = str(e)
task.error_trace = traceback.format_exc()
if not self.skip_dependencies:
try:
self.services.task_checker.check_pipeline_dependencies(task)
except Exception as e:
logger.error(f"{task.name} dependency check failed for {task.partition_date}:\n\t{e}")
task.precheck_failed = True
task.error = str(e)
task.error_trace = traceback.format_exc()

def log_task_status(self) -> None:
"""Log summary of task statuses"""
self.services.registry.log_status()

def run_tasks(self) -> None:
"""Execute runnable tasks with per-task error isolation"""
for task in self.services.registry.tasks:
logger.info(f"Executing task {task.name} for partition date {task.partition_date}")
self._execute_task_with_tracking(task)

def _execute_task_with_tracking(self, task: PipelineTask) -> None:
"""Execute single task with record tracking"""
run_start = datetime.now()

if task.precheck_failed:
self.services.tracker.add(TaskResultMapper.exception(task, run_start, task.error, task.error_trace))
return

# Skip already-complete tasks
if task.completion and not self.rerun:
logger.info(f"Skipping task {task.name} for partition {task.partition_date} - already complete")
self.services.tracker.add(TaskResultMapper.already_complete(task, run_start))
return

# Skip if dependencies not met
incomplete_deps = [
f"{dep.table.get_table_path()} from {dep.date_from} until {dep.date_until}"
for dep in task.dependencies
if not dep.complete
]
if incomplete_deps and not self.skip_dependencies:
logger.info(
f"Incomplete dependencies for task {task.name} for "
f"partition {task.partition_date} - {', '.join(incomplete_deps)}"
)
self.services.tracker.add(TaskResultMapper.dependencies_incomplete(task, run_start, incomplete_deps))
return

# Execute task
try:
df = self.services.executor.execute(task)
self.services.writer.write(df, task.partition_date, task.target)
records = self.services.data_checker.check_written(task.target, task.partition_date, df)
logger.info(
f"Task {task.name} for partition {task.partition_date} completed successfully with {records} records"
)
self.services.tracker.add(TaskResultMapper.success(task, run_start, records))
except KeyboardInterrupt:
self.services.tracker.add(TaskResultMapper.interrupted(task, run_start))
raise
except Exception as e:
logger.exception(f"Task {task.name} failed for partition {task.partition_date}")
self.services.tracker.add(TaskResultMapper.exception(task, run_start, str(e), traceback.format_exc()))

def finalize(self) -> None:
"""Send final reports via mail/bookkeeping"""
self.services.tracker.report_by_mail()
self.log_task_status()

def run(self, op: str = None) -> None:
"""Execute all tasks"""
pipelines = self.select_pipelines(op)
self.register_tasks(pipelines)
self.check_tasks()
self.log_task_status()
self.run_tasks()
self.finalize()

def dry_run_execution(self, op: str = None) -> None:
"""Execute pre-run checks without task execution"""
pipelines = self.select_pipelines(op)
self.register_tasks(pipelines)
self.check_tasks()
self.log_task_status()

def debug_first_task(self, op: str = None) -> DataFrame:
"""Debug mode: execute first task and return result"""
pipelines = self.select_pipelines(op)
self.register_tasks(pipelines)
return self.services.executor.execute(self.services.registry.tasks[0])
Loading
Loading