Python的一道题以及我的代码,求大神看看有什么可以改进的地方


题目如下:
spam系统,假设我们可以获得线上的实时请求(按时间顺序)
每个请求包含如下信息:
时间(unix时间戳)
用户名
动作(提问、回答、评论)
内容

依次考虑如何解决以下问题:
1.当发现动作频率较高的用户时,输出报警(1分钟内超过10次评论、回答、提问)
2.当发现一个用户连续发重复的内容时,输出报警(连续发了10个相同的评论、回答、提问)
3.使用你觉得最优美的方法将上面的策略与程序分离开。使得上面的策略可以随时更新而不用修改程序。

要求:
服务监听一个端口,使用测试程序模拟用户行为不断向服务发送请求,
请求格式为json,如:
{"time":1323333"user":"xxx","action":"answer","content":"abcde"}

服务输出报警格式如下
xxx,"频繁提问"

一下是我的代码:
client.py

import socket
import json
import time
import gevent
import random

def generateContents():
    """
    随机生成内容
    """
    chars = [chr(c) for c in range(ord('a'), ord('z')+1)]
    result = [''.join(random.sample(chars, 5)) for i in range(3)]
    return result

class Worker():
    def __init__(self, name, socket):
        self.name = name
        self.socket = socket
        self.actions = ['question', 'answer', 'comment']
        self.contents = generateContents()

    def __call__(self):
        while 1:
            data = self._generateData()
            data = json.dumps(data)
            self._request(data)

    def _request(self, data):
        s = self.socket.socket()
        socket = self.socket.gethostname()
        port = 1234
        s.connect((socket, port))
        s.recv(1024)
        s.send(data)
        s.close()
        gevent.sleep(random.randint(1, 3) * 0.5)

    def _generateData(self):
        data = {}
        data['time'] = time.time()
        data['user'] = self.name
        data['action'] = random.choice(self.actions)
        data['content'] = random.choice(self.contents)
        return data


if __name__ == '__main__':
    threads = [gevent.spawn(Worker(chr(i), socket))
               for i in range(ord('a'), ord('f'))]
    gevent.joinall(threads)

server.py

import socket
import json
from functools import wraps
from MyExamine import MyExamine

def run():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host = ''
    port = 1234
    s.bind((host, port))
    s.listen(10)
    while 1:
        c, addr = s.accept()
        c.send('This is a simple server'.encode('utf-8'))
        rec = c.recv(1024)
        do_something(rec)
        c.close()

def format2Json(func):
    def f(*args):
        data = json.loads(args[0])
        func(data)
    return f

@format2Json
@MyExamine
def do_something(data):
    """
    模拟后台处理用户发来的请求
    """
    print data

if __name__ == '__main__':
    run()

MyExamine.py

from baseExamine import BaseExamine
from collections import deque

class MyExamine(BaseExamine):

    def __init__(self, func):
        self.rateCache = {}
        self.timesCache = {}
        self.func = func

    def examineRate(self, data):
        userQueue = self.rateCache.setdefault(data['user'], deque())
        while userQueue:
            item = userQueue.pop()
            if data['time'] - item['time'] < 60:
                userQueue.append(item)
                break
        userQueue.appendleft(data)
        if len(userQueue) > 10:
            self.showWarning('User: %s 频繁操作'.decode('utf8') % data['user'])

    def examineContentTimes(self, data):
        userActions = self.timesCache.setdefault(data['user'], {})
        contentTimes = self.timesCache.setdefault(data['action'], {})
        key = data['content']
        contentTimes[key] = contentTimes.get(key, 0) + 1
        if contentTimes[key] > 10:
            self.showWarning('User: %s %s %s 超过10次'.decode('utf8') %
                             (data['user'], data['action'], key))

    def showWarning(self, msg):
        print '\033[;31m' + msg + '\033[0m'

    def __call__(self, *args):
        data = args[0]
        self.examineRate(data)
        self.examineContentTimes(data)
        self.func(*args)

baseExamine.py

class BaseExamine():
    """
    模板方法的基类
    """
    def __init__(self, func):
        pass

    def __call__(self, *args):
        pass

求大神指点,谢谢!

python

幻月D静马 10 years, 9 months ago

这个没有最佳解法吧,看个人理解……

下面是我的理解:

1. 不同的策略分为不同的类,提供一个统一的接口,比如

#strategy.py
class UserRate(object):
  def __init__(self, comment_per_user_min=10):
    #init

  def check(self, userdata):
    #检查用户数据,超过限制即报警

class UserDupContent(object):
  def __init__(self, content_send_per_user=10):
    #init

  def check(self, userdata):
    #检查用户数据

2. 使用依赖注入将策略注入到检查程序:

class Guarder(object):
  def addStrategy(self, strategy):
    #添加一个策略

  def check(self, userdata):
    #使用已经添加的策略逐个检查
    #返回检查结果

  def reload_from(self, conf):
    #解析配置并创建相应对象
    self.addStrategy(strategy)  

  @classmethod
  def create(cls, conf=''):
    obj = cls()
    if conf: 
      obj.reload_from(conf)

3. 调用Guarder实例检查

guarder=Guarder.create('antispam.ini')

def index():
  if guarder.check(userdata):
    pass
  else:
    #error

def admin_reload_guarder():
  '''根据web请求运行时重载配置'''
  import strategy
  reload(strategy)
  guarder.reload(conf)

示例配置文件:

#antispam.ini
[strategies]
inst=usercontent,userrate

[usercontent]
type=strategy.UserDupContent
init.comment_per_user_min=5

[userrate]
type=strategy.UserRate
init.content_send_per_user=5

以上能够完成的功能如下:
1. 隔离了策略代码,使用依赖注入的方式完成
2. 策略本身是duck typing,灵活扩充
3. 策略代码文件strategy.py不停止服务器热部署

当然,配置文件可以调整格式,直接用python代码写都可以

WINKW answered 10 years, 9 months ago

Your Answer