博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【kafka】celery与kafka的联用问题
阅读量:6991 次
发布时间:2019-06-27

本文共 1598 字,大约阅读时间需要 5 分钟。

背景:一个小应用,用celery下发任务,任务内容为kafka生产一些数据。

问题:使用confluent_kafka模块时,单独启用kafka可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了。

解决:换了个pykafka模块,结果问题就没有了。

 

我很疑惑啊,是我调用confluent_kafka的方法不对吗,怎么套上celery就不行了呢?

 

可以用的pykafka代码:

tasks.py

from celery import Celeryfrom pykafka import KafkaClientimport jsonapp = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')@app.taskdef produce():    client = KafkaClient(hosts="localhost:9092")    print client.topics    topic = client.topics['test_result']    with topic.get_sync_producer() as producer:        for i in range(3):            data = {
"info": {
"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"} print('Producing message: %s' % data) producer.produce(json.dumps(data)) print "finish produce" producer.stop() print "stop"

run_worker.py

from tasks import producefor i in range(1000):    result = produce.delay()    print result.status

 

 

无法正常生产数据的confluent_kafka代码:

tasks.py

from celery import Celeryfrom kafka_producer import pimport jsonapp = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')@app.taskdef produce():    for i in range(3000):        data = {
"info": {
"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"} print('Producing message: %s' % data) p.produce('test_result3', json.dumps(data)) print "finish produce" p.flush() print "finish flush"

run_worker.py

from tasks import produceresult = produce.delay()print result.statusprint result.ready()print result.get()print result.status

 

转载地址:http://qjbvl.baihongyu.com/

你可能感兴趣的文章
Centos7中安装最新版maven3.5.0
查看>>
python学习之老男孩python全栈第九期_数据库day003 -- 作业
查看>>
深度优先遍历
查看>>
常用类型转换 一.常用int和string类型转换
查看>>
Ext Js简单Grid分页及选择器的使用
查看>>
slice 定义和用法
查看>>
分类游戏 结构体
查看>>
导出、恢复、上传镜像
查看>>
java第一个程序提示找不到符号-System.out.printIn
查看>>
LineageOS源码定制手机系统
查看>>
flask怎样获取authorization
查看>>
Python3 Selenium自动化web测试 ==> 第六节 WebDriver高级应用 -- 操作web页面的滚动条...
查看>>
HTMl5的sessionStorage和localStorage的一些区别
查看>>
Find Minimum in Rotated Sorted Array
查看>>
Android Studio模拟器的问题及解决办法
查看>>
实现Android ListView 自动加载更多内容
查看>>
高淇Struts2.0教程之视频笔记(6)
查看>>
python二进制读写文件
查看>>
sql server 高可用性技术总结
查看>>
近阶段学习总结(EasyUI的使用)
查看>>