Score:1

ไม่มีการเชื่อมต่อกับ Kafka จากลูกค้า Faust

ธง pe

ฉันมีปัญหาในการเชื่อมต่อกับเครื่องที่ใช้ Kafka จากไคลเอ็นต์ที่เรียกใช้สคริปต์ Faust สคริปต์มีลักษณะดังนี้:

นำเข้า faust
บันทึกการนำเข้า
จาก asyncio นำเข้าโหมดสลีป


การทดสอบในชั้นเรียน (faust.Record):
    ข้อความ: str


app = faust.App('myapp', นายหน้า='kafka://10.0.0.20:9092')
หัวข้อ = app.topic('ทดสอบ', value_type=Test)


@app.agent(หัวข้อ)
async def สวัสดี (ข้อความ):
    async สำหรับข้อความในข้อความ:
        พิมพ์ (f'ได้รับ {message.msg}')


@app.timer(ช่วงเวลา=5.0)
async def example_sender():
    รอสวัสดีส่ง (
        ค่า=ทดสอบ(msg='สวัสดีชาวโลก!'),
    )


ถ้า __name__ == '__main__':
    app.main()

เมื่อฉันรันสคริปต์:

# faust -A myapp worker -l ข้อมูล
âÆaµSâ v0.8.1ââ¬ââââââââââââ ว âââââââââââââââââ âââââ
â รหัส â myapp â
âการขนส่ง â [URL('kafka://10.0.0.20:9092')] â
â จัดเก็บ â หน่วยความจำ: â
â เว็บ â http://hubbabubba:6066 â
â บันทึก â -stderr- (ข้อมูล) â
pid â 260765 â
â ชื่อโฮสต์ â hubbabubba â
â แพลตฟอร์ม â CPython 3.8.10 (Linux x86_64) â
â ไดรเวอร์ â â
â ขนส่ง â aiokafka=0.7.2 â
â เว็บ â aiohttp=3.8.1 â
â datadir â /Git/faust-kafka/myapp-ข้อมูล â
â appdir â /Git/faust-kafka/myapp-data/v1 â
âââââââââââââââ´ââ âââââââââââââââââ ว âââââââââââââââ
[2022-01-28 13:09:57,018] [260765] [INFO] [^คนทำงาน]: กำลังเริ่ม... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: กำลังเริ่ม... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: กำลังเริ่ม... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producer]: กำลังเริ่ม... 
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: กำลังเริ่ม... 
[2022-01-28 13:09:57,024] [260765] [ข้อผิดพลาด] ไม่สามารถเชื่อมต่อกับ "10.0.0.20:9092": [Errno 113] เชื่อมต่อการโทรล้มเหลว ('10.0.0.20', 9092) 
[2022-01-28 13:09:57,025] [260765] [ข้อผิดพลาด] [^คนทำงาน]: ข้อผิดพลาด: KafkaConnectionError("ไม่สามารถบู๊ตสแตรปจาก [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>) ]") 
Traceback (การโทรครั้งล่าสุดล่าสุด):
  ไฟล์ "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py", บรรทัด 276 ใน execute_from_commandline
    self.loop.run_until_complete(ตัวเอง._starting_fut)
  ไฟล์ "/usr/lib/python3.8/asyncio/base_events.py", บรรทัด 616, ใน run_until_complete
    ส่งคืน future.result()
  ไฟล์ "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", บรรทัด 759 ในตอนเริ่มต้น
    รอ self._default_start()
  ไฟล์ "/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", บรรทัด 766 ใน _default_start
    รอ self._actually_start()...
  ไฟล์ "/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py", บรรทัด 249 ใน bootstrap
    เพิ่ม KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: ไม่สามารถบู๊ตสแตรปจาก [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^คนทำงาน]: กำลังหยุด... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: กำลังหยุด... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: ล้างบัฟเฟอร์ตัวสร้าง... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: กำลังหยุด... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: กำลังหยุด... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Conductor]: กำลังหยุด... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: กำลังหยุด... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^Agent: myapp.hello]: กำลังหยุด... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: กำลังหยุด... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: กำลังหยุด... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumer]: กำลังหยุด... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--เว็บ]: กำลังหยุด... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: กำลังหยุด... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producer]: กำลังหยุด... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: กำลังหยุด... 
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: กำลังหยุด... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: รวบรวมงานบริการ... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^คนทำงาน]: รวบรวมอนาคตทั้งหมด... 
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: ปิดการวนรอบเหตุการณ์

