国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python中pika模塊問題的深入探究

2020-01-04 14:23:07
字體:
來源:轉載
供稿:網(wǎng)友

前言

工作中經(jīng)常用到rabbitmq,而用的語言主要是python,所以也就經(jīng)常會用到python中的pika模塊,但是這個模塊的使用,也給我?guī)Я撕芏鄦栴},這里整理一下關于這個模塊我在使用過程的改變歷程已經(jīng)中間碰到一些問題的解決方法

關于MQ:

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發(fā)送應用程序同時執(zhí)行的要求。

剛開寫代碼的小菜鳥

在最開始使用這個rabbitmq的時候,因為本身業(yè)務需求,我的程序既需要從rabbitmq消費消息,也需要給rabbitmq發(fā)布消息,代碼的邏輯圖為如下:

python,pika模塊

下面是我的模擬代碼:

#! /usr/bin/env python3# .-*- coding:utf-8 .-*-import pikaimport timeimport threadingimport osimport jsonimport datetimefrom multiprocessing import Process# rabbitmq 配置信息MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice"}class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid):  # self.serverid = MQ_CONFIG.get("serverid")  self.exchange = MQ_CONFIG.get("exchange")  self.channel = None  self.connection = None  self.recv_serverid = recv_serverid  self.send_serverid = send_serverid def reconnect(self):  if self.connection and not self.connection.is_closed():   self.connection.close()  credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))  parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),            credentials)  self.connection = pika.BlockingConnection(parameters)  self.channel = self.connection.channel()  self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")  result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)  queue_name = result.method.queue  self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)  self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body):  """  消費消息  :param channel:  :param method:  :param properties:  :param body:  :return:  """  channel.basic_ack(delivery_tag=method.delivery_tag)  process_id = os.getpid()  print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message):  """  發(fā)布消息  :param to_serverid:  :param message:  :return:  """  message = dict_to_json(message)  self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self):  while True:   self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs):  """  單例模式  :return:  """  if not hasattr(cls, "_instance"):   with cls._instance_lock:    if not hasattr(cls, "_instance"):     cls._instance = cls(*args, **kwargs)  return cls._instancedef process1(recv_serverid, send_serverid): """ 用于測試同時訂閱和發(fā)布消息 :return: """ # 線程1 用于去 從rabbitmq消費消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True:  # 主線程去發(fā)布消息  message = {"value": i}  rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message)  i += 1  time.sleep(0.01)class CJsonEncoder(json.JSONEncoder): def default(self, obj):  if isinstance(obj, datetime.datetime):   return obj.strftime('%Y-%m-%d %H:%M:%S')  elif isinstance(obj, datetime.date):   return obj.strftime("%Y-%m-%d")  else:   return json.JSONEncoder.default(self, obj)def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstrdef json_to_dict(jsonstr): if isinstance(jsonstr, bytes):  jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return dif __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 進程1 用于模擬模擬程序1  p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start()  # 主進程用于模擬程序2 process1(send_serverid, recv_serverid)

上面是我的將我的實際代碼更改的測試模塊,其實就是模擬實際業(yè)務中,我的rabbitmq模塊既有訂閱消息,又有發(fā)布消息的時候,同時,訂閱消息和發(fā)布消息用的同一個rabbitmq連接的同一個channel

但是這段代碼運行之后基本沒有運行多久就會看到如下錯誤信息:

Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed()pika.exceptions.ChannelClosedTraceback (most recent call last): File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed()pika.exceptions.ChannelClosedException in thread Thread-1:Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code//udce5/udc85/udcb3/udce4/udcba/udc8erabbitmq/udce9/udc97/udcae/udce9/udca2/udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text)pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')

而這個時候你查看rabbitmq服務的日志信息,你會看到兩種情況的錯誤日志如下:

情況一:

=INFO REPORT==== 12-Oct-2018::18:32:37 ===accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)=INFO REPORT==== 12-Oct-2018::18:32:37 ===accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672)=ERROR REPORT==== 12-Oct-2018::18:32:38 ===AMQP connection <0.19446.2> (running), channel 1 - error:{amqp_error,unexpected_frame,   "expected content header for class 60, got non content header frame instead",   'basic.publish'}=INFO REPORT==== 12-Oct-2018::18:32:38 ===closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672)=ERROR REPORT==== 12-Oct-2018::18:33:59 ===AMQP connection <0.19439.2> (running), channel 1 - error:{amqp_error,unexpected_frame,   "expected content header for class 60, got non content header frame instead",   'basic.publish'}=INFO REPORT==== 12-Oct-2018::18:33:59 ===closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)

情況二:

