查看: 90|回复: 0

Python抓取高德地图API(一)获取地理空间数据

[复制链接]

1

主题

2

帖子

3

积分

新手上路

Rank: 1

积分
3
发表于 2022-12-10 20:47:33 | 显示全部楼层 |阅读模式
时隔三月,欢迎自己回归,没更新文章主要是最近一个Q加班很多,距离23年还有不到50天,接下来会逐步将落下的找补回来,顺便问一句亲爱的读者朋友们,年初计划完成的怎么样了?
日常工作中,也有用到中国地理空间数据做维表去做关联分析的case,本文分享从0-1抓取高德地图数据,并将数据存储到Mysql数据库。
你可以从这里进入高德地图API
[高德地图API](https://lbs.amap.com/api/webservice/guide/api/district)
开始之前,我们要先去高德地图的官网申请自己用的 key。有了官方提供的钥匙,才能开启我们的调接口之旅。如何申请自己的key 我把申请key附上,有需要自取。
[申请key](https://zhuanlan.zhihu.com/p/84677243)


除了获取行政区划,还有很多实用的接口,比如

  • 地址和经纬度互转
  • 搜索POI(比如我在杭州,我想搜索所有的奶茶店)
  • 天气查询
  • 路径规划
接下来介绍如何通过python获取行政区划数据:
-- mysql建表语句

-- mysql建表语句
DROP TABLE IF EXISTS `districtsx`;
CREATE TABLE `districtsx`  (
  `districtId` int(11) NOT NULL AUTO_INCREMENT,
  `districtPid` int(11) NULL DEFAULT NULL COMMENT '上级ID',
  `pname` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '上级名称_省',
  `cityname` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '上级名称_市',
  `districtname` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '上级名称_区县',
  `name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区名称',
  `citycode` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '城市编码',
  `adcode` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '城市区域编码',
  `lng` float(13, 10) NULL DEFAULT NULL COMMENT '经度',
  `lat` float(13, 10) NULL DEFAULT NULL COMMENT '纬度',
  `level` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划级别',
  `createTime` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
  `updateTime` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`districtId`) USING BTREE,
  INDEX `districtsx_idx1`(`name`) USING BTREE,
  INDEX `districtsx_idx2`(`districtId`) USING BTREE,
  INDEX `districtsx_idx3`(`cityname`) USING BTREE,
  INDEX `districtsx_idx4`(`districtname`) USING BTREE,
  INDEX `districtsx_idx5`(`districtPid`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 44216 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;



-- 抓高德API需要的包
import json
import pymysql
import requests

-- 落阿里云Mysql需要的包
import zipfile
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.types import String, Float, Integer,DateTime,Date,Text
import pickle

mydb  = pymysql.connect(host = '127.0.0.1',
    user = 'root',
    password="123456",
    database='test1',
    port=3306,
    charset='utf8')
engine_remote = create_engine('mysql+pymysql://root:123456@localhost:3306/python_db')

mycursor = mydb.cursor()

-- 先清空数据再上传
mycursor.execute("truncate table districtsx")
mycursor.execute("alter table districtsx auto_increment = 1")

insert_sql = "INSERT INTO `districtsx` (`districtpid`,`pname`,`cityname`, `districtname`, `name`, `adcode`, `lng`, `lat`, `level`) VALUES  (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
insert_city_sql = "INSERT INTO `districtsx` ( `districtpid`,`pname`,`cityname`, `districtname`, `name`, `citycode`,`adcode`, `lng`, `lat`, `level`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"

select_sql = 'select districtId from districtsx where adcode = %s'

def updatedb(sql, *args):
    try:
        mycursor.execute(sql, args)
        mydb.commit()
    except Exception as e:
        print(e)

def selectdb(sql, *args):
    mycursor.execute(sql, args)
    return mycursor

--抓数据,注意将code_url :key=替换为自己申请的key

updatedb(insert_sql, 0, '中华人民共和国', '中国', '中国', '中国', '100000', '116.3683244', '39.915085', 'country')
provinces = ['北京市', '天津市', '河北省', '山西省', '内蒙古自治区', '辽宁省', '吉林省', '黑龙江省', '上海市', '江苏省', '浙江省', '安徽省', '福建省', '江西省',
             '山东省', '河南省', '湖北省', '湖南省', '广东省', '广西壮族自治区', '海南省', '重庆市', '四川省', '贵州省', '云南省', '西藏自治区', '陕西省', '甘肃省',
             '青海省', '宁夏回族自治区', '新疆维吾尔自治区', '台湾省', '香港特别行政区', '澳门特别行政区']
for i in provinces:
    code_url = 'https://restapi.amap.com/v3/config/district?key=你的key&keywords={}&subdistrict=3&extensions=base'.format(i)
    res = requests.get(code_url, headers=header)
    print(code_url)
    print(res.text)
    province = json.loads(res.text)['districts']
    adcode = province[0]['adcode']
    pname = province[0]['name']
    center = province[0]['center']
    pcitycode = province[0]['citycode']
    level = province[0]['level']
    lng = province[0]['center'].split(',')[0]
    lat = province[0]['center'].split(',')[1]
    city_list = province[0]['districts']

    print(pname, pcitycode, adcode, lng, lat, level)
    updatedb(insert_sql, 1, pname, pname, pname, pname, adcode, lng, lat, level)
    districtpid = selectdb(select_sql, adcode).fetchone()[0]
    # print(city_list)
    for city in city_list:
        citycode = city['citycode']
        adcode = city['adcode']
        name_c = city['name']
        level = city['level']
        lng = city['center'].split(',')[0]
        lat = city['center'].split(',')[1]
        district_list = city['districts']
        # print(name, citycode, adcode, lng, lat, level)
        updatedb(insert_city_sql, districtpid, pname, name_c, name_c, name_c, citycode, adcode, lng, lat, level)
        citypid = selectdb(select_sql, adcode).fetchone()[0]
        for district in district_list:
            citycode = district['citycode']
            adcode = district['adcode']
            name_d = district['name']
            level = district['level']
            lng = district['center'].split(',')[0]
            lat = district['center'].split(',')[1]
            street_list = district['districts']
            if level in ['district']:
                updatedb(insert_city_sql, citypid, pname, name_c, name_d, name_d, citycode, adcode, lng, lat, level)
                print(pname, name_c, name_d, name_d, citycode, adcode, lng, lat, level)
                streetpid = selectdb(select_sql, adcode).fetchone()[0]
                for street in street_list:
                    citycode = street['citycode']
                    adcode = street['adcode']
                    name_s = street['name']
                    level = street['level']
                    lng = street['center'].split(',')[0]
                    lat = street['center'].split(',')[1]
                    # street_list = street['districts']
                    if level in ['street']:
                        updatedb(insert_city_sql, streetpid, pname, name_c, name_d, name_s, citycode, adcode, lng, lat, level)
                        print(pname, name_c, name_d, name_s, citycode, adcode, lng, lat, level)
            elif level in ['street']:
                updatedb(insert_city_sql, citypid, pname, name_c, '', name_d, citycode, adcode, lng, lat, level)
                print(pname, name_c, name_d, name_d, citycode, adcode, lng, lat, level)
print('采集完成')


mycursor.execute("SELECT COUNT(1) FROM districtsx")
data = mycursor.fetchone()
print("生成记录数 : %s " % data)



最终得到的数据长这样:


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表