Kafka (v.2.8.1) ทำงานบน 10.0.0.20 พอร์ต 9092 การกำหนดค่า Kafka มีลักษณะดังนี้:

# ได้รับอนุญาตจาก Apache Software Foundation (ASF) ภายใต้หนึ่งรายการขึ้นไป
# ข้อตกลงใบอนุญาตผู้ร่วมให้ข้อมูล ดูไฟล์ประกาศที่แจกจ่ายด้วย
# งานนี้สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการเป็นเจ้าของลิขสิทธิ์
# ASF ให้สิทธิ์ใช้งานไฟล์นี้แก่คุณภายใต้ Apache License เวอร์ชัน 2.0
# ("ใบอนุญาต"); คุณไม่สามารถใช้ไฟล์นี้ได้เว้นแต่จะเป็นไปตาม
#ใบอนุญาติ. คุณสามารถขอรับสำเนาใบอนุญาตได้ที่
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# เว้นแต่จะกำหนดโดยกฎหมายที่ใช้บังคับหรือตกลงเป็นลายลักษณ์อักษร ซอฟต์แวร์
# เผยแพร่ภายใต้ใบอนุญาตเผยแพร่บนพื้นฐาน "ตามที่เป็น"
#โดยไม่มีการรับประกันหรือเงื่อนไขใดๆ ทั้งสิ้น ไม่ว่าโดยชัดแจ้งหรือโดยปริยาย
# ดูใบอนุญาตสำหรับภาษาเฉพาะที่ควบคุมการอนุญาตและ
#ข้อ จำกัด ภายใต้ใบอนุญาต

# โปรดดูที่ kafka.server.KafkaConfig สำหรับรายละเอียดเพิ่มเติมและค่าเริ่มต้น

############################# พื้นฐานของเซิร์ฟเวอร์ ################### ##########

# ID ของนายหน้า ต้องตั้งค่าเป็นจำนวนเต็มเฉพาะสำหรับแต่ละโบรกเกอร์
นายหน้า.id=0

############################### การตั้งค่าเซิร์ฟเวอร์ซ็อกเก็ต ################## ###########

# ที่อยู่ซ็อกเก็ตเซิร์ฟเวอร์รับฟัง จะได้ค่าที่คืนมาจาก 
# java.net.InetAddress.getCanonicalHostName() หากไม่ได้กำหนดค่า
#   รูปแบบ:
# ผู้ฟัง = ผู้ฟัง_ชื่อ://โฮสต์_ชื่อ:พอร์ต
#   ตัวอย่าง:
# ผู้ฟัง = PLAINTEXT://your.host.name:9092
ผู้ฟัง=PLAINTEXT://:9092

# ชื่อโฮสต์และพอร์ตนายหน้าจะโฆษณาไปยังผู้ผลิตและผู้บริโภค หากไม่ได้ตั้งค่า 
# มันใช้ค่าสำหรับ "ผู้ฟัง" หากกำหนดค่า มิฉะนั้นจะใช้ค่า
# ส่งคืนจาก java.net.InetAddress.getCanonicalHostName()
โฆษณา.listeners=PLAINTEXT://localhost:9092

# จับคู่ชื่อผู้ฟังกับโปรโตคอลความปลอดภัย ค่าเริ่มต้นคือให้เหมือนกัน ดูเอกสารการกำหนดค่าสำหรับรายละเอียดเพิ่มเติม
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# จำนวนเธรดที่เซิร์ฟเวอร์ใช้สำหรับรับคำขอจากเครือข่ายและส่งการตอบกลับไปยังเครือข่าย
num.network.threads=3