=INFO REPORT==== 12-Oct-2018::17:41:28 ===accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672)=INFO REPORT==== 12-Oct-2018::17:41:28 ===accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)=ERROR REPORT==== 12-Oct-2018::17:41:29 ===AMQP connection <0.19045.2> (running), channel 1 - error:{amqp_error,unexpected_frame,   "expected content body, got non content body frame instead",   'basic.publish'}=INFO REPORT==== 12-Oct-2018::17:41:29 ===closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672)=ERROR REPORT==== 12-Oct-2018::17:42:23 ===AMQP connection <0.19052.2> (running), channel 1 - error:{amqp_error,unexpected_frame,   "expected method frame, got non method frame instead",none}=INFO REPORT==== 12-Oct-2018::17:42:23 ===closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)

對于這種情況我查詢了很多資料和文檔,都沒有找到一個很好的答案,查到關于這個問題的連接有:

https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame

http://rabbitmq.1065348.n5.nabble.com/UNEXPECTED-FRAME-expected-content-header-for-class-60-got-non-content-header-frame-instead-td34981.html

這個問題其他人碰到的也不少,不過查了最后的解決辦法基本都是創(chuàng)建兩個rabbitmq連接,一個連接用于訂閱消息,一個連接用于發(fā)布消息,這種情況的時候,就不會出現(xiàn)上述的問題

在這個解決方法之前,我測試了用同一個連接,不同的channel,讓訂閱消息用一個channel, 發(fā)布消息用另外一個channel,但是在測試過程依然會出現(xiàn)上述的錯誤。

有點寫代碼能力了

最后我也是選擇了用兩個連接的方法解決出現(xiàn)上述的問題,現(xiàn)在是一個測試代碼例子:

#! /usr/bin/env python3# .-*- coding:utf-8 .-*-import pikaimport threadingimport jsonimport datetimeimport osfrom pika.exceptions import ChannelClosedfrom pika.exceptions import ConnectionClosed# rabbitmq 配置信息MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice"}class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self):  self.recv_serverid = ""  self.send_serverid = ""  self.exchange = MQ_CONFIG.get("exchange")  self.connection = None  self.channel = None def reconnect(self):  if self.connection and not self.connection.is_closed:   self.connection.close()  credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))  parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),            credentials)  self.connection = pika.BlockingConnection(parameters)  self.channel = self.connection.channel()  self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")  if isinstance(self, RabbitComsumer):   result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)   queue_name = result.method.queue   self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)   self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)class RabbitComsumer(RabbitMQServer): def __init__(self):  super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body):  """  :param ch:  :param method:  :param properties:  :param body:  :return:  """  ch.basic_ack(delivery_tag=method.delivery_tag)  process_id = threading.current_thread()  print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self):  while True:   self.reconnect()   self.channel.start_consuming() @classmethod def run(cls, recv_serverid):  consumer = cls()  consumer.recv_serverid = recv_serverid  consumer.start_consumer()class RabbitPublisher(RabbitMQServer): def __init__(self):  super(RabbitPublisher, self).__init__() def start_publish(self):  self.reconnect()  i = 1  while True:   message = {"value": i}   message = dict_to_json(message)   self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message)   i += 1 @classmethod def run(cls, send_serverid):  publish = cls()  publish.send_serverid = send_serverid  publish.start_publish()class CJsonEncoder(json.JSONEncoder): def default(self, obj):  if isinstance(obj, datetime.datetime):   return obj.strftime('%Y-%m-%d %H:%M:%S')  elif isinstance(obj, datetime.date):   return obj.strftime("%Y-%m-%d")  else:   return json.JSONEncoder.default(self, obj)def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstrdef json_to_dict(jsonstr): if isinstance(jsonstr, bytes):  jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return dif __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 這里分別用兩個線程去連接和發(fā)送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 這里也是用兩個連接去連接和發(fā)送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)

上面代碼中我分別用了兩個連接去訂閱和發(fā)布消息,同時另外一對訂閱發(fā)布也是用的兩個連接來執(zhí)行訂閱和發(fā)布,這樣當再次運行程序之后,就不會在出現(xiàn)之前的問題

關于斷開重連

上面的代碼雖然不會在出現(xiàn)之前的錯誤,但是這個程序非常脆弱,當rabbitmq服務重啟或者斷開之后,程序并不會有重連接的機制,所以我們需要為代碼添加重連機制,這樣即使rabbitmq服務重啟了或者

rabbitmq出現(xiàn)異常我們的程序也能進行重連機制

