123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import json
- import logging
- import socket
- import uuid
- from threading import Thread
# UDP port on which client discovery broadcasts are received.
broadcast_port = 8888
# TCP port on which the management server accepts client connections.
manager_port = 9999
def listen_broadcast():
    """Answer client discovery broadcasts with the server's connection info.

    Binds a UDP socket to ``broadcast_port`` on all interfaces and loops
    forever. Each datagram is expected to carry a UTF-8 encoded JSON document
    describing the client. The reply, sent back to the datagram's source
    address, is a JSON object holding the protocol version, a fresh ``uuid1``
    message id and ``manager_port``.

    Datagrams that are not valid UTF-8 JSON are logged and skipped, so a
    single malformed packet cannot terminate the listener thread.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.bind(('', broadcast_port))
        logging.info('* Listening broadcast at %s', str(s.getsockname()))
        while True:
            data, address = s.recvfrom(2048)
            # A broadcast message was received from a client.
            try:
                client_info = json.loads(data.decode('utf-8'))
            except ValueError as err:
                # UnicodeDecodeError and json.JSONDecodeError both derive
                # from ValueError; ignore the packet and keep listening.
                logging.warning('invalid broadcast from %s | %s', address[0], err)
                continue
            logging.info('broadcast from %s | %s', address[0], client_info)
            # Send the server's configuration back to the client.
            server_info = json.dumps({
                'version': '0.1',
                'id': str(uuid.uuid1()),
                'manager_port': manager_port
            })
            logging.info('send to %s | %s', address[0], server_info)
            s.sendto(server_info.encode('utf-8'), address)
def message_handle(client_conn, client_address):
    """Serve one connected client until it disconnects.

    Every message received from the client is logged and answered with a
    fixed JSON command reply. The loop ends when the peer closes the
    connection (``recv`` returns no data) or the connection fails; in both
    cases the client socket is closed before returning.

    Args:
        client_conn: Connected stream socket for this client.
        client_address: Peer address tuple; element 0 is used in log lines.
    """
    try:
        while True:
            msg_bytes = client_conn.recv(2048)
            if not msg_bytes:
                # An empty read means the peer closed the connection; without
                # this check the loop would keep replying to a dead socket.
                break
            # The text is only logged, so undecodable bytes are replaced
            # instead of raising and killing the handler thread.
            msg_str = msg_bytes.decode('utf-8', errors='replace')
            logging.info('recv from %s | %s', client_address[0], msg_str)
            # NOTE: the incoming message is not parsed yet; every message
            # gets the same command reply.
            result = json.dumps({
                'version': '0.1',
                'id': str(uuid.uuid1()),
                # Message type: msg, file, command
                'type': 'command',
                'value': 'echo 111 >> /tmp/run.log'
            })
            # sendall retries until the whole reply has been written.
            client_conn.sendall(result.encode('utf-8'))
    except ConnectionError as ce:
        # Covers reset, aborted and broken-pipe connections alike.
        logging.error(ce)
    finally:
        client_conn.close()
# noinspection DuplicatedCode
def init_logging(log_filename):
    """Set up root logging to a file and mirror it on the terminal.

    Args:
        log_filename: Path of the log file passed to ``logging.basicConfig``.
    """
    record_layout = '%(asctime)s, %(filename)s - line %(lineno)-4d: %(levelname)-8s %(message)s'
    timestamp_layout = '%Y-%m-%d %H:%M:%S'

    # File output: INFO and above go to the given log file.
    logging.basicConfig(
        filename=log_filename,
        level=logging.INFO,
        format=record_layout,
        datefmt=timestamp_layout,
    )

    # Terminal output: an extra root handler using the same layout.
    terminal_handler = logging.StreamHandler()
    terminal_handler.setLevel(logging.INFO)
    terminal_handler.setFormatter(
        logging.Formatter(fmt=record_layout, datefmt=timestamp_layout)
    )
    logging.getLogger('').addHandler(terminal_handler)
if __name__ == '__main__':
    init_logging('server.log')
    # Start the broadcast listener thread. It is a daemon thread, so it
    # does not keep the process alive once the main thread exits.
    thread_broadcast = Thread(target=listen_broadcast, daemon=True)
    thread_broadcast.start()
    # Management server: accept TCP clients until interrupted.
    try:
        # The with-block closes the listening socket on any exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_server:
            # Allow the port to be rebound right away after a restart.
            socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            socket_server.bind(('', manager_port))
            socket_server.listen(128)
            logging.info('* Server start at %s, waiting for client...',
                         str(socket_server.getsockname()))
            while True:
                # Each client is served on its own daemon thread.
                client, address = socket_server.accept()
                thread = Thread(target=message_handle, args=(client, address), daemon=True)
                thread.start()
    except KeyboardInterrupt as ki:
        logging.warning('KeyboardInterrupt %s', ki)
|