Cluster object

Cluster base object

Cluster object is your interface to all the Managed Objects and other features of PGMob. You use it to connect to your PostgreSQL server using the adapter, such as psycopg2. A connected Cluster object gives you the following capabilities:

  • Managing server connections

  • Managing database objects (current database only)

  • Execute ad-hoc SQL queries and shell commands

  • Run backup/restore operations

class pgmob.cluster.Cluster(connection=None, become=None, adapter: BaseAdapter | None = None, shell: _BaseShellEnv | None = None, *args, **kwargs)

Provides a management interface for postgres cluster configuration.

Parameters:
  • connection (Any) – a connection object. Overrides other connectivity args.

  • become (str) – become this role after connecting. Executes SET ROLE <rolename>

  • adapter (adapters.BaseAdapter) – Postgres connectivity Adapter object. Defines which underlying driver to be used for database communications.

  • *args – underlying adapter connection parameters

  • **kwargs – underlying adapter connection parameters

become

become this role after connecting. Executes SET ROLE <rolename>

Type:

str

current_database

Database name, to which the connection is currently established

Type:

str

adapter

Postgres connectivity Adapter object. Defines which underlying driver to be used for database communications.

Type:

BaseAdapter

Examples

To connect to the PostgreSQL server use the same arguments as you would when using an underlying adapter, such as psycopg2:

>>> cluster = Cluster(host="127.0.0.1", user="postgres", password="s3cur3p@ss")  

Validate that the adapter is connected:

>>> cluster.adapter.is_connected
True
property databases: DatabaseCollection

Postgres database objects

Example

Reassign database ownership: >>> db = “somedatabase” # doctest: +SKIP >>> role = “someotherrole” # doctest: +SKIP >>> cluster.databases[db].owner = role >>> cluster.databases[db].alter() >>> cluster.databases[db].refresh() >>> cluster.databases[db].owner == role True

execute(query: Composable | str, params: Tuple[Any] | Any | None = None) List[Tuple[Any]]

Execute a query against Postgres server. Transaction would be automatically committed upon completion.

Parameters:
  • query (Union[Composable, str]) – Query text or a Composable object

  • params (Union[Tuple[Any], Any]) – Tuple of parameter values (or a single value) to replace parameters in the query

Returns:

List of tuples returned from the server, or empty list if no rows were selected.

Return type:

List[Tuple[Any]]]

Raises:

AdapterError – Whenever the adapter returns an error.

Example

To execute a simple query with parameters, pass the parameters either as a tuple or as *args:

>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE viewname = %s", "pg_tables")
[('pg_catalog', 'postgres')]
>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE viewname = ANY(%s)", (["pg_tables", "pg_views"], ))
[('pg_catalog', 'postgres'), ('pg_catalog', 'postgres')]
>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE 1=2")
[]

You can also use Composable objects to properly handle identifiers and literals in your SQL code: >>> from pgmob.sql import SQL, Identifier, Literal >>> sql = SQL(“SELECT schemaname, viewowner FROM {table} WHERE viewname = %s”).format(table=Identifier(“pg_views”)) >>> cluster.execute(sql, “pg_tables”) [(‘pg_catalog’, ‘postgres’)] >>> sql = SQL(“SET LOCAL TIME ZONE {tz}”).format(tz=Literal(“PST8PDT”)) >>> cluster.execute(sql) []

execute_with_cursor(task: Callable[[BaseCursor], Any], *args, **kwargs) Any

Executes a task against a cursor. Transaction would be automatically committed upon finish.

Parameters:

task (Callable[[BaseCursor], Any]) – callable with cursor object as an only argument

Returns:

List of tuples returned from the server or None if no rows selected.

Return type:

Optional[List[Tuple[Any]]]]

Example

Run a task that fetches a row from a cursor

>>> def task(cursor):
...     cursor.execute("SELECT 1, 2")
...     return cursor.fetchone()
>>> cluster.execute_with_cursor(task)
(1, 2)
property hba_rules: HBARuleCollection

Postgres HBA rules

Allows you to modify pg_hba on the fly by comparing, adding, and removing entries from the pg_hba file.

Example

Adding an entry to pg_hba

>>> entry = "host all postgres 127.0.0.1/32 trust"
>>> entry in cluster.hba_rules
False
>>> cluster.hba_rules.extend(entry)
>>> cluster.hba_rules.alter()
property large_objects: LargeObjectCollection

Postgres large objects

property procedures: ProcedureCollection

Postgres procedures, functions, aggregates and window functions

Example

Retrieving procedures from the server