#! /usr/bin/env python3# .-*- coding:utf-8 .-*-import pikaimport threadingimport jsonimport datetimeimport timefrom pika.exceptions import ChannelClosedfrom pika.exceptions import ConnectionClosed# rabbitmq 配置信息MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice"}class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self):  self.recv_serverid = ""  self.send_serverid = ""  self.exchange = MQ_CONFIG.get("exchange")  self.connection = None  self.channel = None def reconnect(self):  try:   if self.connection and not self.connection.is_closed:    self.connection.close()   credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))   parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),             credentials)   self.connection = pika.BlockingConnection(parameters)   self.channel = self.connection.channel()   self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")   if isinstance(self, RabbitComsumer):    result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)    queue_name = result.method.queue    self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)    self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)  except Exception as e:   print(e)class RabbitComsumer(RabbitMQServer): def __init__(self):  super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body):  """  :param ch:  :param method:  :param properties:  :param body:  :return:  """  ch.basic_ack(delivery_tag=method.delivery_tag)  process_id = threading.current_thread()  print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self):  while True:   try:    self.reconnect()    self.channel.start_consuming()   except ConnectionClosed as e:    self.reconnect()    time.sleep(2)   except ChannelClosed as e:    self.reconnect()    time.sleep(2)   except Exception as e:    self.reconnect()    time.sleep(2) @classmethod def run(cls, recv_serverid):  consumer = cls()  consumer.recv_serverid = recv_serverid  consumer.start_consumer()class RabbitPublisher(RabbitMQServer): def __init__(self):  super(RabbitPublisher, self).__init__() def start_publish(self):  self.reconnect()  i = 1  while True:   message = {"value": i}   message = dict_to_json(message)   try:    self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message)    i += 1   except ConnectionClosed as e:    self.reconnect()    time.sleep(2)   except ChannelClosed as e:    self.reconnect()    time.sleep(2)   except Exception as e:    self.reconnect()    time.sleep(2) @classmethod def run(cls, send_serverid):  publish = cls()  publish.send_serverid = send_serverid  publish.start_publish()class CJsonEncoder(json.JSONEncoder): def default(self, obj):  if isinstance(obj, datetime.datetime):   return obj.strftime('%Y-%m-%d %H:%M:%S')  elif isinstance(obj, datetime.date):   return obj.strftime("%Y-%m-%d")  else:   return json.JSONEncoder.default(self, obj)def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstrdef json_to_dict(jsonstr): if isinstance(jsonstr, bytes):  jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return dif __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 這里分別用兩個線程去連接和發(fā)送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 這里也是用兩個連接去連接和發(fā)送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)

上面的代碼運行運行之后即使rabbitmq的服務出問題了,但是當rabbitmq的服務好了之后,我們的程序依然可以重新進行連接,但是上述這種實現(xiàn)方式運行了一段時間之后,因為實際的發(fā)布消息的地方的消息是從其他線程或進程中獲取的數(shù)據(jù),這個時候你可能通過queue隊列的方式實現(xiàn),這個時候你的queue中如果長時間沒有數(shù)據(jù),在一定時間之后來了數(shù)據(jù)需要發(fā)布出去,這個時候你發(fā)現(xiàn),你的程序會提示連接被rabbitmq 服務端給斷開了,但是畢竟你設置了重連機制,當然也可以重連,但是這里想想為啥會出現(xiàn)這種情況,這個時候查看rabbitmq的日志你會發(fā)現(xiàn)出現(xiàn)了如下錯誤:

=ERROR REPORT==== 8-Oct-2018::15:34:19 ===
closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672):
{heartbeat_timeout,running}

這是我之前測試環(huán)境的日志截取的,可以看到是因為這個錯誤導致的,后來查看pika連接rabbitmq的連接參數(shù)中有這么一個參數(shù)

python,pika模塊

這個參數(shù)默認沒有設置,那么這個heatbeat的心跳時間,默認是不設置的,如果不設置的話,就是根絕服務端設置的,因為這個心跳時間是和服務端進行協(xié)商的結果

當這個參數(shù)設置為0的時候則表示不發(fā)送心跳,服務端永遠不會斷開這個連接,所以這里我為了方便我給發(fā)布消息的線程的心跳設置為0,并且我這里,我整理通過抓包,看一下服務端和客戶端的協(xié)商過程

python,pika模塊

從抓包分析中可以看出服務端和客戶端首先協(xié)商的是580秒,而客戶端回復的是:

python,pika模塊

這樣這個連接就永遠不會斷了,但是如果我們不設置heartbeat這個值,再次抓包我們會看到如下

python,pika模塊

python,pika模塊

 

從上圖我們可以刪除最后服務端和客戶端協(xié)商的結果就是580,這樣當時間到了之后,如果沒有數(shù)據(jù)往來,那么就會出現(xiàn)連接被服務端斷開的情況了

特別注意

需要特別注意的是,經(jīng)過我實際測試python的pika==0.11.2 版本及以下版本設置heartbeat的不生效的,只有0.12.0及以上版本設置才能生效

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VEVB武林網(wǎng)的支持。


注:相關教程知識閱讀請移步到python教程頻道。
發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 康马县| 瓦房店市| 若羌县| 凌源市| 普格县| 攀枝花市| 顺义区| 林周县| 丰县| 禹州市| 莎车县| 正安县| 潮州市| 封丘县| 紫阳县| 襄樊市| 华坪县| 宜章县| 凤台县| 景宁| 台湾省| 石家庄市| 望城县| 海晏县| 滨州市| 红河县| 满洲里市| 南川市| 中西区| 西城区| 郎溪县| 惠安县| 克东县| 西和县| 桃源县| 玉田县| 临沂市| 雷州市| 榆林市| 乐都县| 眉山市|