# จำนวนเธรดที่เซิร์ฟเวอร์ใช้สำหรับการประมวลผลคำขอ ซึ่งอาจรวมถึงดิสก์ I/O
num.io.threads=8

# บัฟเฟอร์การส่ง (SO_SNDBUF) ที่ใช้โดยเซิร์ฟเวอร์ซ็อกเก็ต
socket.send.buffer.bytes=102400

# บัฟเฟอร์รับ (SO_RCVBUF) ที่ใช้โดยเซิร์ฟเวอร์ซ็อกเก็ต
socket.receive.buffer.bytes=102400

# ขนาดสูงสุดของคำขอที่เซิร์ฟเวอร์ซ็อกเก็ตจะยอมรับ (ป้องกัน OOM)
socket.request.max.bytes=104857600


############################# ข้อมูลพื้นฐานเกี่ยวกับล็อก ################### ##########

# รายการที่คั่นด้วยเครื่องหมายจุลภาคของไดเร็กทอรีที่ใช้จัดเก็บไฟล์บันทึก
log.dirs=/tmp/kafka-logs

# จำนวนพาร์ติชันบันทึกเริ่มต้นต่อหัวข้อ พาร์ติชันเพิ่มเติมช่วยให้ได้มากขึ้น
# ความขนานสำหรับการบริโภค แต่จะส่งผลให้มีไฟล์มากขึ้น
#นายหน้า.
num.partitions=1

# จำนวนเธรดต่อไดเร็กทอรีข้อมูลที่จะใช้สำหรับการกู้คืนบันทึกเมื่อเริ่มต้นและล้างข้อมูลเมื่อปิดระบบ
# แนะนำให้เพิ่มค่านี้สำหรับการติดตั้งที่มี data dirs อยู่ในอาร์เรย์ RAID
num.recovery.threads.per.data.dir=1

############################### การตั้งค่าหัวข้อภายใน ################## ###########
# ปัจจัยการจำลองสำหรับหัวข้อภายในของข้อมูลเมตาของกลุ่ม "__consumer_offsets" และ "__transaction_state"
# สำหรับสิ่งอื่นใดนอกเหนือจากการทดสอบการพัฒนา แนะนำให้ใช้ค่าที่มากกว่า 1 เพื่อให้แน่ใจว่ามีความพร้อมใช้งาน เช่น 3
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################### นโยบายการล้างบันทึก ################## ###########

# ข้อความจะถูกเขียนไปยังระบบไฟล์ทันที แต่โดยค่าเริ่มต้น เราจะซิงค์เฉพาะ fsync() เท่านั้น
# แคช OS อย่างเกียจคร้าน การกำหนดค่าต่อไปนี้ควบคุมการล้างข้อมูลไปยังดิสก์
# มีการแลกเปลี่ยนที่สำคัญบางประการที่นี่:
# 1. ความทนทาน: ข้อมูลที่ไม่ถูกล้างอาจสูญหายหากคุณไม่ได้ใช้การจำลองแบบ
# 2. เวลาแฝง: ช่วงเวลาการล้างที่มากอาจทำให้เวลาแฝงพุ่งสูงขึ้นเมื่อการล้างเกิดขึ้น เนื่องจากจะมีข้อมูลจำนวนมากที่ต้องล้าง
# 3. ทรูพุต: โดยทั่วไป การล้างข้อมูลเป็นการดำเนินการที่แพงที่สุด และช่วงล้างข้อมูลเพียงเล็กน้อยอาจนำไปสู่การค้นหาที่มากเกินไป
# การตั้งค่าด้านล่างอนุญาตให้กำหนดค่านโยบายการล้างเพื่อล้างข้อมูลหลังจากช่วงเวลาหนึ่งหรือ
# ทุก N ข้อความ (หรือทั้งสองอย่าง) ซึ่งสามารถทำได้ทั่วโลกและลบล้างเป็นรายหัวข้อ

