一聚教程网:一个值得你收藏的教程网站

热门教程

Python通过跳板机访问数据库代码方法

时间:2022-06-25 01:37:56 编辑:袖梨 来源:一聚教程网

本篇文章小编给大家分享一下Python通过跳板机访问数据库代码方法,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

什么是跳板机?

跳板机(Jump Server):也称堡垒机,是一类可作为跳板批量操作的远程设备的网络设备,是系统管理员和运维人员常用的操作平台之一。

那么具体是做什么的呢?

现在一些比较大的互联网企业,往往拥有大量的服务器,为了能够统一方便管理,运维人员通过跳板机去管理对应的服务器。我们在访问服务器的时候,先通过登陆跳板机,然后再进入相应服务器。从一定程度上提升了服务器的数据安全性,也提升了服务器的可维护性。

sshtunnel 连接堡垒机

跳板机的本质还是ssh连接,通过paramiko 自己造轮子的话理论也是可以实现的,这边推荐 sshtunnel 这个包,省的自己写了。

理解了跳板机的原理,这边的配置就比较好理解了。拿这张图举例,首先需要先访问跳板机,其实就是最常见的ssh连接,可以是账户密码,也可以是密钥登录的,如果是密钥的话需要在 跳板机的.ssh 目录下的某个许可文件中添加上自己的公钥

最后还需要补充一下 目标主机的ip 和端口。理解了这些,以下的这些参数就知道怎么配置了,

local_bind_address 可以不配置,不配置会随机获取本地的端口号,如果指定的话就会占用本地固定的端口号,推荐配置这一参数。

填写本地的端口是因为这边会将目标服务器ip的端口映射为本地端口,然后访问本地端口就能实现目标端口的访问了。

from sshtunnel import SSHTunnelForwarder
server = SSHTunnelForwarder(
        (host_jump, int(port_jump)),  # 跳板机的配置
        ssh_pkey=ssh_pk_jump, 
        #密钥的访问方式,如果是密码 ssh_password=???
        ssh_username=user_name_jump,
        remote_bind_address=(host_mysql, int(port_mysql)))  # 数据库服务器的配置
        local_bind_address=(local_ip, local_port)  # 开启本地转发端口
        )
	server.start()
	server.close()

有些人习惯用 上下文管理器,看起来更舒服些

from sshtunnel import SSHTunnelForwarder
with SSHTunnelForwarder(
        (host_jump, int(port_jump)),  # 跳板机的配置
        ssh_pkey=ssh_pk_jump, 
        #密钥的访问方式,如果是密码 ssh_password=???
        ssh_username=user_name_jump,
        remote_bind_address=(host_mysql, int(port_mysql)))  # 数据库服务器的配置
        local_bind_address=(local_ip, local_port)  # 开启本地转发端口
        )as server
	server.start()

连接数据库

建立好通道后就能通过本地映射的端口访问目标的数据库端口了。

常见的mysql 数据库访问 比如 mysqldb,pymysql,mysql.connector,

mysqldb跟 pymysql 差不多,只不过前者适用python2的版本,后者适用python3的版本,mysql.connector是官方推荐的。

pymysql 跟 mysql.connector 还是有一些区别的

我这边使用的是 mysql.connector ,在实践中老是无法连接上,而pymysql却是可以的。

最终终于找到了解决办法。。。

以下是我的mysql.connector 连接数据库的样例。

with SSHTunnelForwarder(
        (host_jump, int(port_jump)),  # 跳板机的配置
        ssh_pkey=ssh_pk_jump, 
        #密钥的访问方式,如果是密码 ssh_password=???
        ssh_username=user_name_jump,
        remote_bind_address=(host_mysql, int(port_mysql)))  # 数据库服务器的配置
        local_bind_address=(local_ip, local_port)  # 开启本地转发端口
        )as server
	server.start()
    conn = mysql.connector.connect(**{'host': '127.0.0.1',
                              'port': server.local_bind_port,
                              'user': dbcfg["user"],
                              'password': dbcfg["password"],
                              'db': dbcfg["db"],
                              'charset': dbcfg["charset"],
                              'collation': dbcfg["collation"],
                              'use_unicode': dbcfg["use_unicode"],
                              'use_pure': True})
        cursor = conn.cursor(dictionary=True)
        cursor.execute(sql, params)
        rows = cursor.fetchall()
        # conn.commit()
        cursor.close()
        conn.close()

这样,数据库就能正常访问啦

代码重构和优化

首先,就上面这样的代码而言,在每次查询都需要进行跳板机的连接,对于短时间突然大量请求的情况,这种是效率极低的。所以最好是能够将建立管道保存下来。

其次,如果同时访问多种数据库的情况,存在有无跳板机的场景,所以最好能够先将所有的数据库都进行连接,将conn保存在字典中,根据要访问的机器调用对应的conn,最后再统一释放。

def get_slave_conn_byProxy(dbcfg):
    server = SSHTunnelForwarder(
        eval(dbcfg["addr"]),
        ssh_username=dbcfg["ssh_username"],
        ssh_pkey=os.getenv("HOME") + "/.ssh/id_rsa",
        remote_bind_address=eval(dbcfg["remote_bind_address"]),
        local_bind_address=eval(dbcfg["local_bind_address"])  # 开启本地转发端口
    )
    server.start()
    conn = get_db_connection({'host': '127.0.0.1',
                              'port': server.local_bind_port,
                              'user': dbcfg["user"],
                              'password': dbcfg["password"],
                              'db': dbcfg["db"],
                              'charset': dbcfg["charset"],
                              'collation': dbcfg["collation"],
                              'use_unicode': dbcfg["use_unicode"],
                              'use_pure': True})
    return server, conn

def get_db_connection(dbcfg):
    # logger.info(f"============ debug db config: {dbcfg}")
    conn = mysql.connector.connect(**dbcfg)
    return conn

# 通过输入的连接参数判断是否需要通过跳板机的方式
def get_slave_conn(dbcfg):
    if "addr" in dbcfg:
        return get_slave_conn_byProxy(dbcfg)
    else:
        conn = get_db_connection(dbcfg)
        return None, conn
       
def read_slave_db_data(sql, conn, params=None, use_pdf=True):
        cursor = conn.cursor(dictionary=True)
        cursor.execute(sql, params)
        rows = cursor.fetchall()
        # conn.commit()
        cursor.close()
        if use_pdf:
            return pd.DataFrame(rows)
        else:
            return rows

热门栏目