Cluster object
Cluster base object
The Cluster object is your interface to all the Managed Objects and other features of PGMob. You use it to connect to your PostgreSQL server through an adapter, such as psycopg2. A connected Cluster object gives you the following capabilities:
Managing server connections
Managing database objects (current database only)
Executing ad-hoc SQL queries and shell commands
Running backup/restore operations
- class pgmob.cluster.Cluster(connection=None, become=None, adapter: BaseAdapter | None = None, shell: _BaseShellEnv | None = None, *args, **kwargs)
Provides a management interface for postgres cluster configuration.
- Parameters:
connection (Any) – a connection object. Overrides other connectivity args.
become (str) – become this role after connecting. Executes SET ROLE <rolename>
adapter (adapters.BaseAdapter) – Postgres connectivity Adapter object. Defines which underlying driver to be used for database communications.
*args – underlying adapter connection parameters
**kwargs – underlying adapter connection parameters
- become
become this role after connecting. Executes SET ROLE <rolename>
- Type:
str
- current_database
Name of the database to which the connection is currently established
- Type:
str
- adapter
Postgres connectivity Adapter object. Defines which underlying driver to be used for database communications.
- Type:
adapters.BaseAdapter
Examples
To connect to the PostgreSQL server use the same arguments as you would when using an underlying adapter, such as psycopg2:
>>> cluster = Cluster(host="127.0.0.1", user="postgres", password="s3cur3p@ss")
Validate that the adapter is connected:
>>> cluster.adapter.is_connected
True
- property databases: DatabaseCollection
Postgres database objects
Example
Reassign database ownership:
>>> db = "somedatabase"  # doctest: +SKIP
>>> role = "someotherrole"  # doctest: +SKIP
>>> cluster.databases[db].owner = role
>>> cluster.databases[db].alter()
>>> cluster.databases[db].refresh()
>>> cluster.databases[db].owner == role
True
- execute(query: Composable | str, params: Tuple[Any] | Any | None = None) → List[Tuple[Any]]
Executes a query against the Postgres server. The transaction is committed automatically upon completion.
- Parameters:
query (Union[Composable, str]) – Query text or a Composable object
params (Union[Tuple[Any], Any]) – Tuple of parameter values (or a single value) to replace parameters in the query
- Returns:
List of tuples returned from the server, or an empty list if no rows were selected.
- Return type:
List[Tuple[Any]]
- Raises:
AdapterError – Whenever the adapter returns an error.
Example
To execute a simple query with parameters, pass the parameters either as a tuple or as a single value:
>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE viewname = %s", "pg_tables")
[('pg_catalog', 'postgres')]
>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE viewname = ANY(%s)", (["pg_tables", "pg_views"], ))
[('pg_catalog', 'postgres'), ('pg_catalog', 'postgres')]
>>> cluster.execute("SELECT schemaname, viewowner FROM pg_views WHERE 1=2")
[]
You can also use Composable objects to properly handle identifiers and literals in your SQL code:
>>> from pgmob.sql import SQL, Identifier, Literal
>>> sql = SQL("SELECT schemaname, viewowner FROM {table} WHERE viewname = %s").format(table=Identifier("pg_views"))
>>> cluster.execute(sql, "pg_tables")
[('pg_catalog', 'postgres')]
>>> sql = SQL("SET LOCAL TIME ZONE {tz}").format(tz=Literal("PST8PDT"))
>>> cluster.execute(sql)
[]
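Identifiers and literals need separate Composable types because SQL quotes them differently. The sketch below is not PGMob's implementation: `quote_identifier` and `quote_literal` are hypothetical helpers that mimic the standard SQL rules (embedded double quotes are doubled inside identifiers, embedded single quotes are doubled inside literals), and the literal helper assumes `standard_conforming_strings` is on. It is meant only to show why the two cases differ; real code should rely on the Composable objects.

```python
def quote_identifier(name: str) -> str:
    """Hypothetical helper: wrap a name in double quotes, doubling embedded ones."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value: str) -> str:
    """Hypothetical helper: wrap a value in single quotes, doubling embedded ones.

    Assumes standard_conforming_strings = on, so backslashes are not special.
    """
    return "'" + value.replace("'", "''") + "'"


# The same kind of text is rendered differently depending on its role.
print(quote_identifier("pg_views"))   # "pg_views"
print(quote_literal("PST8PDT"))       # 'PST8PDT'
print(quote_identifier('odd"name'))   # "odd""name"
print(quote_literal("O'Brien"))       # 'O''Brien'
```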
- execute_with_cursor(task: Callable[[BaseCursor], Any], *args, **kwargs) → Any
Executes a task against a cursor. The transaction is committed automatically when the task finishes.
- Parameters:
task (Callable[[BaseCursor], Any]) – callable that takes a cursor object as its only argument
- Returns:
List of tuples returned from the server or None if no rows selected.
- Return type:
Optional[List[Tuple[Any]]]
Example
Run a task that fetches a row from a cursor:
>>> def task(cursor):
...     cursor.execute("SELECT 1, 2")
...     return cursor.fetchone()
>>> cluster.execute_with_cursor(task)
(1, 2)
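The task-callable pattern can be tried without a PostgreSQL server. The sketch below uses the standard-library `sqlite3` module as a stand-in driver. `run_with_cursor` is a hypothetical helper that only approximates the behavior described above (open a cursor, call the task, commit); the rollback on failure is an assumption, and none of this is PGMob's actual implementation.

```python
import sqlite3
from typing import Any, Callable


def run_with_cursor(conn: sqlite3.Connection,
                    task: Callable[[sqlite3.Cursor], Any]) -> Any:
    """Hypothetical stand-in: run task against a fresh cursor, then commit."""
    cursor = conn.cursor()
    try:
        result = task(cursor)
        conn.commit()      # commit only if the task finished without raising
        return result
    except Exception:
        conn.rollback()    # assumption: undo partial work on failure
        raise
    finally:
        cursor.close()


def task(cursor):
    # The task receives the cursor as its only argument.
    cursor.execute("SELECT 1, 2")
    return cursor.fetchone()


conn = sqlite3.connect(":memory:")
row = run_with_cursor(conn, task)
print(row)  # (1, 2)
conn.close()
```

Keeping all cursor work inside one callable lets the caller own the commit and cleanup steps, so a task cannot leave a cursor open by accident.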
- property hba_rules: HBARuleCollection
Postgres HBA rules
Allows you to modify pg_hba on the fly by comparing, adding, and removing entries from the pg_hba file.
Example
Adding an entry to pg_hba:
>>> entry = "host all postgres 127.0.0.1/32 trust"
>>> entry in cluster.hba_rules
False
>>> cluster.hba_rules.extend(entry)
>>> cluster.hba_rules.alter()
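Comparing pg_hba entries is easier once a line is split into its fields. The sketch below shows one possible way to do that for records that carry an address in CIDR form (such as `host`); `HostRule` and `parse_host_rule` are hypothetical names, and how PGMob itself compares entries is not stated in this document. `local` records and the separate IP/netmask form are deliberately not handled.

```python
from typing import NamedTuple, Tuple


class HostRule(NamedTuple):
    """Fields of an address-bearing pg_hba record (hypothetical container)."""
    conn_type: str
    database: str
    user: str
    address: str
    method: str
    options: Tuple[str, ...]


def parse_host_rule(line: str) -> HostRule:
    """Split a record on whitespace; assumes TYPE DATABASE USER ADDRESS METHOD [OPTIONS]."""
    fields = line.split()
    if len(fields) < 5:
        raise ValueError(f"expected at least 5 fields, got {len(fields)}")
    conn_type, database, user, address, method, *options = fields
    return HostRule(conn_type, database, user, address, method, tuple(options))


a = parse_host_rule("host all postgres 127.0.0.1/32 trust")
b = parse_host_rule("host    all   postgres\t127.0.0.1/32   trust")
print(a == b)      # True: spacing differences do not affect the comparison
print(a.method)    # trust
```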
- property large_objects: LargeObjectCollection
Postgres large objects
- property procedures: ProcedureCollection
Postgres procedures, functions, aggregates and window functions
Example
Retrieving procedures from the server:
>>> cluster.execute("CREATE FUNCTION foo () RETURNS int AS 'SELECT 1' LANGUAGE SQL")
[]
>>> cluster.execute("CREATE FUNCTION foo (int) RETURNS int AS 'SELECT 1' LANGUAGE SQL")
[]
>>> cluster.procedures['foo']
[Function('foo'), Function('foo')]
- reassign_owner(new_owner: str, owner: str | None = None, objects: list | None = None)
Reassigns ownership of Postgres objects. When “objects” parameter is provided, only the ownership for those objects will be changed. When both “owner” and “new_owner” are specified, reassigns ownership of all objects owned by “owner”. This includes database ownership.
- Parameters:
new_owner (str) – new owner name
owner (str) – old owner name
objects (list) – An iterable containing PGMob objects, such as tables or databases.
- Raises:
AttributeError – when neither owner nor objects is specified.
- refresh()
Refresh dynamic properties of the object. They will be retrieved from the Postgres cluster next time they are called.
Example
Refresh cluster objects after dropping a database:
>>> db = "somedb"
>>> cluster.databases[db].drop()  # drop the database from the server
>>> db in cluster.databases  # the database is still in the local cache
True
>>> cluster.refresh()
>>> db in cluster.databases
False
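The stale-then-fresh behavior in the example above is typical of a lazily loaded cache. The following sketch reproduces it with plain Python; `LazyCollection` and `server_state` are hypothetical stand-ins and make no claim about how PGMob stores its collections internally.

```python
from typing import Callable, List, Optional


class LazyCollection:
    """Hypothetical cache: loads names on first access, reloads after refresh()."""

    def __init__(self, loader: Callable[[], List[str]]):
        self._loader = loader
        self._items: Optional[List[str]] = None  # None means "not loaded yet"

    def __contains__(self, name: str) -> bool:
        if self._items is None:
            self._items = list(self._loader())   # copy, so later changes are not seen
        return name in self._items

    def refresh(self) -> None:
        # Drop the cache; the next access calls the loader again.
        self._items = None


server_state = ["postgres", "somedb"]        # stands in for the server catalog
databases = LazyCollection(lambda: server_state)

before = "somedb" in databases               # loads and caches the list
server_state.remove("somedb")                # "drop" the database on the server
stale = "somedb" in databases                # cache still holds the old list
databases.refresh()
after = "somedb" in databases                # reloaded from the "server"
print(before, stale, after)  # True True False
```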
- reload() → bool
Reload PostgreSQL configuration by executing pg_reload_conf()
- Returns:
whether configuration reload was successful
- Return type:
bool
Example
Reload PostgreSQL configuration:
>>> cluster.reload()
True
- property replication_slots: ReplicationSlotCollection
Postgres replication slot objects
- property roles: RoleCollection
Postgres role objects
Example
Create a new role:
>>> owner_role = cluster.roles.new(name="db1owner", password="foobar")
>>> owner_role.create()
>>> cluster.roles.refresh()
>>> "db1owner" in cluster.roles
True
Clone an existing role:
>>> sql = cluster.roles["db1owner"].script().decode("utf8").replace("db1owner", "newrole")
>>> cluster.execute(sql)
[]
>>> cluster.refresh()
>>> cluster.roles["newrole"]
Role('newrole')
- run_os_command(command: str, raise_exception=True) → OSCommandResult
Runs an OS command by reading output from a binary executed via the COPY FROM command. The command string is placed into an EOF block, so make sure to follow shell escaping rules.
- Parameters:
command (str) – command to execute
raise_exception (bool) – raise PostgresShellCommandError when command returns a non-zero exit code
- Returns:
command execution results
- Return type:
OSCommandResult
- Raises:
PostgresShellCommandError – on non-zero exit code when raise_exception == True
Example
Run the command and return results:
>>> iam = cluster.run_os_command("whoami")
>>> iam.exit_code
0
>>> iam.text
'postgres'
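Because the command string is interpreted by a shell, arguments containing spaces or quotes need escaping before they are embedded. One possible approach is the standard-library `shlex.quote`, sketched below with a hypothetical file name. This covers ordinary POSIX word quoting only; the EOF block may impose further rules (for example around `$` expansion) that this document does not spell out, so treat it as a starting point.

```python
import shlex

filename = "my report's data.csv"  # hypothetical argument with a space and a quote

# shlex.quote wraps the value so a POSIX shell treats it as a single word.
command = "wc -l " + shlex.quote(filename)
print(command)

# Round-trip check: splitting the command the way a POSIX shell would
# yields the original argument unchanged.
parts = shlex.split(command)
print(parts)
```

The resulting string could then be passed as the `command` argument of `run_os_command`.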
- property schemas: SchemaCollection
Postgres schema objects
Example
Migrate tables from one schema to another:
>>> schema = "myschema"
>>> new_schema = "app_schema"
>>> cluster.schemas.new(new_schema).create()
>>> for t in [t for t in cluster.tables if t.schema == schema]:
...     t.schema = new_schema
...     t.alter()
- property sequences: SequenceCollection
Postgres sequence objects
Example
Retrieving sequences from the server:
>>> cluster.execute("CREATE SEQUENCE seq1")
[]
>>> cluster.sequences['seq1']
Sequence('seq1')
>>> cluster.sequences['seq1'].owner
'postgres'
- property tables: TableCollection
Postgres table objects
Example
Retrieving tables from the server after refreshing table objects:
>>> cluster.execute("CREATE TABLE tab1(a int)")
[]
>>> cluster.tables.refresh()
>>> cluster.tables['tab1']
Table('tab1')
>>> cluster.tables['tab1'].owner
'postgres'
- terminate(all_connections: bool | None = None, databases: List[str] | None = None, pids: List[int] | None = None, roles: List[str] | None = None, exclude_roles: List[str] | None = None, exclude_databases: List[str] | None = None, exclude_pids: List[int] | None = None) → List[int]
Terminates connections based on provided parameters. Will avoid terminating system PIDs and self.
- Parameters:
databases (List[str]) – names of target databases
roles (List[str]) – roles whose connections should be terminated
pids (List[int]) – pids to terminate
exclude_roles (List[str]) – roles to exclude
exclude_databases (List[str]) – databases to exclude
exclude_pids (List[int]) – pids to exclude
all_connections (bool) – terminate all connections (except own and system PIDs)
- Returns:
PIDs of the terminated connections
- Return type:
List[int]
Example
Terminate connections from a specific role to a specific database:
>>> cluster.terminate(databases=["somedb"], roles=["someapp"])
[]
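The example above suggests that include filters narrow the selection together (a session must match both the database and the role). The sketch below models that reading in plain Python over a list of session dictionaries. `select_pids` and its sample data are hypothetical, and the exact way PGMob combines its filters is an assumption here, not something this document confirms.

```python
from typing import List, Optional, Sequence


def select_pids(
    sessions: Sequence[dict],
    own_pid: int,
    databases: Optional[List[str]] = None,
    roles: Optional[List[str]] = None,
    exclude_roles: Optional[List[str]] = None,
    exclude_pids: Optional[List[int]] = None,
) -> List[int]:
    """Hypothetical filter: every include filter that is given must match."""
    selected = []
    for s in sessions:
        if s["pid"] == own_pid:
            continue  # never select the current connection
        if databases is not None and s["database"] not in databases:
            continue
        if roles is not None and s["role"] not in roles:
            continue
        if exclude_roles and s["role"] in exclude_roles:
            continue
        if exclude_pids and s["pid"] in exclude_pids:
            continue
        selected.append(s["pid"])
    return selected


sessions = [
    {"pid": 101, "database": "somedb", "role": "someapp"},
    {"pid": 102, "database": "somedb", "role": "postgres"},
    {"pid": 103, "database": "otherdb", "role": "someapp"},
    {"pid": 104, "database": "somedb", "role": "someapp"},  # our own session
]
pids = select_pids(sessions, own_pid=104, databases=["somedb"], roles=["someapp"])
print(pids)  # [101]
```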
- property views: ViewCollection
Postgres views
Example
Retrieving views from the server:
>>> cluster.execute("CREATE VIEW v1 AS SELECT 1")
[]
>>> cluster.views['v1']
View('v1')
>>> cluster.views['v1'].owner
'postgres'