>>> cluster.execute("CREATE FUNCTION foo () RETURNS int AS 'SELECT 1' LANGUAGE SQL")
[]
>>> cluster.execute("CREATE FUNCTION foo (int) RETURNS int AS 'SELECT 1' LANGUAGE SQL")
[]
>>> cluster.procedures['foo']
[Function('foo'), Function('foo')]
reassign_owner(new_owner: str, owner: str | None = None, objects: list | None = None)

Reassigns ownership of Postgres objects. When “objects” parameter is provided, only the ownership for those objects will be changed. When both “owner” and “new_owner” are specified, reassigns ownership of all objects owned by “owner”. This includes database ownership.

Parameters:
  • new_owner (str) – new owner name

  • owner (str) – old owner name

  • objects (list) – An iterable containing pgcmo objects, such as tables or databases.

Raises:

AttributeError – when neither owner nor objects is specified.

refresh()

Refresh dynamic properties of the object. They will be retrieved from the Postgres cluster next time they are called.

Example

Refresh cluster objects after dropping a database

>>> db = "somedb"  
>>> cluster.databases[db].drop()  # drop the database from the server
>>> db in cluster.databases  # the database is still in the local cache
True
>>> cluster.refresh()
>>> db in cluster.databases
False
reload() bool

Reload PostgreSQL configuration by executing pg_reload_conf()

Returns:

whether configuration reload was successful

Return type:

bool

Example

Reload PostgreSQL configuration

>>> cluster.reload()
True
property replication_slots: ReplicationSlotCollection

Postgres replication slot objects

property roles: RoleCollection

Postgres role objects

Example

Create a new role:

>>> owner_role = cluster.roles.new(name="db1owner", password="foobar")
>>> owner_role.create()
>>> cluster.roles.refresh()
>>> "db1owner" in cluster.roles
True

Clone an existing role

>>> sql = cluster.roles["db1owner"].script().decode("utf8").replace("db1owner", "newrole")
>>> cluster.execute(sql)
[]
>>> cluster.refresh()
>>> cluster.roles["newrole"]
Role('newrole')
run_os_command(command: str, raise_exception=True) OSCommandResult

Runs an OS command by reading output from a binary exectuted via COPY FROM command. The command string would be put into an EOF block. Make sure to follow shell symbol escape rules.

Parameters:
  • command (str) – command to execute

  • raise_exception (bool) – raise PostgresShellCommandError when command returns a non-zero exit code

Returns:

command execution results

Return type:

OSCommandResult

Raises:

PostgresShellCommandError – on non-zero exit code when raise_exception == True

Example

Run the command and return results

>>> iam = cluster.run_os_command("whoami")
>>> iam.exit_code
0
>>> iam.text
'postgres'
property schemas: SchemaCollection

Postgres schema objects

Example

Migrate tables from one schema to another

>>> schema = "myschema"  
>>> new_schema = "app_schema"
>>> cluster.schemas.new(new_schema).create()
>>> for t in [t for t in cluster.tables if t.schema == schema]:
...     t.schema = new_schema
...     t.alter()
property sequences: SequenceCollection

Postgres sequence objects

Example

Retrieving sequences from the server

>>> cluster.execute("CREATE SEQUENCE seq1")
[]
>>> cluster.sequences['seq1']
Sequence('seq1')
>>> cluster.sequences['seq1'].owner
'postgres'
property tables: TableCollection

Postgres table objects

Example

Retrieving tables from the server after refreshing table objects

>>> cluster.execute("CREATE TABLE tab1(a int)")
[]
>>> cluster.tables.refresh()
>>> cluster.tables['tab1']
Table('tab1')
>>> cluster.tables['tab1'].owner
'postgres'
terminate(all_connections: bool | None = None, databases: List[str] | None = None, pids: List[int] | None = None, roles: List[str] | None = None, exclude_roles: List[str] | None = None, exclude_databases: List[str] | None = None, exclude_pids: List[int] | None = None) List[int]

Terminates connections based on provided parameters. Will avoid terminating system PIDs and self.

Parameters:
  • databases (List[str]) – names of target databases

  • roles (List[str]) – roles which connections should be terminated

  • pids (List[int]) – pids to terminate

  • exclude_roles (List[str]) – roles to exclude

  • exclude_databases (List[str]) – databases to exclude

  • exclude_pids (List[str]) – pids to exclude

  • all_connections (bool) – terminate all connections (except own and system PIDs)

Returns:

PIDs of the terminated connections

Return type:

List[int]

Example

Terminate connections from a specific role to a specific database

>>> cluster.terminate(databases=["somedb"], roles=["someapp"])
[]
property views: ViewCollection

Postgres views

Example

Retrieving views from the server

>>> cluster.execute("CREATE VIEW v1 AS SELECT 1")
[]
>>> cluster.views['v1']
View('v1')
>>> cluster.views['v1'].owner
'postgres'