PostgreSQL 通过python 监控逻辑复制

网友投稿 770 2022-11-24

PostgreSQL 通过python 监控逻辑复制

PostgreSQL 通过python 监控逻辑复制

上期是讲逻辑复制,本期是通过PYTHON 来对逻辑复制中的配置参数,publication 定义, 打印不适合进行逻辑复制的表,打印没有在使用的复制槽,另外包含当前发布端和接收端两边的LSN对比。

以下是代码,对于逻辑复制中主要的监控点有

1  是不是存在复制槽不使用的情况

2  是不是存在主库和从库之间的复制延迟(异步)

3  当前库是不是存在不适合进行逻辑复制的表

4  当前库是不是有设置发布

#!/usr/bin/python3import osimport sysimport psycopg2import reimport subprocess#检测当前PG是否具备进行逻辑复制的参数配置def parameter():    conn = None    conn = psycopg2.connect(database="postgres",user="postgres",password="",host="localhost",port="5432")    conn_sub = psycopg2.connect(database="postgres",user="admin",password="admin",host="192.168.198.101",port="5432")    cur = conn.cursor()    cur.execute("""show wal_level;""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制wal_level必须为logical")        print("________________")        print(row)    cur = conn.cursor()    cur.execute("""show max_worker_processes;""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制,请注意max_worker_process的数字")        print("________________")        print(row)    cur = conn.cursor()    cur.execute("""show max_replication_slots""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制,请注意复制槽的数字")        print("________________")        print(row)    cur = conn.cursor()    cur.execute("""show max_wal_senders;""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制,请注意最大max_wal_sender数字")        print("________________")        print(row)    cur = conn.cursor()    cur.execute("""show max_logical_replication_workers;""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制,请注意最大max_logical_replication_workers数字")        print("________________")        print(row)

cur = conn.cursor()    cur.execute("""show max_sync_workers_per_subscription;""")    rows = cur.fetchall()    for row in rows:        print("启用逻辑复制,请注意最大max_sync_workers_per_subscription数字")        print("________________")        print(row)     cur = conn.cursor()    cur.execute("""select pubname,puballtables,pubinsert,pubupdate,pubdelete,pubtruncate from pg_publication;""")    rows = cur.fetchmany(100)    print("当前库publication定义发布信息")    print("-----------------------------")    print("发布定义名  是否对全库表进行发布定义  追踪插入  追踪更新  追踪删除        追踪truncate")    for row in rows:        print(row)

print("----------------------------------------------")    cur = conn.cursor()    cur.execute("""select pubname,tablename from pg_publication_tables;""")    rows = cur.fetchmany(100)    print("当前库发布的数据表")    print("------------------")    print("发布定义名  发布表名")    for row in rows:        print(row)    print("----------------------------------------------")    cur = conn.cursor()    cur.execute("""select relname,relhasindex from pg_catalog.pg_class as cinner join pg_namespace as n on (c.relnamespace = n.oid and n.nspname NOT IN ('information_schema', 'pg_catalog') and c.relkind='r') where c.relhasindex = 'f';""")    rows = cur.fetchmany(100)    print("当前库不适合建立逻辑复制表list")    print("_______________________________")    print("表名  无主键")    for row in rows:        print(row)    print("----------------------------------------------")    cur = conn.cursor()    cur.execute("""SELECT slot_name,slot_type,active FROM pg_replication_slots where active= 'f';""")    rows = cur.fetchmany(100)    print("注意当前没有被使用的复制槽,确认不使用请立即删除")    print("_______________________________")    print("复制槽名   复制槽类型   目前不在使用")    for row in rows:        print(row)    print("删除复制槽语句 select pg_drop_replication('复制槽名')")    print("------------------------------------------------------")    cur = conn.cursor()    cur.execute("""select sent_lsn,replay_lsn from pg_stat_replication;""")    rows = cur.fetchmany(100)    print("主库lsn信息")    print("_______________________________")    print("发送LSN     回执LSN")       for row in rows:        print(row)    print("------------------------------------------------------")    cur = conn_sub.cursor()    cur.execute("""select subname,received_lsn,latest_end_time::varchar(20) from pg_stat_subscription;""")    rows = cur.fetchmany(100)    print("查看目的端数据接收状态")    print("-----------------------")    print("接收端名    接收到的LSN    最后接收到LSN时间")    for row in rows:        print(row)    conn.commit()    conn_sub.commit()    conn_sub.close    conn.closeif __name__ == "__main__":    parameter()

下图为执行后的效果

下面有两个地方需要注意

1  设定subscription 的连接,需要在接收端的位置设置用户密码

2  可以将连接设置为及时输入的模式,会更方便,这里没有写死了。

另逻辑复制中最怕的是接收端数据出现问题,导致复制停止,目前需要通过日志来查询出现的问题。程序里面并未有及时分析日志的部分。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:《财富自由之路》--解读
下一篇:解析springboot集成AOP实现日志输出的方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~