Python 3.7 - 检测Linux系统上指定端口是否被占用

2023-06-15  本文已影响0人  日垒一砖

检测端口占用

import errno
import socket
import sys

# Ports to check. Entries may be ints or strings; a "start-end" string
# denotes an inclusive range that convert_ports() expands.
reserved_ports = [8080, 9090, "30000-32767"]

# Expand the declared ports into a flat list of integers
def convert_ports(ports):
    """Flatten a mixed port specification into a list of int ports.

    Each entry of ``ports`` is handled by its exact type:

    * ``int``  -- appended unchanged.
    * ``str``  -- ``"start-end"`` is expanded to every port from start to
      end inclusive; any other string is converted with ``int()``.
    * anything else is skipped.

    Raises ValueError when a string entry cannot be parsed as an integer
    or as exactly two integers separated by ``-``.
    """
    expanded = []
    for entry in ports:
        if type(entry) is int:
            expanded.append(entry)
            continue
        if type(entry) is not str:
            # Unsupported entry types are ignored
            continue
        if '-' not in entry:
            expanded.append(int(entry))
            continue
        low, high = map(int, entry.split('-'))
        expanded.extend(range(low, high + 1))
    return expanded


# function 1: check whether the port is already in use by trying to bind it
def is_port_listening(port):
    """Report whether ``port`` is already occupied, by attempting to bind it.

    A TCP socket is bound to ``0.0.0.0:port``. The probe socket is always
    closed before returning, whether or not the bind succeeded.

    Returns:
        True  -- bind failed with EADDRINUSE (address already in use).
        False -- bind succeeded, or failed for any other OS-level reason.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind(('0.0.0.0', port))
        except socket.error as e:
            # Only "address already in use" counts as occupied; the symbolic
            # constant replaces the hard-coded Linux value 98.
            return e.errno == errno.EADDRINUSE
    return False

# function 2: check whether a connection to the port can be established
def is_port_connect(port):
    """Report whether a TCP connection to ``127.0.0.1:port`` succeeds.

    The attempt uses a 0.2 second timeout. Any exception raised while
    configuring or connecting the socket is treated as "not connectable".

    Returns:
        True when the connection was established, otherwise False.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        connected = True
        try:
            probe.settimeout(0.2)
            probe.connect(('127.0.0.1', port))
        except Exception:
            # Connection failed: the port is considered unoccupied
            connected = False
        if connected:
            probe.close()
        return connected

# Probe every port to be checked and collect its status
def check_ports(ports):
    """Map each port in ``ports`` to the result of ``is_port_listening``.

    Returns a dict ``{port: bool}`` in iteration order of ``ports``.
    """
    return {port: is_port_listening(port) for port in ports}

if __name__ == '__main__':
    # Expand the configured specification, then probe every resulting port.
    ports = convert_ports(reserved_ports)
    results = check_ports(ports)

    # Partition the probed ports by whether they were found occupied.
    opend_ports = [port for port, status in results.items() if status]
    closed_ports = [port for port, status in results.items() if not status]

    # Only occupied ports are reported; nothing is printed when all are free.
    if opend_ports:
        print(f'Some reserved ports have already been opened({len(opend_ports)}): {opend_ports}')

Q&A

Q: connect 的方式在 k8s NodePort 场景下存在建联无响应的情况(ipvs 模式下,k8s 网络插件自动创建的 iptables 规则会 drop 对 127.0.0.1 + NodePort 的访问请求)
A: 由connect改为bind,端口绑定成功则说明未被占用,否则端口已被占用

Q: 使用0.0.0.0 bind端口,如果端口被127.0.0.1 connect(非listen)使用,bind依然失败
A: 由 bind 改为 connect(即上文的 is_port_connect),并通过 s.settimeout(0.2) 设置超时,超时未建联即视为失败

AI-辅助写代码

检测端口连通性

端口监听

import socket
import select

# Ports on which the server below opens a listening socket
ports = [8000, 10000, 12000]

