使用Django/Flask返回图像,相对简单很多,但掌握使用Python CGI也是有用的。 以生成一个随机二维码为范例。

使用Python2.7,请直接参考:http://lost-theory.org/python/dynamicimg.html  

Python >= 3.4

Step 1 生成一个随机二维码,并保存到文件。

qrcode 使用PIL或Pillow生成二维码。

  • QRCode.make_image() 创建的Image对象可等同Pillow时Image.new()创建的对象。
#!/usr/bin/env python3
# Imports
import random
import qrcode

qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H, border=1)
qr.add_data('INT:%d' % random.randint(4000, 4999))
qr.make(fit=True)
img = qr.make_image(fill_color="white", back_color="black")

img.save("image.PNG", format='PNG')

将文件保存到任意目录并运行,每次执行会产生不同的二维码。  

Step 2 修改为CGI脚本

已经创建了动态的二维码,现在希望每次浏览器刷新都显示不同的图像,但不希望产生在服务器端的文件写入操作,。

  • 需要使用file-like对象代替文件,在Python2.7中使用StringIO.StringIO,在Python 3.4以上版本中,使用io.BytesIO处理二进制文件,使用io.StringIO处理文本。
    • 使用getvalue()获取完整的缓冲区二进制字节。
    • 使用close()关闭对象并丢弃内存缓冲区
  • 参考HTTP/1.1协议
    • 返回图像,Header至少需要Content-typeContent-length字段。
    • header和body之间需要有个空行。
  • 使用sys.stdout.flush 强制先输出header再输出body。
  • 文本使用sys.stdout.writeprint,二进制则需使用sys.stdout.buffer.write 。
#!/usr/bin/env python3
# Imports
import sys
import random
from io import BytesIO
import qrcode

qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H, border=1)
qr.add_data('INT:%d' % random.randint(4000, 4999))
qr.make(fit=True)
img = qr.make_image(fill_color="white", back_color="black")

# img.save("image.PNG", format='PNG')
f = BytesIO()
img.save(f, format='PNG')
data = f.getvalue()
f.close()

l = len(data)
sys.stdout.write("Content-type: image/png\n")
sys.stdout.write("Content-length: %d\n\n" % l)
sys.stdout.flush()
sys.stdout.buffer.write(data)

为什么要避免服务器端的文件写入

  • 安全性的要求:执行cgi的用户往往只有极小权限。
  • 性能的要求:IO性能是大多数VPS的性能瓶颈。

a = [1, 2, 3, 4, 4, 5]
b = [3, 4]

How to get result:

a - b = [1, 2, 4, 5]

incorrect 1:

list(set(a) - set(b))
[1, 2, 5]

incorrect 2:

[i for i in a if i not in b]
[1, 2, 5]

correct:

def list_sub(list1, list2):
    for i in list1:
        if i in list2:
        list1.remove(i)
        list2.remove(i)
        return list1

GAE的数据库额度存在3个关键:

  1. Small Op目前是免费的,keys_only=True可以随便用。
  2. get()和get_multi()查询会被自动memcache。
  3. indexed会倍增write Op

NOTE1:提取单条数据,使用get_by_key_name(),而不是fetch(1) / first()

user = User.query(User.username = "tom").first()

替换为

user = User.get_by_key_name("tom")

原方法会消耗1 Fetch Op + 1 Query Op = 2 read Op,修改后,会产生1 Small Op + 1 Read Op,而且这个Read Op会被自动cache。

NOTE2:提取多条数据时,使用keys_only + get_multi()

比如一个表有,我想一次取出N条数据时,常规ORM的写法:

feeds = Feed.query().fetch(N)

每次查询,都会消耗1+N Read Op,为了优化额度,可以修改成:

q = Feed.query()
feeds = ndb.get_multi(q.fetch(N,keys_only=True))

首次查询,消耗1 Small Op + N Read Op,但是在重复查询是,则只消耗1 Small Op + m*N Read Op,m是memcache未命中的概率,理想情况是0。

