server.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import json
  4. import logging
  5. import socket
  6. import uuid
  7. from threading import Thread
  8. # 广播端口
  9. broadcast_port = 8888
  10. # 管理端口
  11. manager_port = 9999
  12. # 监听广播消息
  13. def listen_broadcast():
  14. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  15. s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  16. s.bind(('', broadcast_port))
  17. logging.info('* Listening broadcast at %s' % str(s.getsockname()))
  18. while True:
  19. data, address = s.recvfrom(2048)
  20. # 接收到客户端广播消息
  21. client_info = json.loads(data.decode('utf-8'))
  22. logging.info('broadcast from %s | %s' % (address[0], client_info))
  23. # 将服务端配置信息发送给客户端
  24. server_info = json.dumps({
  25. 'version': '0.1',
  26. 'id': str(uuid.uuid1()),
  27. 'manager_port': manager_port
  28. })
  29. logging.info('send to %s | %s' % (address[0], server_info))
  30. s.sendto(server_info.encode('utf-8'), address)
  31. # 处理客户端消息
  32. def message_handle(client_conn, client_address):
  33. while True:
  34. try:
  35. msg_bytes = client_conn.recv(2048)
  36. msg_str = msg_bytes.decode(encoding='utf8')
  37. logging.info('recv from %s | %s' % (client_address[0], str(msg_str)))
  38. try:
  39. # msg_json = json.loads(msg_str)
  40. result = json.dumps({
  41. 'version': '0.1',
  42. 'id': str(uuid.uuid1()),
  43. # 消息类型: msg, file, command
  44. 'type': 'command',
  45. 'value': 'echo 111 >> /tmp/run.log'
  46. })
  47. client_conn.send(result.encode('utf-8'))
  48. except TypeError as te:
  49. result = json.dumps({
  50. 'msg': str(te)
  51. })
  52. logging.error(te)
  53. client_conn.send(result.encode('utf-8'))
  54. except ConnectionResetError as ce:
  55. logging.error(ce)
  56. break
  57. # noinspection DuplicatedCode
  58. def init_logging(log_filename):
  59. log_format = '%(asctime)s, %(filename)s - line %(lineno)-4d: %(levelname)-8s %(message)s'
  60. date_format = '%Y-%m-%d %H:%M:%S'
  61. # 打印到文件
  62. logging.basicConfig(filename=log_filename, level=logging.INFO, format=log_format, datefmt=date_format)
  63. formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
  64. # 同时输出到终端
  65. console = logging.StreamHandler()
  66. console.setLevel(logging.INFO)
  67. console.setFormatter(formatter)
  68. logging.getLogger('').addHandler(console)
  69. if __name__ == '__main__':
  70. init_logging('server.log')
  71. # 启动接收广播线程
  72. thread_broadcast = Thread(target=listen_broadcast)
  73. thread_broadcast.setDaemon(True)
  74. thread_broadcast.start()
  75. # 服务端监听
  76. try:
  77. socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  78. # 立即释放端口
  79. socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  80. socket_server.bind(('', manager_port))
  81. socket_server.listen(128)
  82. logging.info('* Server start at %s, waiting for client...' % str(socket_server.getsockname()))
  83. while True:
  84. # 为每个客户端启一个线程
  85. client, address = socket_server.accept()
  86. thread = Thread(target=message_handle, args=(client, address))
  87. thread.setDaemon(True)
  88. thread.start()
  89. except KeyboardInterrupt as ki:
  90. logging.warning('KeyboardInterrupt %s' % ki)