def handle_client(connection, address):
    """Print everything received on ``connection`` until the peer closes it.

    Args:
        connection: a connected socket, as returned by ``accept()``.
        address: the peer address; ``address[0]`` and ``address[1]`` are
            printed as ``host:port``.

    Data is read in chunks of up to 1024 bytes. Socket errors are printed
    rather than raised, and the connection is always closed on exit.
    """
    try:
        print(f"New connection from {address[0]}:{address[1]}")
        while True:
            data = connection.recv(1024)
            if not data:
                # An empty read means the peer closed the connection
                break
            # errors='replace' keeps arbitrary or partially received
            # multi-byte input from raising UnicodeDecodeError, which the
            # socket.error handler below would not catch.
            text = data.decode(errors='replace')
            print(f"Received data from {address[0]}:{address[1]}: {text}")

    except socket.error as e:
        print(f"Error: {e}")

    finally:
        connection.close()
        print(f"Connection closed with {address[0]}:{address[1]}")


if __name__ == '__main__':
    # Create one listening server socket per configured port
    server_sockets = []
    for port in ports:
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(('0.0.0.0', port))
            server_socket.listen(5)
        except socket.error as e:
            # Release the descriptor of a socket that could not be set up
            if server_socket is not None:
                server_socket.close()
            print(f"Failed to listen on port {port}...")
            print(f"Error: {e}")
        else:
            server_sockets.append(server_socket)
            print(f"Listening on port {port}...")

    if not server_sockets:
        # With nothing to monitor, select() would wait forever (or fail on
        # platforms that reject three empty lists), so stop with an error.
        raise SystemExit("No port could be opened for listening")

    while True:
        # Use select to wait until at least one server socket has a pending
        # incoming connection
        readable, _, _ = select.select(server_sockets, [], [])

        # Connections are served one at a time; handle_client() returns only
        # after the peer has closed its side.
        for sock in readable:
            connection, address = sock.accept()
            handle_client(connection, address)

端口建联

import socket
import sys


# List of ports to probe on the remote host (connection attempts, not listeners)
ports = [8000, 10000, 12000]

# Check whether the port accepts connections (i.e. something is listening)
def is_port_listening(ip, port):
    """Report whether a TCP connection to ``ip:port`` can be established.

    The attempt uses a 0.2 second timeout. Any exception raised while
    configuring or connecting the socket yields False.

    Returns:
        True when the connection was established, otherwise False.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        reachable = True
        try:
            probe.settimeout(0.2)
            probe.connect((ip, port))
        except Exception:
            # Connection failed: the port cannot be reached
            reachable = False
        if reachable:
            probe.close()
        return reachable

# Collect the connection status of every port to be checked
def check_ports(ip, ports):
    """Map each port in ``ports`` to ``is_port_listening(ip, port)``.

    Returns a dict ``{port: bool}`` in iteration order of ``ports``.
    """
    return {port: is_port_listening(ip, port) for port in ports}

#  Resolve the IP address of the local host
def get_host_ip():
    """Look up the local hostname and resolve it to an IP address string.

    Returns:
        The address returned by ``socket.gethostbyname()`` for the local
        hostname, or None (after printing the error) when the lookup
        raises ``socket.error``.
    """
    try:
        return socket.gethostbyname(socket.gethostname())
    except socket.error as err:
        print(f"Error: {err}")
        return None

if __name__ == '__main__':
    local_ip = get_host_ip()

    # The remote host IP is a required first command-line argument.
    if len(sys.argv) < 2:
        sys.stdout.write("No arg: remote host ip")
        sys.exit(1)
    remote_ip = sys.argv[1]

    # Probe every configured port, then split the ports by outcome.
    results = check_ports(remote_ip, ports)
    connected_ports = [port for port, status in results.items() if status]
    refused_ports = [port for port, status in results.items() if not status]

    # Any unreachable port fails the whole check with exit status 1.
    if refused_ports:
        sys.stdout.write(f'Cannot connect to some ports({refused_ports}). From {local_ip} To {remote_ip}')
        sys.exit(1)
    sys.stdout.write(f"Pass the ports connection check! ports({connected_ports}) From {local_ip} To {remote_ip}\n")

FAQ

Failed to listen on port 443, Permission denied

1024 以下(0–1023)的端口为特权端口,需使用 root 账号(或为进程授予 CAP_NET_BIND_SERVICE 能力)启动监听进程

AI - 辅助写代码

上一篇 下一篇

猜你喜欢

热点阅读