至于性能,可以参看这里,大概75%缓存命中是性能的分界线。

Memcache hit ratio: 100% (everything was in cache)

  Query for entities:              3755 ms
  Query/memcache/ndb:              3239 ms
    Keys-only query:       834 ms
    Memcache.get_multi:   2387 ms
    ndb.get_mutli:           0 ms

Memcache hit ratio: 75%

  Query for entities:              3847 ms
  Query/memcache/ndb:              3928 ms
    Keys-only query:       859 ms
    Memcache.get_multi:   1564 ms
    ndb.get_mutli:        1491 ms

Memcache hit ratio: 50%

  Query for entities:              3507 ms
  Query/memcache/ndb:              5170 ms
    Keys-only query:       825 ms
    Memcache.get_multi:   1061 ms
    ndb.get_mutli:        3168 ms

Memcache hit ratio: 25%

  Query for entities:              3799 ms
  Query/memcache/ndb:              6335 ms
    Keys-only query:       835 ms
    Memcache.get_multi:    486 ms
    ndb.get_mutli:        4875 ms

Memcache hit ratio: 0% (no memcache hits)

  Query for entities:              3828 ms
  Query/memcache/ndb:              8866 ms
    Keys-only query:       836 ms
    Memcache.get_multi:     13 ms
    ndb.get_mutli:        8012 ms</pre>

NOTE3:尽可能的禁用索引。

  • 为所有不需要的被query()和order()的字段,使用indexed=False

  • 当你插入一条数据时,每个索引字段都会产生write Op,特别当操作对象是ListProperty,会根据list的数量,倍数消耗写配额。

  • 对于一些查询,有些和实际逻辑需求相左,但能大幅节约Op的手段。。

对于下面这个EntryCollect对象

class EntryCollect(ndb.Model):
    published = ndb.DateTimeProperty()
    need_collect_word = ndb.BooleanProperty(default=True, indexed=False)
    key_word = ndb.StringProperty(repeated=True, indexed=False)

in(List)的查询:

keys = EntryCollect.query().order(-EntryCollect.published)
entrys = ndb.get_multi(keys.fetch(PER_PAGE*2, keys_only=True))
new_entry = []
for entry in entrys:
    if keyword.decode('utf-8') in entry.key_word:
        new_entry.append(entry)

list.IN(other_list)的查询:

keys = EntryCollect.query().order(-EntryCollect.published)
entrys = ndb.get_multi(keys.fetch(PER_PAGE*2, keys_only=True))
top_entry = []
for entry in entrys:
    if set(other_list).intersection(set(entry.key_word)):
        top_entry.append(entry)

Boolean的字段:

keys = EntryCollect.query().order(-EntryCollect.published)
entrys = ndb.get_multi(kesy.fetch(CONT*2, keys_only=True))
for entry in entrys:
    if entry.need_collect_word:
        # do something

NOTE4:projected()的利弊权衡

  • 使用projected()的字段,必须被indexed。
  • 使用projected()的查询,算一次small Op。

这里就有个权衡,如果read Op紧张,write Op富裕,那么就可以使用projected()。

NOTE5:使用Memcache

  • Memcache是免费的! Memcache是免费的! 这个必须说两遍,Query太贵了。
  • Query.get()会自动被缓存。
  • 将查询的参数作为key,取md5,查询的结果用json存储起来。
json_data = memcache.get('{}:XXXXXXX'.format(md5sum))
if json_data is None:
# do something....
json_data = json.dumps(data)
memcache.add('{}:Analyse'.format(md5sum), json_data, MEMCACHE_TIMEOUT)

TextProperty 和 StringProperty的区别

  • 在管理后台,你无法添加TextProperty的字段,StringProperty可以。
  • TextProperty无法生成索引,StringProperty可以。
  • StringProperty的最大长度是 1500 bytes。

