franz/producer

Types

A builder for creating and configuring a Kafka producer.

pub opaque type Builder

Wrapper type for message offsets in callbacks.

pub type CbOffset {
  CbOffset(Int)
}

Constructors

  • CbOffset(Int)

Wrapper type for partition numbers in callbacks.

pub type CbPartition {
  CbPartition(Int)
}

Constructors

  • CbPartition(Int)

Different strategies for partitioning messages across Kafka partitions.

pub type Partitioner {
  PartitionFun(
    fn(String, Int, BitArray, BitArray) -> Result(Int, Nil),
  )
  Random
  Hash
}

Constructors

  • PartitionFun(
      fn(String, Int, BitArray, BitArray) -> Result(Int, Nil),
    )

    Partitioner function that receives the topic, partition count, key, and value, and returns the partition number to use.

  • Random

    Random partitioner.

  • Hash

    Hash partitioner.
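
As a rough illustration of PartitionFun, the sketch below routes messages by the first byte of the key. The names by_first_key_byte and custom_partitioner are hypothetical, and returning Error(Nil) for an empty key is an assumption about how a partition function might signal failure, not documented library behaviour.

```gleam
import franz/producer

/// Hypothetical partition function: picks a partition from the first key byte.
/// Argument order follows the PartitionFun description: topic, partition
/// count, key, value.
pub fn by_first_key_byte(
  _topic: String,
  partition_count: Int,
  key: BitArray,
  _value: BitArray,
) -> Result(Int, Nil) {
  case key {
    // Non-empty key: the first byte (0..255) modulo the partition count.
    <<first, _rest:bits>> if partition_count > 0 ->
      Ok(first % partition_count)
    // Empty key or no partitions: report failure (an assumption, see above).
    _ -> Error(Nil)
  }
}

/// Wraps the function so it can be passed wherever a ProducerPartition
/// is expected.
pub fn custom_partitioner() -> producer.ProducerPartition {
  producer.Partitioner(producer.PartitionFun(by_first_key_byte))
}
```

The built-in Random and Hash constructors can be wrapped the same way, for example producer.Partitioner(producer.Hash).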

Specifies how to select the partition for produced messages.

pub type ProducerPartition {
  Partition(Int)
  Partitioner(Partitioner)
}

Constructors

  • Partition(Int)
  • Partitioner(Partitioner)

The value to be produced to Kafka, optionally with headers and timestamp.

pub type Value {
  Value(value: BitArray, headers: List(#(String, String)))
  ValueWithTimestamp(
    value: BitArray,
    timestamp: Int,
    headers: List(#(String, String)),
  )
}

Constructors

  • Value(value: BitArray, headers: List(#(String, String)))
  • ValueWithTimestamp(
      value: BitArray,
      timestamp: Int,
      headers: List(#(String, String)),
    )
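
A minimal sketch of building both Value variants follows. The function names and header contents are hypothetical. The unit of timestamp is not stated on this page, so the sketch simply passes through whatever integer the caller supplies.

```gleam
import franz/producer

/// Hypothetical example: a payload with a single header and no timestamp.
pub fn plain_value() -> producer.Value {
  producer.Value(
    value: <<"hello":utf8>>,
    headers: [#("content-type", "text/plain")],
  )
}

/// Hypothetical example: the same payload with a caller-supplied timestamp.
/// The expected unit (for example epoch milliseconds) is an assumption to
/// verify against the library before relying on it.
pub fn timestamped_value(timestamp: Int) -> producer.Value {
  producer.ValueWithTimestamp(
    value: <<"hello":utf8>>,
    timestamp: timestamp,
    headers: [],
  )
}
```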

Values

pub fn new(client: franz.Client, topic: String) -> Builder

Creates a new producer builder with the given Franz client and topic.

pub fn produce_cb(
  client client: franz.Client,
  topic topic: String,
  partition partition: ProducerPartition,
  key key: BitArray,
  value value: Value,
  callback callback: fn(CbPartition, CbOffset) -> any,
) -> Result(ProducerPartition, franz.FranzError)

Awaits the Kafka ack before returning and calling the callback. A producer for the topic must already be started (by calling producer.start()), unless you specified AutoStartProducers(True) when starting the client. Returns the partition of the produced message. The callback receives the partition and offset of the produced message.
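
The following sketch shows one way a callback might unwrap CbPartition and CbOffset. The helper names, topic, key, and payload are hypothetical, and the client is taken as a parameter because creating one is outside this module.

```gleam
import franz
import franz/producer
import gleam/int
import gleam/io

/// Hypothetical helper: formats the ack details handed to the callback.
pub fn describe_ack(
  partition: producer.CbPartition,
  offset: producer.CbOffset,
) -> String {
  // Each wrapper has a single constructor, so a plain `let` unwraps it.
  let producer.CbPartition(p) = partition
  let producer.CbOffset(o) = offset
  "acked on partition " <> int.to_string(p) <> " at offset " <> int.to_string(o)
}

/// Produces one message and prints the ack details from the callback.
pub fn send_with_callback(
  client: franz.Client,
) -> Result(producer.ProducerPartition, franz.FranzError) {
  producer.produce_cb(
    client: client,
    topic: "greetings",
    partition: producer.Partitioner(producer.Random),
    key: <<"user-1":utf8>>,
    value: producer.Value(value: <<"hello":utf8>>, headers: []),
    callback: fn(partition, offset) {
      io.println(describe_ack(partition, offset))
    },
  )
}
```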

pub fn produce_no_ack(
  client client: franz.Client,
  topic topic: String,
  partition partition: ProducerPartition,
  key key: BitArray,
  value value: Value,
) -> Result(Nil, franz.FranzError)

Produces a message without waiting for the Kafka ack. A producer for the topic must already be started (by calling producer.start()), unless you specified AutoStartProducers(True) when starting the client.

pub fn produce_sync(
  client client: franz.Client,
  topic topic: String,
  partition partition: ProducerPartition,
  key key: BitArray,
  value value: Value,
) -> Result(Nil, franz.FranzError)

Awaits the Kafka ack before returning. A producer for the topic must already be started (by calling producer.start()), unless you specified AutoStartProducers(True) when starting the client.
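
A short sketch of a synchronous produce call using the labelled arguments from the signature above. The function name send_greeting, the topic, the key, and the payload are hypothetical.

```gleam
import franz
import franz/producer

/// Sends one message and returns once produce_sync has returned.
/// The Hash partitioner is used here only as an example choice.
pub fn send_greeting(client: franz.Client) -> Result(Nil, franz.FranzError) {
  producer.produce_sync(
    client: client,
    topic: "greetings",
    partition: producer.Partitioner(producer.Hash),
    key: <<"user-1":utf8>>,
    value: producer.Value(value: <<"hello":utf8>>, headers: []),
  )
}
```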

pub fn produce_sync_offset(
  client client: franz.Client,
  topic topic: String,
  partition partition: ProducerPartition,
  key key: BitArray,
  value value: Value,
) -> Result(Int, franz.FranzError)

Awaits the Kafka ack before returning. A producer for the topic must already be started (by calling producer.start()), unless you specified AutoStartProducers(True) when starting the client. Returns the offset of the produced message.
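
The sketch below handles both outcomes of produce_sync_offset. The function name, topic, and fixed partition 0 are hypothetical, and the error branch ignores the error value because the FranzError constructors are not listed on this page.

```gleam
import franz
import franz/producer
import gleam/int

/// Produces to a fixed partition and reports the resulting offset as text.
pub fn send_and_report(client: franz.Client) -> String {
  let result =
    producer.produce_sync_offset(
      client: client,
      topic: "greetings",
      partition: producer.Partition(0),
      key: <<"user-1":utf8>>,
      value: producer.Value(value: <<"hello":utf8>>, headers: []),
    )
  case result {
    Ok(offset) -> "stored at offset " <> int.to_string(offset)
    // The error details are deliberately not inspected in this sketch.
    Error(_) -> "produce failed"
  }
}
```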

pub fn start(builder: Builder) -> Result(Nil, franz.FranzError)

Starts a producer with the given configuration. A producer must be started for a topic before producing to it, unless you specified AutoStartProducers(True) when starting the client.
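
A minimal sketch of the builder flow, assuming a client created elsewhere. The function name start_topic_producer is hypothetical.

```gleam
import franz
import franz/producer

/// Builds a producer for the topic and starts it with default settings.
/// A producer.with_config call could be piped in between new and start.
pub fn start_topic_producer(
  client: franz.Client,
  topic: String,
) -> Result(Nil, franz.FranzError) {
  producer.new(client, topic)
  |> producer.start
}
```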

pub fn with_config(
  builder: Builder,
  config: producer_config.ProducerConfig,
) -> Builder

Add a producer configuration to the producer builder.
