共计 7298 个字符,预计需要花费 19 分钟才能阅读完成。
近期遇到一些初创团队,单薄的资源,超载的使用是为常态( ̄_ ̄|||)
由于没有额外的资源,对于远程操作的安全管理,只能依靠防火墙和云上安全组了,此篇讲下安全组下如何自动放行
工具功能
- 获取执行用户的本地出口公网ip
- 在安全组上放行该指定公网ip的入向流量
- 定时清理由工具自动添加的安全组规则
先准备下阿里云RAM用户权限
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecs:DescribeSecurityGroupReferences",
"ecs:DescribeSecurityGroupAttribute",
"ecs:DescribeSecurityGroups",
"ecs:AuthorizeSecurityGroup",
"ecs:RevokeSecurityGroup"
],
"Resource": "*"
}
]
}
接下来就是获取公网IP,也挺简单,打开百度搜索输入ip即可,查看百度搜索第一个返回的条目是:https://www.ip138.com/,用开发者模式查看下页面结构后,参考网友的方式写个简单的爬取方法
@staticmethod
def get_client_public_ip():
# ip138.com中使用iframe,这里先获得iframe中的src
# 每年iframe中的地址会变,比如 2019.ip138.com 2022.ip138.com
headers = ("User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36")
opener = urllib.request.build_opener()
opener.addheaders = [headers]
data = opener.open("http://ip138.com")
doc = pq(data.read())
# 获得 iframe 标签的 src 属性的值
# 获得出来大概是这样 "//2022.ip138.com/"
# 再去掉两头多余的 "/" 就获得到实际的显示地址了
url = "http://" + doc('iframe').eq(0).attr('src').replace('/', '')
# print(url)
opener.close()
# 获取ip地址
opener = urllib.request.build_opener()
opener.addheaders = [headers]
data = opener.open(url)
doc = pq(data.read().decode('utf8'))
# 取得素有的 <a> 元素
lista = doc('body p a')
# 取得第一个<a> 元素
firstaddr = lista.eq(0).text()
# print(firsta)
return firstaddr
剩下就是调用阿里云openapi来操作安全组,需要安装如下依赖:
# 调用openapi需要
pip3.8 install alibabacloud_tea_openapi
pip3.8 install alibabacloud_ecs20140526==3.0.1
# 爬虫需要
pip3.8 install pyquery
看下完整代码:
# -*- coding: utf-8 -*-
import time
import datetime
import urllib
import urllib.parse
import urllib.request
from pyquery import PyQuery as pq
from typing import List
from alibabacloud_ecs20140526.client import Client as Ecs20140526Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecs20140526 import models as ecs_20140526_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class AutoModSecPolicy:
def __init__(self, region_id, endpoint, access_key_id, access_key_secret):
self.region_id = region_id
self.endpoint = endpoint
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
self.client = self.create_client()
def create_client(self) -> Ecs20140526Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
# 您的 AccessKey ID,
access_key_id=self.access_key_id,
# 您的 AccessKey Secret,
access_key_secret=self.access_key_secret
)
# 访问的域名
config.endpoint = self.endpoint
return Ecs20140526Client(config)
def create_sec_policy(self,
source_ip: str,
description: str,
security_group_id: str,
) -> None:
permissions_0 = ecs_20140526_models.AuthorizeSecurityGroupRequestPermissions(
policy='accept',
priority='1',
port_range='-1/-1',
ip_protocol='ALL',
source_cidr_ip=source_ip,
description=description
)
authorize_security_group_request = ecs_20140526_models.AuthorizeSecurityGroupRequest(
region_id=self.region_id,
security_group_id=security_group_id,
permissions=[
permissions_0
]
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = self.client.authorize_security_group_with_options(authorize_security_group_request, runtime)
print(resp)
except Exception as error:
# 如有需要,请打印 error
resp = UtilClient.assert_as_string(error.message)
print(resp)
def get_sec_group_ip(self,
security_group_id: str,
description: str,
) -> List:
describe_security_group_attribute_request = ecs_20140526_models.DescribeSecurityGroupAttributeRequest(
security_group_id=security_group_id,
region_id=self.region_id,
direction='ingress'
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = self.client.describe_security_group_attribute_with_options(describe_security_group_attribute_request,
runtime).to_map()
policy_lists = resp['body']['Permissions']['Permission']
# ip_lists = [ x['SourceCidrIp'] for x in policy_lists ]
ip_lists = []
nowDate = datetime.datetime.utcnow()
for i in policy_lists:
create_time = datetime.datetime.strptime(i['CreateTime'], "%Y-%m-%dT%H:%M:%SZ")
# 删除48小时前策略
if i['Description'] == description and create_time < nowDate - datetime.timedelta(hours=48):
ip_lists.append(i['SourceCidrIp'])
# print(ip_lists)
# print(policy_lists)
return ip_lists
except Exception as error:
# 如有需要,请打印 error
UtilClient.assert_as_string(error.message)
def del_sec_group_ip(self,
ip_lists: list,
security_group_id: str,
description: str,
) -> None:
if not ip_lists:
print("not found ip!")
return 0
policy_lists = [
ecs_20140526_models.RevokeSecurityGroupRequestPermissions(
policy='accept',
priority='1',
ip_protocol='ALL',
port_range='-1/-1',
source_cidr_ip=x,
description=description
) for x in ip_lists
]
revoke_security_group_request = ecs_20140526_models.RevokeSecurityGroupRequest(
region_id=self.region_id,
security_group_id=security_group_id,
permissions=policy_lists
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = self.client.revoke_security_group_with_options(revoke_security_group_request, runtime)
print(resp)
except Exception as error:
# 如有需要,请打印 error
UtilClient.assert_as_string(error.message)
@staticmethod
def get_client_public_ip():
# ip138.com中使用iframe,这里先获得iframe中的src
# 每年iframe中的地址会变,比如 2019.ip138.com 2022.ip138.com
headers = ("User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36")
opener = urllib.request.build_opener()
opener.addheaders = [headers]
data = opener.open("http://ip138.com")
doc = pq(data.read())
# 获得 iframe 标签的 src 属性的值
# 获得出来大概是这样 "//2022.ip138.com/"
# 再去掉两头多余的 "/" 就获得到实际的显示地址了
url = "http://" + doc('iframe').eq(0).attr('src').replace('/', '')
# print(url)
opener.close()
# 获取ip地址
opener = urllib.request.build_opener()
opener.addheaders = [headers]
data = opener.open(url)
doc = pq(data.read().decode('utf8'))
# 取得素有的 <a> 元素
lista = doc('body p a')
# 取得第一个<a> 元素
firstaddr = lista.eq(0).text()
# print(firsta)
return firstaddr
if __name__ == '__main__':
region_id = 'cn-guangzhou'
endpoint = f'ecs.cn-guangzhou.aliyuncs.com'
access_key_id = 'xxxx'
access_key_secret = 'xxxxx'
security_group_id = 'sg-xxxxx'
description_by_create = 'auto create by generator'
# 生成对象
auto_obj = AutoModSecPolicy(region_id,endpoint, access_key_id, access_key_secret)
# openapi有接口限流,限制下
time.sleep(1)
ip_lists = auto_obj.get_sec_group_ip(security_group_id=security_group_id, description=description_by_create)
print('Cleanning IP:',ip_lists)
time.sleep(1)
auto_obj.del_sec_group_ip(ip_lists=ip_lists, security_group_id=security_group_id,
description=description_by_create)
source_ip = auto_obj.get_client_public_ip()
print("Adding IP",source_ip)
auto_obj.create_sec_policy(source_ip=source_ip, security_group_id=security_group_id,
description=description_by_create)
为方便运行,也可以制作二进制执行文件,让用户用自己机器编译,避免平台或python版本差异导致异常
# 安装依赖
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$
pip3.8 install pyinstaller
# 创建二进制文件
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$
pyinstaller -F auto_modify_sercurity_group.py
# 制作完成后路劲
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ ll
total 36
drwxrwxr-x 6 xadocker xadocker 4096 7月 28 14:24 ./
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 10:48 ../
-rw------- 1 xadocker xadocker 7608 7月 28 14:19 auto_modify_sercurity_group.py
-rw-rw-r-- 1 xadocker xadocker 854 7月 28 14:24 auto_modify_sercurity_group.spec
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 14:24 build/
drwxrwxr-x 2 xadocker xadocker 4096 7月 28 14:25 dist/
drwxrwxr-x 3 xadocker xadocker 4096 7月 28 14:19 .idea/
drwxrwxr-x 6 xadocker xadocker 4096 7月 28 10:48 venv/
(venv) xadocker@xadocker-virtual-machine:~/PycharmProjects/untitled$ ll dist/
total 20124
drwxrwxr-x 2 xadocker xadocker 4096 10月 28 14:25 ./
drwxrwxr-x 6 xadocker xadocker 4096 10月 28 14:24 ../
-rwxr-xr-x 1 xadocker xadocker 20595280 10月 28 14:25 auto_modify_sercurity_group*
# 后面自己创个桌面快捷方式就可以了,略...
正文完