Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func Prepare() *cobra.Command {
validateRulesCmd.Flags().Bool("json", false, "Output the validation status in JSON format")
validateCmd.AddCommand(validateRulesCmd)

// validate schema cmd
validateSchemaCmd.Flags().String("postgres-url", "", "Source postgres URL to validate the schema against")
validateSchemaCmd.Flags().Bool("json", false, "Output the validation status in JSON format")
validateCmd.AddCommand(validateSchemaCmd)

// Flag binding for root cmd
rootFlagBinding(rootCmd)

Expand Down
57 changes: 56 additions & 1 deletion cmd/validate_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var validateCmd = &cobra.Command{
Short: "Validate different parts of the pgstream configuration",
}

var errNoPostgresURL = errors.New("postgres URL is required for transformation rules validation")
var errNoPostgresURL = errors.New("postgres URL is required for validation")

var validateRulesCmd = &cobra.Command{
Use: "rules",
Expand Down Expand Up @@ -91,6 +91,55 @@ var validateRulesCmd = &cobra.Command{
`,
}

var validateSchemaCmd = &cobra.Command{
Use: "schema",
Short: "Validates source schema compatibility before running replication",
PreRunE: validateSchemaFlagBinding,
RunE: func(cmd *cobra.Command, args []string) error {
sp, _ := pterm.DefaultSpinner.WithText("validating pgstream source schema...").Start()

err := func() error {
streamConfig, err := config.ParseStreamConfig()
if err != nil {
return fmt.Errorf("parsing stream config: %w", err)
}

if streamConfig.Listener.Postgres == nil || streamConfig.Listener.Postgres.URL == "" {
return errNoPostgresURL
}

statusChecker := stream.NewStatusChecker()
schemaStatus, err := statusChecker.SchemaCompatibilityStatus(context.Background(), streamConfig)
if err != nil {
return err
}

if len(schemaStatus.GetErrors()) == 0 {
sp.Success("source schema is compatible")
} else {
sp.Warning("pgstream validation check identified issues: ", strings.Join(schemaStatus.GetErrors(), ", "))
}

err = print(cmd, schemaStatus)
if err != nil {
return fmt.Errorf("failed to format pgstream schema compatibility status: %w", err)
}

return nil
}()
if err != nil {
sp.Fail(err.Error())
}

return err
},
Example: `
pgstream validate schema -c pg2pg.env
pgstream validate schema --postgres-url <postgres-url>
pgstream validate schema -c pg2pg.yaml --json
`,
}

func validateRulesFlagBinding(cmd *cobra.Command, _ []string) error {
// to be able to overwrite configuration with flags when yaml config file is
// provided
Expand All @@ -102,3 +151,9 @@ func validateRulesFlagBinding(cmd *cobra.Command, _ []string) error {
viper.BindPFlag("PGSTREAM_POSTGRES_LISTENER_URL", cmd.Flags().Lookup("postgres-url"))
return nil
}

func validateSchemaFlagBinding(cmd *cobra.Command, _ []string) error {
viper.BindPFlag("source.postgres.url", cmd.Flags().Lookup("postgres-url"))
viper.BindPFlag("PGSTREAM_POSTGRES_LISTENER_URL", cmd.Flags().Lookup("postgres-url"))
return nil
}
40 changes: 39 additions & 1 deletion docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pgstream validate <subcommand> [flags]
```

**Description:**
The `validate` command allows you to validate specific aspects of your pgstream configuration before running it. Currently supports validating transformation rules.
The `validate` command allows you to validate specific aspects of your pgstream configuration before running it. It currently supports validating transformation rules and source schema compatibility.

#### validate rules

Expand Down Expand Up @@ -326,6 +326,44 @@ Transformation rules status:
- CI/CD pipeline integration for rule validation
- Debugging transformation rule issues

#### validate schema

Validates source schema compatibility before running replication.

```bash
pgstream validate schema [flags]
```

**Description:**
The `validate schema` command checks that the configured source tables can be read safely by pgstream before replication starts.

- Column type resolution from the source database
- Replica identity safety for update and delete events
- Table accessibility through the configured source connection

**Prerequisites:**

- Access to the source PostgreSQL database

**Flags:**

- `--postgres-url` - Source postgres URL to validate the schema against
- `--json` - Output the validation status in JSON format

**Examples:**

```bash
pgstream validate schema -c pg2pg.env
pgstream validate schema --postgres-url <postgres-url>
pgstream validate schema -c pg2pg.yaml --json
```

**Use Cases:**

- Pre-deployment validation of source tables
- Catching replica identity issues before replication starts
- Verifying source tables are reachable and readable

### destroy

It destroys any pgstream setup, removing the replication slot and all the relevant tables/functions/triggers, along with the internal pgstream schema.
Expand Down
47 changes: 47 additions & 0 deletions pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ type Status struct {
Source *SourceStatus
}

type SchemaCompatibilityStatus struct {
Valid bool
Tables []SchemaTableStatus
}

type SchemaTableStatus struct {
Schema string
Table string
Errors []string
}

type ConfigStatus struct {
Valid bool
Errors []string
Expand Down Expand Up @@ -106,6 +117,42 @@ func (s *Status) PrettyPrint() string {
return prettyPrint.String()
}

func (s *SchemaCompatibilityStatus) GetErrors() []string {
if s == nil {
return nil
}

errors := []string{}
for _, table := range s.Tables {
if len(table.Errors) == 0 {
continue
}
errors = append(errors, table.Errors...)
}

return errors
}

func (s *SchemaCompatibilityStatus) PrettyPrint() string {
if s == nil {
return ""
}

var prettyPrint strings.Builder
prettyPrint.WriteString("Schema compatibility status:\n")
fmt.Fprintf(&prettyPrint, " - Valid: %t\n", s.Valid)

for _, table := range s.Tables {
if len(table.Errors) == 0 {
continue
}
fmt.Fprintf(&prettyPrint, " - Table %s.%s\n", table.Schema, table.Table)
fmt.Fprintf(&prettyPrint, " - Errors: [%s]\n", strings.Join(table.Errors, "; "))
}

return strings.TrimSuffix(prettyPrint.String(), "\n")
}

// GetErrors aggregates all errors from the initialisation status.
func (is *InitStatus) GetErrors() []string {
if is == nil {
Expand Down
Loading
Loading