# จำนวนข้อความที่จะยอมรับก่อนที่จะบังคับให้ล้างข้อมูลลงดิสก์
#log.flush.interval.messages=10000

# ระยะเวลาสูงสุดที่ข้อความสามารถอยู่ในบันทึกก่อนที่เราจะบังคับให้ล้างข้อมูล
#log.flush.interval.ms=1000

############################# นโยบายการเก็บรักษาบันทึก ################## ###########

# การกำหนดค่าต่อไปนี้ควบคุมการกำจัดส่วนบันทึก กรมธรรม์สามารถ
# ถูกตั้งค่าให้ลบส่วนหลังจากช่วงระยะเวลาหนึ่งหรือหลังจากสะสมขนาดที่กำหนด
# กลุ่มจะถูกลบเมื่อใดก็ตามที่เป็นไปตามเกณฑ์เหล่านี้ * อย่างใดอย่างหนึ่ง * การลบเกิดขึ้นเสมอ
#จากท้ายบันทึก.

# อายุขั้นต่ำของไฟล์บันทึกที่จะมีสิทธิ์ถูกลบเนื่องจากอายุ
log.retention.hours=168

# นโยบายการเก็บรักษาตามขนาดสำหรับบันทึก ส่วนจะถูกตัดออกจากบันทึกเว้นแต่จะเหลือ
# ส่วนลดลงด้านล่าง log.retention.bytes ฟังก์ชันเป็นอิสระจาก log.retention.hours
#log.retention.bytes=1073741824

# ขนาดสูงสุดของไฟล์ส่วนบันทึก เมื่อถึงขนาดนี้ จะมีการสร้างส่วนบันทึกใหม่
log.segment.bytes=1073741824

# ช่วงเวลาที่ส่วนบันทึกถูกตรวจสอบเพื่อดูว่าสามารถลบได้หรือไม่
#นโยบายการเก็บรักษา
log.retention.check.interval.ms=300000

############################# คนดูแลสวนสัตว์ #################### #########

# สตริงการเชื่อมต่อผู้ดูแลสวนสัตว์ (ดูรายละเอียดเอกสารผู้ดูแลสวนสัตว์)
# นี่คือคู่โฮสต์:พอร์ตที่คั่นด้วยเครื่องหมายจุลภาค แต่ละคู่สอดคล้องกับ zk
#เซิฟเวอร์. เช่น. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
# คุณยังสามารถเพิ่มสตริง chroot ที่เป็นทางเลือกต่อท้าย URL เพื่อระบุ
# ไดเรกทอรีรากสำหรับ kafka znodes ทั้งหมด
Zookeeper.connect=localhost:2181

# หมดเวลาในหน่วยมิลลิวินาทีสำหรับการเชื่อมต่อกับผู้ดูแลสวนสัตว์
Zookeeper.connection.timeout.ms=18000


############################# การตั้งค่าผู้ประสานงานกลุ่ม ################## ###########

# การกำหนดค่าต่อไปนี้ระบุเวลาเป็นมิลลิวินาทีที่ GroupCoordinator จะชะลอการปรับสมดุลผู้บริโภคเริ่มต้น
# การปรับสมดุลจะล่าช้าออกไปอีกตามค่า group.initial.rebalance.delay.ms เมื่อสมาชิกใหม่เข้าร่วมกลุ่ม สูงสุดไม่เกิน max.poll.interval.ms
# ค่าเริ่มต้นสำหรับสิ่งนี้คือ 3 วินาที
# เราแทนที่สิ่งนี้เป็น 0 เนื่องจากทำให้ประสบการณ์การพัฒนาและการทดสอบนอกกรอบดีขึ้น
# อย่างไรก็ตาม ในสภาพแวดล้อมการผลิต ค่าเริ่มต้น 3 วินาทีจะเหมาะสมกว่า เนื่องจากจะช่วยหลีกเลี่ยงการปรับสมดุลที่ไม่จำเป็นและอาจมีราคาแพงระหว่างการเริ่มต้นแอปพลิเคชัน
group.initial.rebalance.delay.ms=0

