python小工具-自动放行开发者公网ip至安全组

1,273次阅读
没有评论

共计 7298 个字符,预计需要花费 19 分钟才能阅读完成。

python小工具-自动放行开发者公网ip至安全组

近期遇到一些初创团队,单薄的资源,超载的使用是为常态( ̄_ ̄|||)

由于没有额外的资源,对于远程操作的安全管理,只能依靠防火墙和云上安全组了,此篇讲下安全组下如何自动放行

工具功能

  • 获取执行用户的本地出口公网ip
  • 在安全组上放行该指定公网ip的入向流量
  • 定时清理由工具自动添加的安全组规则

先准备下阿里云RAM用户权限

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ecs:DescribeSecurityGroupReferences",
        "ecs:DescribeSecurityGroupAttribute",
        "ecs:DescribeSecurityGroups",
        "ecs:AuthorizeSecurityGroup",
        "ecs:RevokeSecurityGroup"
      ],
      "Resource": "*"
    }
  ]
}

接下来就是获取公网IP,也挺简单,打开百度搜索输入ip即可,查看百度搜索第一个返回的条目是:https://www.ip138.com/,用开发者模式查看下页面结构后,参考网友的方式写个简单的爬取方法

    @staticmethod
    def get_client_public_ip():
        # ip138.com中使用iframe,这里先获得iframe中的src
        # 每年iframe中的地址会变,比如 2019.ip138.com 2022.ip138.com
        headers = ("User-Agent",
                   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36")
        opener = urllib.request.build_opener()
        opener.addheaders = [headers]
        data = opener.open("http://ip138.com")
        doc = pq(data.read())

        # 获得 iframe 标签的 src 属性的值
        # 获得出来大概是这样 "//2022.ip138.com/"
        # 再去掉两头多余的 "/"  就获得到实际的显示地址了
        url = "http://" + doc('iframe').eq(0).attr('src').replace('/', '')
        # print(url)
        opener.close()

        # 获取ip地址
        opener = urllib.request.build_opener()
        opener.addheaders = [headers]
        data = opener.open(url)
        doc = pq(data.read().decode('utf8'))

        # 取得素有的 <a> 元素
        lista = doc('body p a')

        # 取得第一个<a> 元素
        firstaddr = lista.eq(0).text()
        # print(firsta)
        return firstaddr

剩下就是调用阿里云openapi来操作安全组,需要安装如下依赖:

# 调用openapi需要
pip3.8 install alibabacloud_tea_openapi
pip3.8 install alibabacloud_ecs20140526==3.0.1

# 爬虫需要
pip3.8 install pyquery

看下完整代码:

# -*- coding: utf-8 -*-

import time
import datetime

import urllib
import urllib.parse
import urllib.request
from pyquery import PyQuery as pq
from typing import List

from alibabacloud_ecs20140526.client import Client as Ecs20140526Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecs20140526 import models as ecs_20140526_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class AutoModSecPolicy:
    def __init__(self, region_id, endpoint, access_key_id, access_key_secret):
        self.region_id = region_id
        self.endpoint = endpoint
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.client = self.create_client()

    def create_client(self) -> Ecs20140526Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # 您的 AccessKey ID,
            access_key_id=self.access_key_id,
            # 您的 AccessKey Secret,
            access_key_secret=self.access_key_secret
        )
        # 访问的域名
        config.endpoint = self.endpoint
        return Ecs20140526Client(config)

    def create_sec_policy(self,
                          source_ip: str,
                          description: str,
                          security_group_id: str,
                          ) -> None:
        permissions_0 = ecs_20140526_models.AuthorizeSecurityGroupRequestPermissions(
            policy='accept',
            priority='1',
            port_range='-1/-1',
            ip_protocol='ALL',
            source_cidr_ip=source_ip,
            description=description

        )
        authorize_security_group_request = ecs_20140526_models.AuthorizeSecurityGroupRequest(
            region_id=self.region_id,
            security_group_id=security_group_id,
            permissions=[
                permissions_0
            ]
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            resp = self.client.authorize_security_group_with_options(authorize_security_group_request, runtime)
            print(resp)
        except Exception as error:
            # 如有需要,请打印 error
            resp = UtilClient.assert_as_string(error.message)
            print(resp)

    def get_sec_group_ip(self,
                         security_group_id: str,
                         description: str,
                         ) -> List:
        describe_security_group_attribute_request = ecs_20140526_models.DescribeSecurityGroupAttributeRequest(
            security_group_id=security_group_id,
            region_id=self.region_id,
            direction='ingress'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            resp = self.client.describe_security_group_attribute_with_options(describe_security_group_attribute_request,
                                                                         runtime).to_map()
            policy_lists = resp['body']['Permissions']['Permission']
            # ip_lists = [ x['SourceCidrIp'] for x in policy_lists ]
            ip_lists = []
            nowDate = datetime.datetime.utcnow()

            for i in policy_lists:
                create_time = datetime.datetime.strptime(i['CreateTime'], "%Y-%m-%dT%H:%M:%SZ")

                # 删除48小时前策略
                if i['Description'] == description and create_time < nowDate - datetime.timedelta(hours=48):
                    ip_lists.append(i['SourceCidrIp'])
            # print(ip_lists)
            # print(policy_lists)
            return ip_lists
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)


    def del_sec_group_ip(self,
                         ip_lists: list,
                         security_group_id: str,
                         description: str,
                         ) -> None:
        if not ip_lists:
            print("not found ip!")
            return 0
        policy_lists = [
            ecs_20140526_models.RevokeSecurityGroupRequestPermissions(
                policy='accept',
                priority='1',
                ip_protocol='ALL',
                port_range='-1/-1',
                source_cidr_ip=x,
                description=description
            ) for x in ip_lists
        ]

        revoke_security_group_request = ecs_20140526_models.RevokeSecurityGroupRequest(
            region_id=self.region_id,
            security_group_id=security_group_id,
            permissions=policy_lists
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            resp = self.client.revoke_security_group_with_options(revoke_security_group_request, runtime)
            print(resp)
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)


    @staticmethod
    def get_client_public_ip():

        # ip138.com中使用iframe,这里先获得iframe中的src
        # 每年iframe中的地址会变,比如 2019.ip138.com 2022.ip138.com
        headers = ("User-Agent",
                   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36")
        opener = urllib.request.build_opener()
        opener.addheaders = [headers]
        data = opener.open("http://ip138.com")
        doc = pq(data.read())

        # 获得 iframe 标签的 src 属性的值
        # 获得出来大概是这样 "//2022.ip138.com/"
        # 再去掉两头多余的 "/"  就获得到实际的显示地址了
        url = "http://" + doc('iframe').eq(0).attr('src').replace('/', '')
        # print(url)
        opener.close()

        # 获取ip地址
        opener = urllib.request.build_opener()
        opener.addheaders = [headers]
        data = opener.open(url)
        doc = pq(data.read().decode('utf8'))

        # 取得素有的 <a> 元素
        lista = doc('body p a')

        # 取得第一个<a> 元素
        firstaddr = lista.eq(0).text()
        # print(firsta)
        return firstaddr


if __name__ == '__main__':
    region_id = 'cn-guangzhou'
    endpoint = f'ecs.cn-guangzhou.aliyuncs.com'
    access_key_id = 'xxxx'
    access_key_secret = 'xxxxx'

    security_group_id = 'sg-xxxxx'
    description_by_create = 'auto create by generator'

    # 生成对象
    auto_obj = AutoModSecPolicy(region_id,endpoint, access_key_id, access_key_secret)

    # openapi有接口限流,限制下
    time.sleep(1)
    ip_lists = auto_obj.get_sec_group_ip(security_group_id=security_group_id, description=description_by_create)
    print('Cleanning IP:',ip_lists)

    time.sleep(1)
    auto_obj.del_sec_group_ip(ip_lists=ip_lists, security_group_id=security_group_id,
                              description=description_by_create)

    source_ip = auto_obj.get_client_public_ip()
    print("Adding IP",source_ip)

    auto_obj.create_sec_policy(source_ip=source_ip, security_group_id=security_group_id,
                               description=description_by_create)

为方便运行,也可以制作二进制执行文件,让用户用自己机器编译,避免平台或python版本差异导致异常

# 安装依赖
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ 
 pip3.8 install pyinstaller

# 创建二进制文件
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ 
 pyinstaller -F auto_modify_sercurity_group.py

# 制作完成后路劲
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ ll
total 36
drwxrwxr-x 6 xadocker xadocker 4096 7月 28 14:24 ./
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 10:48 ../
-rw------- 1 xadocker xadocker 7608 7月 28 14:19 auto_modify_sercurity_group.py
-rw-rw-r-- 1 xadocker xadocker  854 7月 28 14:24 auto_modify_sercurity_group.spec
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 14:24 build/
drwxrwxr-x 2 xadocker xadocker 4096 7月 28 14:25 dist/
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 14:19 .idea/
drwxrwxr-x 6 xadocker xadocker 4096 7月 28 10:48 venv/
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ ll dist/
total 20124
drwxrwxr-x 2 xadocker xadocker     4096 10月 28 14:25 ./
drwxrwxr-x 6 xadocker xadocker     4096 10月 28 14:24 ../
-rwxr-xr-x 1 xadocker xadocker 20595280 10月 28 14:25 auto_modify_sercurity_group*

# 后面自己创个桌面快捷方式就可以了,略...

正文完
 
xadocker
版权声明:本站原创文章,由 xadocker 2022-07-16发表,共计7298字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)