conda是一个包管理工具。
anaconda是一个使用conda的python发行版本,包含大量科学计算包。
anaconda的优点:相比pip,很多科学计算包的安装更加简便。

安装conda

# 从官方下载miniconda3。
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh

# 如果曾经安装过,那自然是先要删了。
sudo rm -rf /opt/miniconda3

# 安装到/opt/miniconda3。
sudo bash Miniconda3-latest-Linux-x86_64.sh -b -f -p /opt/miniconda3

# 更新下conda,这里可以看出来,conda将自身也当做package对待。
sudo /opt/miniconda3/bin/conda update conda

可以使用/opt/miniconda3/bin/conda info查询conda的配置

  • 当前用户创建的环境会保存到~/.conda/envs
  • /opt/miniconda3是只读的(read only)

在非root用户下使用conda

首先将conda添加到PATH,便于日常使用。

echo  'export PATH="/opt/miniconda3/bin:$PATH"' >> ~/.bashrc 

创建一个名为py36的环境,使用python 3.6.x(x为最新版本),并安装anaconda

/opt/miniconda3/bin/conda  create --name py36 python=3.6 anaconda

创建一个名为py27的环境,使用python 2.7.x(x为最新版本):

/opt/miniconda3/bin/conda  create --name py27 python=2.7

使用activate激活py36:

source activate py36

查询py36已安装的包:

conda list

查询当前的python版本

$ python -V
Python 3.6.1 :: Anaconda 4.4.0 (64-bit)

一个App拆分成多个App,是最简单的,倍翻利用app engine的方法,但涉及到应用间通信的问题。

在被请求的应用上实施访问控制

官方文档在此,介绍了被请求端,利用请求端urlfetch产生的headers中X-Appengine-Inbound-Appid字段来限制来源,特别注意:在请求端,需要在urlfetch请求过程中,使用follow_redirects = False

server:

import webapp2
ALLOWED_APP_IDS = ('cp-thre-pio', 'r-two-deetu')

class AssertIDHandler(webapp2.RequestHandler):
    def dispatch(self):
        app_id = self.request.headers.get('X-Appengine-Inbound-Appid', None)

        if app_id in ALLOWED_APP_IDS:
            # do something ...
        else:
            self.abort(403)</pre>

client:

result = urlfetch.fetch(url="http://*.appspot.com/xxx",
                        payload=form_data,
                        method=urlfetch.POST,
                        headers={'Content-Type': 'application/x-www-form-urlencoded'},
                        follow_redirects=False)

使用什么格式最效率?

我的测试结果如下。

marshal取胜,而且处理utf-8更舒服一些,但切记marshal不能用于两个不同版本的python之间序列化数据。

如果使用json,要随时注意编码:

form_fields = {
    "something": self.request.get("something", default_value="").encode("utf-8"),
}
form_data = urllib.urlencode(form_fields)
result = urlfetch.fetch(url=SOME_URL,
                        payload=form_data,
                        method=urlfetch.POST,
                        headers={'Content-Type': 'application/x-www-form-urlencoded'})
self.response.headers['Content-Type'] = 'application/json'
self.response.out.write(result.content)

为什么要用fping

1. 纯python的python3-ping需要改造。

http://stackoverflow.com/questions/8888880/python-icmp-ping-implementation-when-pinging-multiple-ips-from-threads

This is happening because of the nature of ICMP. ICMP has no concept of ports, so all ICMP messages are received by all listeners.

The usual way to disambiguate is to set a unique identifier in the ICMP ECHO REQUEST payload, and look for it in the response. This code appears to do that, but it uses the current process id to compose the ID. Since this is multithreaded code, they will share a process id and all listeners in the current process will think all ECHO REPLYs are ones they themselves sent!

这是因为ICMP的性质不同于TCP/UDP。 ICMP没有端口的概念,所有ICMP包被所有监听ICMP的程序接收。python3-ping使用进程id组成标识符,多线程将共享一个进程ID。
处理这个问题的的常用方法是ICMP ECHO REQUEST字段设置一个唯一标识符,这个标识符同时由线程ID和进程ID组成,并在响应中处理。