นายหน้าคาฟคาเริ่มต้นด้วย:

$ sudo bin/kafka-server-start.sh -daemon config/server.properties 

ฉันได้รับหัวข้อไปด้วย:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

ฉันจะตรวจสอบกับ:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
ทดสอบ

ฉันเลยสงสัยว่าฉันทำพลาดตรงไหน BTW: เซิร์ฟเวอร์สามารถเข้าถึงได้จากเครื่องไคลเอ็นต์:

$ ping -c 5 10.0.0.20 -p 9092
รูปแบบ: 0x9092
PING 10.0.0.20 (10.0.0.20) 56(84) ไบต์ของข้อมูล
64 ไบต์จาก 10.0.0.20: icmp_seq=1 ttl=64 เวลา=0.468 ms
64 ไบต์จาก 10.0.0.20: icmp_seq=2 ttl=64 เวลา=0.790 ms
64 ไบต์จาก 10.0.0.20: icmp_seq=3 ttl=64 เวลา=0.918 ms
64 ไบต์จาก 10.0.0.20: icmp_seq=4 ttl=64 เวลา=0.453 ms
64 ไบต์จาก 10.0.0.20: icmp_seq=5 ttl=64 เวลา=0.827 ms

--- สถิติ ping 10.0.0.20 ---
ส่ง 5 แพ็กเก็ต ได้รับ 5 แพ็กเก็ต สูญเสียแพ็กเก็ต 0% เวลา 4095ms
rtt นาที/เฉลี่ย/สูงสุด/mdev = 0.453/0.691/0.918/0.192 มิลลิวินาที
Score:1
ธง cn

สิ่งนี้ดูผิดสำหรับฉัน เพราะมันบ่งบอกว่าไคลเอนต์ระยะไกลของคุณกำลังพยายามเชื่อมต่อ โลคัลโฮสต์ เมื่อมีการพูดคุยกับเซิร์ฟเวอร์ bootstrap ไม่ใช่ที่อยู่ระยะไกลของอินสแตนซ์ kafka ของคุณ:

โฆษณา.listeners=PLAINTEXT://localhost:9092

ฉันจะเปลี่ยนเป็น IP ภายนอก (10.x.x.x) ของอินสแตนซ์ kafka ของคุณ รีสตาร์ททุกอย่างแล้วลองอีกครั้ง

ElToro1966 avatar
pe flag
ตั้งค่า ads.listeners เป็น 10.0.0.20 และเพิ่มพอร์ต 9092 ลงในพอร์ตที่อนุญาต เริ่มต้นใหม่ทุกอย่าง ได้ผล! ขอบคุณ.

โพสต์คำตอบ

คนส่วนใหญ่ไม่เข้าใจว่าการถามคำถามมากมายจะปลดล็อกการเรียนรู้และปรับปรุงความสัมพันธ์ระหว่างบุคคล ตัวอย่างเช่น ในการศึกษาของ Alison แม้ว่าผู้คนจะจำได้อย่างแม่นยำว่ามีคำถามกี่ข้อที่ถูกถามในการสนทนา แต่พวกเขาไม่เข้าใจความเชื่อมโยงระหว่างคำถามและความชอบ จากการศึกษาทั้ง 4 เรื่องที่ผู้เข้าร่วมมีส่วนร่วมในการสนทนาด้วยตนเองหรืออ่านบันทึกการสนทนาของผู้อื่น ผู้คนมักไม่ตระหนักว่าการถามคำถามจะมีอิทธิพลหรือมีอิทธิพลต่อระดับมิตรภาพระหว่างผู้สนทนา