2. 相比多线程的python-ping,使用fping,并使用正则处理返回内容,更省资源。

这个是我万万没想到的。但实际测试确实如此,fping扫描100个c段,欲在相同时间内完成,使用基于current.futures的多线程完成,fping要节省80%的内存。

和C比,多线程下,Python真是太耗内存了...

3.更安全

因为扫描程序会在服务器上跑在一个受限用户下,相比赋予python3权限使用raw socket的权限,只赋予一个fping,更满足公司的安全制度。

CODE

def ping_subnet(subnet):
    p = subprocess.Popen(['/usr/bin/fping', '-g', subnet.cidr(), '-c1', '-q'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = p.communicate(timeout=120)

    fping_regex = re.compile('(?P<ip_address>^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*(?P<xmt>\d{1,2})\/(?P<rcv>\d{1,2})\/(?P<loss>\d{1,3})%.*', re.DOTALL)
    fping_result = {}

    for line in error.decode().splitlines():
        try:
            fping_result[fping_regex.search(line).groupdict()["ip_address"]] = int(fping_regex.search(line).groupdict()["loss"])
        except:
            pass

    pprint.pprint(fping_result)
    with transaction.atomic():
        for host in subnet.hosts.all():
            host.ping_latest_time = timezone.datetime.now()
            if fping_result.get(host.ip_address, 1) == 0:
                host.ping_last_success_time = timezone.datetime.now()
            host.save()
        subnet.ping_latest_time = timezone.datetime.now()
        subnet.save()
    return subnet

环境

Debian “testing” Vultr

ChangeLog

# 2017.04
* 升级到debian 9+ (stretch/testing)

# 2017.06
* squid 不再劫持流量,使用sstunel转发。
+ 使用gfwlist生成proxy.pac
+ 使用privoxy将socks5转化成http 

通过中转端口加速连接远端服务器,实现加速网络访问的的效果。

在Linux平台上,通过iptables可以简单实现,在Windows平台,其实通过netsh也很简单,只需要:

netsh interface portproxy reset
netsh interface portproxy add v4tov4 listenport=[本地端口] listenaddress=0.0.0.0 connectport=[远程端口] connectaddress=[远程地址]

参考:https://technet.microsoft.com/en-us/library/cc731068

缘起

毕竟不是程序员,写代码不为赚钱,都是带着自己的目的性,笔记什么的,似乎也就没记过。 其实第一次接触GAE是2010年,也是最早接触python的时候,做了一个简单的随机头像的应用,那时候GAE还没被墙,大家还在探索GAE有什么奇奇怪怪的用途。 借助GAE,完成了python的自学和启蒙,感谢GAE。 但后来,goagent被发扬光大,GAE被墙,就接着几年没碰过了。 直到最近,django做了一个股票新闻抓取并关键词提醒的app。但毕竟爱折腾,在折腾各种乱七八糟的过程中,vps瘫痪了,突然想起来,程序如果跑在GAE上,似乎可以杜绝折腾VPS引起的问题。但一切并不顺利,一倍时间django写的程序,用了2倍时间,才迁移到GAE。

NOTE1:免费的GAE配额,以及一些坑

  • CPU:600 Mhz, 内存:128 MB, 28个执行小时。
    • 占用过多的cpu和内存,会导致你的程序跑不满24小时就额度用尽。
    • 当CPU占用高时,GAE会再起新Instance来应对新的请求,多个Instance多倍扣CPU时间。
    • 当内存超限时,app可能直接被stop,不会死掉。 - 这对于jieba分词简直是灾难,在不修改代码的情况下,jieba初始化时就内存超限了,并且初始化词库需要高达12秒。
  • 5万次数据库读作业,5万次写作业。
    • 大坑!这里要特别注意索引,可能倍翻你的配额用量。
  • 不支持Python3:处理中文,要花费大量时间在utf-8上。
  • 不支持tempfile:很多库需要改造。
  • 免费配额不支持socket:无法使用外部数据库。收费配额皆不可socket listen。

相比之下,其他的配额对应用的影响微不足道:Memcache是免费的。UrlFetch除非抓来的数据不做任何处理,Mail除非用来滥发邮件。

NOTE2:节省各种配额

  • 在一个Instance内,不管cpu占有率高低,cpu time都一样计费。 - 删除数据库也占用write Op,没用的资源尽早删除。 - 绞尽脑汁优化内存和cpu使用。

NOTE3:节省CPU配额:使用asynchronous urlfetch

为节约网络延迟而浪费的cputime,使用异步urlfetch就十分重要。 官方手册在这里,例如:在抓取多个feed时:

q = Feed.query()
results = ndb.get_multi(q.fetch(keys_only=True))

rpcs = []
for f in results:
    rpc = urlfetch.create_rpc()
    urlfetch.make_fetch_call(rpc, f.url)
    rpcs.append(rpc)

for rpc in rpcs:
    rpc.wait()
    result = rpc.get_result()
    d = feedparser.parse(result.content)
    for e in d['entries']:
        # do something....

NOTE4:节省CPU配额:需要初始化的资源,在本地进行序列化,GAE上直接读取序列化的资源。

以jieba词库为例:默认情况,jieba每次初始化,都会将本地词库dict.txt进行readline操作,生成字典,这个过程在GAE默认的CPU上需要将近6秒。而在本地,先将这个字典使用marshal.dump,在GAE中在load,初始化阶段则只消耗1.x秒。

try:
    with open(cache_file, 'rb') as cf:
        object_a, object_b = marshal.load(cf)
except :
    for line in open(dict, 'rb').read().decode('utf-8').splitlines():
        # do something....
with open(cache_file, 'wb') as cf:
    marshal.dump((object_a, object_b), cf)

NOTE5:节省CPU配额:减少服务器端的请求

self.response.headers['Cache-Control'] = 'public, max-age:300'
self.response.headers['Pragma'] = 'Public'

NOTE6:资源优化:删掉过时的数据

节约数据库存储空间最简单的方法,就是删掉过时的数据,而对于ndb,不存在Object.query().del() 这样的方法,需要使用:

earliest = datetime.datetime.now() - datetime.timedelta(days=10)
keys = EntryCollect.query(EntryCollect.published <= earliest).fetch(keys_only=True)
ndb.delete_multi(keys)

NOTE6:使用robots.txt 减少搜索引擎对app的负载

不失为一个办法,一个个位数pv的app,被bot拖到配额超限真的好23333...

NOTE7:自定义webapp2模板的tags

和django不同,webapp2稍微麻烦一点,这里以新建一个在模板里使用时区的tags为例。

首先,新建一个目录common,新建一个文件common\customtags.py

from datetime import timedelta
from google.appengine.ext.webapp import template
register = template.create_template_register()

def timezone(value, offset):
    return value + timedelta(hours=offset)

register.filter(timezone)

在views.py顶端加入

from google.appengine.ext.webapp import template
template.register_template_library('common.customtags')

最后,在模板内可以直接使用,不需要 load xxxxx published|timezone:8|date:"H:i"

NOTE8:使用代理服务器部署代码

set HTTP_PROXY=http://127.0.0.1:1080
set HTTPs_PROXY=http://127.0.0.1:1080
c:\Python27\python.exe x:\google_appengine\appcfg.py update x:\ProjectDir 

待编辑...

心里只有一万个WTF,一万个草泥马。 GAE的Datastore收费方法简直令人恶心,一个App的开发过程中,想着怎么优化write/read Op,浪费的时间和精力,你定可以转换为很多创新点子...特别是对于一个不存在code review的个人App来说。 如果app自用且足够小,放到GAE上追求个稳定,还是可以接受的,但也仅限如此了...